feat: add seed peer discovery via --peer flag
All checks were successful
CI / fmt (push) Successful in 28s
CI / clippy (push) Successful in 2m19s
CI / test (push) Successful in 3m12s
CI / dashboard (push) Successful in 39s

Nodes without OPNsense credentials can bootstrap their peer database by
fetching peer lists from known seed peers. Usage:

  cichlid --peer roosta.hanzalova.internal --peer other.host

The seed peer discovery task fetches /peers from each seed peer and any
full-trust peers already in the local database, then independently TLS
probes and classifies trust for each discovered peer before upserting.

Extracts the shared probe+enrich+upsert logic from peer_discovery into
a reusable function used by both discovery tasks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-12 18:16:39 +03:00
parent ad780d31fd
commit da22420bb6
6 changed files with 189 additions and 11 deletions

View File

@@ -137,6 +137,10 @@ struct CliArgs {
/// Disable periodic update check
#[arg(long, default_value_t = false)]
no_update_check: bool,
/// Seed peer hostname(s) for bootstrapping peer discovery
#[arg(long = "peer")]
peers: Vec<String>,
}
#[tokio::main]
@@ -190,6 +194,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
ilo_interval_secs: args.ilo_interval,
update_check_interval_secs: args.update_check_interval,
no_update_check: args.no_update_check,
seed_peers: args.peers,
});
let app = Router::new()

View File

