feat(controller): complete reaper, health checks, row lifecycle
All checks were successful
build / fmt (push) Successful in 49s
build / clippy (push) Successful in 2m41s
build / check (push) Successful in 3m13s
build / test (push) Successful in 3m34s

Reaper now implements all planned rules:
- Orphaned Gitea registration: deregisters ephemeral gf-* runners
  from Gitea that have no matching active DB row
- Silent runner: terminates runners in 'running' state with no
  update for 10 minutes
- Reaped state: transitions completed/failed runners to 'reaped'
  after 5 minutes, deletes reaped rows after retention period
  (configurable, default 7 days)

Host health:
- check_host_health probes stale hosts via agent health endpoint
- Marks unresponsive hosts as drained (excluded from placement)
- Hosts that respond to the probe get their timestamp refreshed

Brew loop:
- max_per_labelset clamp (configurable, default 10) prevents
  unbounded runner spawning
- Passes BrewConfig through to reaper and health check functions

Config (BrewConfig):
- Added max_per_labelset, host_timeout_secs, runner_retention_days

Removed all #[allow(dead_code)] annotations — every field and
method is now consumed. Removed unused GiteaRunnerLabel struct
and GiteaRunner fields (busy, labels) that were only needed for
deserialization.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-05 11:36:25 +03:00
parent f0b6e32ad5
commit 48436684c9
5 changed files with 209 additions and 44 deletions

View File