@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info};
use crate::tasks::{ilo_discovery, peer_discovery, update_check};
use crate::tasks::{ilo_discovery, peer_discovery, seed_peer_discovery, update_check};
pub struct Config {
pub db: Arc<Db>,
@@ -14,6 +14,7 @@ pub struct Config {
pub ilo_interval_secs: u64,
pub update_check_interval_secs: u64,
pub no_update_check: bool,
pub seed_peers: Vec<String>,
}
pub fn start(config: Config) {
@@ -36,6 +37,19 @@ pub fn start(config: Config) {
}
});
// Seed peer discovery task (runs after a short delay to let OPNsense discovery go first)
let db = config.db.clone();
let ca_cert_path = config.ca_cert_path.clone();
let listen_port = config.listen_port;
let seed_peers = config.seed_peers.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
loop {
seed_peer_discovery::run(&db, &ca_cert_path, listen_port, &seed_peers).await;
tokio::time::sleep(peer_interval).await;
}
});
// iLO discovery task (runs after a short delay to let peer discovery populate hosts first)
let db = config.db.clone();
tokio::spawn(async move {

View File

@@ -1,3 +1,4 @@
pub mod ilo_discovery;
pub mod peer_discovery;
pub mod seed_peer_discovery;
pub mod update_check;

View File

@@ -77,11 +77,19 @@ pub async fn run(db: &Db, ca_cert_path: &Path, listen_port: u16) {
peer_sites.len() + 1
);
// Probe all peers for TLS certs and services in parallel
probe_and_upsert(&mut all_peers, db, ca_cert_path, listen_port).await;
}
pub async fn probe_and_upsert(
peers: &mut [CichlidHost],
db: &Db,
ca_cert_path: &Path,
listen_port: u16,
) {
let api_port = listen_port;
let mut probe_handles = Vec::new();
let mut service_handles = Vec::new();
for (idx, peer) in all_peers.iter().enumerate() {
for (idx, peer) in peers.iter().enumerate() {
let ip = peer.ip_address;
let ca_path = ca_cert_path.to_path_buf();
probe_handles.push((
@@ -94,7 +102,6 @@ pub async fn run(db: &Db, ca_cert_path: &Path, listen_port: u16) {
));
}
// Enrich with cert info and trust
for (idx, handle) in probe_handles {
match handle.await {
Ok((results, health_ok)) => {
@@ -110,28 +117,26 @@ pub async fn run(db: &Db, ca_cert_path: &Path, listen_port: u16) {
})
.collect();
let trust = classify_trust(&certs, api_port, health_ok);
all_peers[idx].certs = certs;
all_peers[idx].trust_level = Some(trust);
peers[idx].certs = certs;
peers[idx].trust_level = Some(trust);
}
Err(e) => warn!("Probe task failed: {}", e),
}
}
// Enrich with services
for (idx, handle) in service_handles {
match handle.await {
Ok(services) if !services.is_empty() => {
all_peers[idx].services = services;
peers[idx].services = services;
}
Ok(_) => {}
Err(e) => warn!("Service probe task failed: {}", e),
}
}
// Write all peers to database
let conn = db.conn();
let mut upserted = 0u64;
for peer in &all_peers {
for peer in peers.iter() {
if let Err(e) = hosts::upsert(conn, peer).await {
warn!("Failed to upsert host {}: {}", peer.hostname, e);
} else {
@@ -139,7 +144,7 @@ pub async fn run(db: &Db, ca_cert_path: &Path, listen_port: u16) {
}
}
info!("Peer discovery complete: {} hosts upserted", upserted);
info!("Probe complete: {} hosts upserted", upserted);
}
async fn fetch_leases_from_router(

View File

@@ -0,0 +1,136 @@
use cichlid_data::Db;
use cichlid_data::queries::hosts;
use cichlid_model::CichlidHost;
use serde::Deserialize;
use std::collections::HashSet;
use std::path::Path;
use std::time::Duration;
use tracing::{info, warn};
use super::peer_discovery;
#[derive(Deserialize)]
struct PeersResponse {
peers: Vec<CichlidHost>,
}
pub async fn run(db: &Db, ca_cert_path: &Path, listen_port: u16, seed_peers: &[String]) {
// Build source list: explicit seeds + full-trust peers from DB
let mut sources: Vec<String> = seed_peers.to_vec();
match hosts::list_full_trust(db.conn()).await {
Ok(trusted) => {
for (hostname, _ip) in trusted {
if !sources.iter().any(|s| s == &hostname) {
sources.push(hostname);
}
}
}
Err(e) => warn!("Failed to query full-trust peers: {}", e),
}
if sources.is_empty() {
return;
}
info!("Seed peer discovery: querying {} source(s)", sources.len());
// Build HTTPS client with CA cert
let ca_pem = match std::fs::read(ca_cert_path) {
Ok(pem) => pem,
Err(e) => {
warn!("Failed to read CA cert for seed peer fetch: {}", e);
return;
}
};
let ca_cert = match reqwest::Certificate::from_pem(&ca_pem) {
Ok(c) => c,
Err(e) => {
warn!("Failed to parse CA cert: {}", e);
return;
}
};
let client = match reqwest::Client::builder()
.add_root_certificate(ca_cert)
.timeout(Duration::from_secs(10))
.build()
{
Ok(c) => c,
Err(e) => {
warn!("Failed to build HTTP client: {}", e);
return;
}
};
// Fetch /peers from each source concurrently
let mut fetch_handles = Vec::new();
for source in &sources {
let url = format!("https://{}:{}/peers", source, listen_port);
let client = client.clone();
let source_name = source.clone();
fetch_handles.push(tokio::spawn(async move {
match client.get(&url).send().await {
Ok(resp) if resp.status().is_success() => {
match resp.json::<PeersResponse>().await {
Ok(body) => {
info!("Fetched {} peers from {}", body.peers.len(), source_name);
body.peers
}
Err(e) => {
warn!("Failed to parse peers from {}: {}", source_name, e);
vec![]
}
}
}
Ok(resp) => {
warn!("Peer fetch from {} returned {}", source_name, resp.status());
vec![]
}
Err(e) => {
warn!("Failed to reach {}: {}", source_name, e);
vec![]
}
}
}));
}
let mut all_peers: Vec<CichlidHost> = Vec::new();
for handle in fetch_handles {
match handle.await {
Ok(peers) => all_peers.extend(peers),
Err(e) => warn!("Seed fetch task panicked: {}", e),
}
}
if all_peers.is_empty() {
return;
}
// Deduplicate by (mac_address, gateway) and filter out self
let local_hostname = std::env::var("HOSTNAME").unwrap_or_default();
let mut seen = HashSet::new();
all_peers.retain(|peer| {
if peer.hostname == local_hostname {
return false;
}
let key = (
peer.mac_address.clone(),
peer.gateway.map(|g| g.to_string()),
);
seen.insert(key)
});
// Strip trust data — we'll re-establish it independently
for peer in &mut all_peers {
peer.certs.clear();
peer.trust_level = None;
peer.services.clear();
}
info!(
"Seed peer discovery: probing {} unique peers",
all_peers.len()
);
peer_discovery::probe_and_upsert(&mut all_peers, db, ca_cert_path, listen_port).await;
}

View File

@@ -125,6 +125,23 @@ pub async fn list(conn: &Connection) -> Result<Vec<CichlidHost>, libsql::Error>
Ok(hosts)
}
pub async fn list_full_trust(conn: &Connection) -> Result<Vec<(String, String)>, libsql::Error> {
let mut rows = conn
.query(
"SELECT hostname, ip_address FROM hosts WHERE trust_level = 'full'",
(),
)
.await?;
let mut peers = Vec::new();
while let Some(row) = rows.next().await? {
let hostname: String = row.get(0)?;
let ip: String = row.get(1)?;
peers.push((hostname, ip));
}
Ok(peers)
}
pub async fn delete_stale(conn: &Connection, older_than: &str) -> Result<u64, libsql::Error> {
conn.execute(
"DELETE FROM hosts WHERE updated_at < ?1",