@@ -77,7 +77,6 @@ impl AgentClient {
}
/// Health check an agent.
#[allow(dead_code)]
pub async fn health(&self, agent_endpoint: &str) -> anyhow::Result<HealthCheckResponse> {
let url = format!("{agent_endpoint}/v1/health");
let resp = self

View File

@@ -8,6 +8,7 @@ use uuid::Uuid;
use gongfoo_proto::{SpawnRequest, TerminateRequest};
use crate::agent_client::AgentClient;
use crate::config::BrewConfig;
use crate::gitea::GiteaClient;
use crate::placement;
@@ -22,15 +23,15 @@ pub async fn run(
pool: &PgPool,
gitea: &GiteaClient,
agent: &AgentClient,
interval: Duration,
config: &BrewConfig,
) -> anyhow::Result<()> {
let mut tick = time::interval(interval);
let mut tick = time::interval(Duration::from_secs(config.interval_secs));
let mut spawn_failures: HashMap<Uuid, Instant> = HashMap::new();
loop {
tick.tick().await;
if let Err(e) = brew_once(pool, gitea, agent, &mut spawn_failures).await {
if let Err(e) = brew_once(pool, gitea, agent, config, &mut spawn_failures).await {
tracing::error!("brew loop iteration failed: {e}");
}
}
@@ -41,6 +42,7 @@ async fn brew_once(
pool: &PgPool,
gitea: &GiteaClient,
agent: &AgentClient,
config: &BrewConfig,
spawn_failures: &mut HashMap<Uuid, Instant>,
) -> anyhow::Result<()> {
// Step 1: Poll Gitea for queued jobs, grouped by label set.
@@ -77,7 +79,7 @@ async fn brew_once(
// Step 4: For each label set, compute desired vs current, spawn if needed.
for (label_set, queued_count) in &snapshot.by_label_set {
let current = active_counts.get(label_set.as_str()).copied().unwrap_or(0);
let desired = (*queued_count as i64) + 1; // buffer of 1
let desired = ((*queued_count as i64) + 1).min(config.max_per_labelset);
let delta = desired - current;
if delta <= 0 {
@@ -151,15 +153,17 @@ async fn brew_once(
// Step 5: Confirm started→registered transitions via Gitea.
confirm_registrations(pool, gitea).await?;
// Step 6: Reaper rules.
reap_stale(pool, agent).await?;
// Step 6: Health-check agents and mark silent hosts.
check_host_health(pool, agent, config).await?;
// Step 7: Reaper rules.
reap_stale(pool, agent, gitea, config).await?;
Ok(())
}
/// Check whether the last N observations for this label set all show
/// queued_count exceeding current_runners. Returns false if fewer than
/// N observations exist (we haven't been watching long enough).
/// queued_count exceeding current_runners.
async fn should_scale_up(
pool: &PgPool,
label_set: &str,
@@ -179,23 +183,19 @@ async fn should_scale_up(
.fetch_all(pool)
.await?;
// Not enough observations yet — don't scale.
if (recent.len() as i64) < HYSTERESIS_WINDOW {
return Ok(false);
}
// All observations must show queued > current.
Ok(recent
.iter()
.all(|(count,)| (*count as i64) > current_runners))
}
/// Image row from the database.
#[allow(dead_code)]
struct ImageRow {
id: Uuid,
image_ref: String,
labels: Vec<String>,
cpu_request: i32,
mem_request_mb: i32,
}
@@ -203,9 +203,9 @@ struct ImageRow {
/// Find a runner image whose labels are a superset of the requested labels.
async fn find_image_for_labels(pool: &PgPool, labels: &[&str]) -> anyhow::Result<Option<ImageRow>> {
let label_vec: Vec<String> = labels.iter().map(|s| s.to_string()).collect();
let row = sqlx::query_as::<_, (Uuid, String, Vec<String>, i32, i32)>(
let row = sqlx::query_as::<_, (Uuid, String, i32, i32)>(
r#"
SELECT id, image_ref, labels, cpu_request, mem_request_mb
SELECT id, image_ref, cpu_request, mem_request_mb
FROM runner_images
WHERE labels @> $1
LIMIT 1
@@ -215,15 +215,14 @@ async fn find_image_for_labels(pool: &PgPool, labels: &[&str]) -> anyhow::Result
.fetch_optional(pool)
.await?;
Ok(row.map(
|(id, image_ref, labels, cpu_request, mem_request_mb)| ImageRow {
Ok(
row.map(|(id, image_ref, cpu_request, mem_request_mb)| ImageRow {
id,
image_ref,
labels,
cpu_request,
mem_request_mb,
},
))
}),
)
}
/// Spawn one runner: pick host, get registration token, insert row, call agent.
@@ -406,6 +405,55 @@ async fn confirm_registrations(pool: &PgPool, gitea: &GiteaClient) -> anyhow::Re
Ok(())
}
/// Health-check agents and mark hosts with stale heartbeats as drained.
async fn check_host_health(
pool: &PgPool,
agent: &AgentClient,
config: &BrewConfig,
) -> anyhow::Result<()> {
// Find hosts whose last heartbeat is older than the timeout.
let stale_hosts = sqlx::query_as::<_, (Uuid, String, String)>(
r#"
SELECT id, hostname, agent_endpoint
FROM hosts
WHERE NOT is_drained
AND updated_at < now() - make_interval(secs => $1::float8)
"#,
)
.bind(config.host_timeout_secs as f64)
.fetch_all(pool)
.await?;
for (host_id, hostname, endpoint) in &stale_hosts {
// Try an active health check before giving up.
match agent.health(endpoint).await {
Ok(resp) if resp.healthy => {
// Host is actually fine — update timestamp.
sqlx::query("UPDATE hosts SET updated_at = now() WHERE id = $1")
.bind(host_id)
.execute(pool)
.await?;
tracing::debug!(host = %hostname, "stale host responded to health check, updated");
}
_ => {
tracing::warn!(
host = %hostname,
host_id = %host_id,
timeout_secs = config.host_timeout_secs,
"host unresponsive, excluding from placement"
);
// Mark drained so placement skips it. Operator must undrain manually.
sqlx::query("UPDATE hosts SET is_drained = true, updated_at = now() WHERE id = $1")
.bind(host_id)
.execute(pool)
.await?;
}
}
}
Ok(())
}
/// Count active (non-terminal) runners per label set.
async fn active_runner_counts(
pool: &PgPool,
@@ -440,9 +488,14 @@ async fn record_observation(
Ok(())
}
/// Reaper: detect and handle stuck runners.
async fn reap_stale(pool: &PgPool, agent: &AgentClient) -> anyhow::Result<()> {
// Stuck spawning > 60s
/// Reaper: detect and handle stuck runners, orphaned registrations, and stale rows.
async fn reap_stale(
pool: &PgPool,
agent: &AgentClient,
gitea: &GiteaClient,
config: &BrewConfig,
) -> anyhow::Result<()> {
// --- Stuck spawning > 60s ---
let stuck_spawning = sqlx::query_as::<_, (Uuid, String, String)>(
r#"
SELECT r.id, r.container_id, h.agent_endpoint
@@ -472,7 +525,7 @@ async fn reap_stale(pool: &PgPool, agent: &AgentClient) -> anyhow::Result<()> {
mark_failed(pool, *runner_id, "reaped: stuck spawning > 60s").await?;
}
// Stuck started > 60s (container up but never registered in Gitea)
// --- Stuck started > 60s (container up but never registered in Gitea) ---
let stuck_started = sqlx::query_as::<_, (Uuid, String, String)>(
r#"
SELECT r.id, r.container_id, h.agent_endpoint
@@ -502,7 +555,7 @@ async fn reap_stale(pool: &PgPool, agent: &AgentClient) -> anyhow::Result<()> {
mark_failed(pool, *runner_id, "reaped: stuck started > 60s").await?;
}
// Stuck registered > 30min (registered but never picked up a job)
// --- Stuck registered > 30min (registered but never picked up a job) ---
let stuck_registered = sqlx::query_as::<_, (Uuid, String, String)>(
r#"
SELECT r.id, r.container_id, h.agent_endpoint
@@ -532,7 +585,111 @@ async fn reap_stale(pool: &PgPool, agent: &AgentClient) -> anyhow::Result<()> {
mark_failed(pool, *runner_id, "reaped: stuck registered > 30min").await?;
}
// Prune old queue observations (> 1 hour)
// --- Silent runner: running but no update for 10 minutes ---
let silent_runners = sqlx::query_as::<_, (Uuid, String, String)>(
r#"
SELECT r.id, r.container_id, h.agent_endpoint
FROM runners r
JOIN hosts h ON h.id = r.host_id
WHERE r.state = 'running'
AND r.updated_at < now() - interval '10 minutes'
AND r.container_id IS NOT NULL
"#,
)
.fetch_all(pool)
.await?;
for (runner_id, container_id, endpoint) in &silent_runners {
tracing::warn!(runner_id = %runner_id, "reaping silent runner (no update > 10min)");
let _ = agent
.terminate(
endpoint,
&TerminateRequest {
runner_id: *runner_id,
container_id: container_id.clone(),
force: true,
reason: "silent runner, no update > 10min".to_owned(),
},
)
.await;
mark_failed(pool, *runner_id, "reaped: silent runner > 10min").await?;
}
// --- Orphaned Gitea registrations ---
// Runners in Gitea that have no matching active row in our DB.
if let Ok(gitea_runners) = gitea.list_runners().await {
let our_active_gitea_ids: Vec<(i64,)> = sqlx::query_as(
r#"
SELECT gitea_runner_id FROM runners
WHERE gitea_runner_id IS NOT NULL
AND state IN ('started', 'registered', 'running')
"#,
)
.fetch_all(pool)
.await?;
let active_ids: std::collections::HashSet<i64> =
our_active_gitea_ids.iter().map(|(id,)| *id).collect();
for gr in &gitea_runners {
if gr.ephemeral && !active_ids.contains(&gr.id) {
// Check if it's one of ours by name prefix.
if gr.name.starts_with("gf-") {
tracing::warn!(
gitea_runner_id = gr.id,
name = %gr.name,
status = %gr.status,
"deregistering orphaned Gitea runner"
);
if let Err(e) = gitea.delete_runner(gr.id).await {
tracing::warn!(
gitea_runner_id = gr.id,
"failed to delete orphaned runner: {e}"
);
}
}
}
}
}
// --- Transition completed/failed → reaped ---
let reaped = sqlx::query(
r#"
UPDATE runners
SET state = 'reaped', updated_at = now()
WHERE state IN ('completed', 'failed')
AND completed_at < now() - interval '5 minutes'
"#,
)
.execute(pool)
.await?
.rows_affected();
if reaped > 0 {
tracing::info!(
count = reaped,
"transitioned completed/failed runners to reaped"
);
}
// --- Delete old reaped rows ---
let deleted = sqlx::query(
r#"
DELETE FROM runners
WHERE state = 'reaped'
AND updated_at < now() - make_interval(days => $1::int)
"#,
)
.bind(config.runner_retention_days as i32)
.execute(pool)
.await?
.rows_affected();
if deleted > 0 {
tracing::info!(count = deleted, "deleted old reaped runner rows");
}
// --- Prune old queue observations (> 1 hour) ---
let pruned =
sqlx::query("DELETE FROM queue_observations WHERE observed_at < now() - interval '1 hour'")
.execute(pool)

View File

@@ -71,17 +71,41 @@ pub struct TlsConfig {
pub ca: PathBuf,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Copy, Deserialize)]
pub struct BrewConfig {
/// Brew loop interval in seconds.
#[serde(default = "default_brew_interval")]
pub interval_secs: u64,
/// Maximum runners per label set.
#[serde(default = "default_max_per_labelset")]
pub max_per_labelset: i64,
/// Seconds before a silent host is considered unhealthy.
#[serde(default = "default_host_timeout")]
pub host_timeout_secs: u64,
/// Days to keep completed/failed runner rows before deleting.
#[serde(default = "default_runner_retention_days")]
pub runner_retention_days: u32,
}
fn default_brew_interval() -> u64 {
5
}
fn default_max_per_labelset() -> i64 {
10
}
fn default_host_timeout() -> u64 {
120
}
fn default_runner_retention_days() -> u32 {
7
}
impl Config {
pub fn load(path: &std::path::Path) -> anyhow::Result<Self> {
let text = std::fs::read_to_string(path)

View File

@@ -20,25 +20,12 @@ pub struct QueueSnapshot {
/// A runner registered in Gitea (from admin API).
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct GiteaRunner {
pub id: i64,
pub name: String,
pub status: String,
pub busy: bool,
#[serde(default)]
pub ephemeral: bool,
#[serde(default)]
pub labels: Vec<GiteaRunnerLabel>,
}
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct GiteaRunnerLabel {
pub id: i64,
pub name: String,
#[serde(rename = "type")]
pub label_type: Option<String>,
}
#[derive(Debug, Deserialize)]
@@ -306,7 +293,6 @@ impl GiteaClient {
}
/// Delete a runner from Gitea by its Gitea runner ID.
#[allow(dead_code)]
pub async fn delete_runner(&self, runner_id: i64) -> anyhow::Result<()> {
let status = self
.client

View File

@@ -9,7 +9,6 @@ mod tls;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use axum::Router;
use axum::routing::post;
@@ -91,15 +90,15 @@ async fn main() -> anyhow::Result<()> {
sd_notify_ready();
// Run the brew loop and event receiver concurrently.
let interval = Duration::from_secs(config.brew.interval_secs);
tracing::info!(
"starting brew loop ({}s interval)",
config.brew.interval_secs
);
let brew_pool = pool.clone();
let brew_config = config.brew;
let brew_handle = tokio::spawn(async move {
if let Err(e) = brew::run(&brew_pool, &gitea, &agent, interval).await {
if let Err(e) = brew::run(&brew_pool, &gitea, &agent, &brew_config).await {
tracing::error!("brew loop exited with error: {e}");
}
});