Files
moments/crates/moments-worker/src/main.rs
rob thijssen a71b4e6b84 feat(github): per-repo commit enumeration for full history backfill
Adds a new github-repo EventSource that enumerates all repos via
/user/repos and walks each repo's /commits?author= endpoint, which
has no 1000-result cap unlike the Search API. Events use the same
github-commit:{sha} ID scheme as github_search for dedup. Per-repo
poller state enables full backfill on first run, page-1-only on
subsequent polls. Weekly poll interval by default.

Closes #1

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 14:59:26 +03:00

245 lines
8.0 KiB
Rust

use std::{sync::Arc, time::Duration};
use clap::Parser;
use moments_core::{EventSource, run_poller};
use moments_data::{
PgStore,
bugzilla::{BugzillaConfig, BugzillaSource},
gitea::{GiteaConfig, GiteaSource},
github::{GithubConfig, GithubSource},
github_repo::{GithubRepoConfig, GithubRepoSource},
github_search::{GithubSearchConfig, GithubSearchSource},
hg::{HgConfig, HgSource},
};
use reqwest::Client;
use tracing::info;
// Command-line configuration for the ingestion worker.
//
// Every field is filled in by clap from a `--long-flag` or from the
// environment variable named in its `env = "..."` attribute; fields with a
// `default_value` fall back to it, and `Option` fields stay `None` when unset.
//
// NOTE: the `///` comments below are picked up by clap as help text, so the
// maintainer notes added here use plain `//` to leave `--help` unchanged.
#[derive(Parser, Debug)]
#[command(version, about = "moments ingestion worker")]
struct Args {
// ---- storage ----
// Connection string handed to `PgStore::connect` at startup. Required:
// there is no default value.
#[arg(long, env = "DATABASE_URL")]
database_url: String,
// ---- GitHub ----
// Account name shared by all three GitHub sources (events, search, per-repo).
#[arg(long, env = "GITHUB_USER", default_value = "grenade")]
github_user: String,
/// Optional GitHub token. Higher rate limit and access to private events.
#[arg(long, env = "GITHUB_TOKEN")]
github_token: Option<String>,
/// Seconds between Events-API polls (live feed, last 90 days).
#[arg(long, env = "POLL_INTERVAL_SECS", default_value = "600")]
interval_secs: u64,
/// Seconds between Search-API polls (historical issue/PR backfill).
/// Defaults to 24h — this is a backfill, not a live feed.
#[arg(long, env = "SEARCH_POLL_INTERVAL_SECS", default_value = "86400")]
search_interval_secs: u64,
/// Seconds between per-repo commit enumeration polls (full history backfill).
/// Defaults to weekly — expensive initial scan, cheap afterwards.
#[arg(long, env = "REPO_POLL_INTERVAL_SECS", default_value = "604800")]
repo_interval_secs: u64,
// ---- Gitea ----
// Host, user and optional token are copied verbatim into `GiteaConfig`.
#[arg(long, env = "GITEA_HOST", default_value = "git.lair.cafe")]
gitea_host: String,
#[arg(long, env = "GITEA_USER", default_value = "grenade")]
gitea_user: String,
#[arg(long, env = "GITEA_TOKEN")]
gitea_token: Option<String>,
/// Seconds between Gitea activity-feed polls.
#[arg(long, env = "GITEA_POLL_INTERVAL_SECS", default_value = "600")]
gitea_interval_secs: u64,
// ---- Mercurial (hg) ----
// Host plus the three comma-separated lists below are copied into `HgConfig`.
#[arg(long, env = "HG_HOST", default_value = "hg-edge.mozilla.org")]
hg_host: String,
/// Comma-separated repo groups to scan. Repos within each group are
/// discovered via `/{group}/?style=json`.
#[arg(
long,
env = "HG_GROUPS",
value_delimiter = ',',
default_value = "build,integration"
)]
hg_groups: Vec<String>,
/// Comma-separated individual repos to scan (e.g. `mozilla-central`).
#[arg(
long,
env = "HG_REPOS",
value_delimiter = ',',
default_value = "mozilla-central"
)]
hg_repos: Vec<String>,
/// Comma-separated author substrings for `author()` revset queries.
#[arg(
long,
env = "HG_AUTHOR_TERMS",
value_delimiter = ',',
default_value = "rthijssen,grenade"
)]
hg_author_terms: Vec<String>,
/// Seconds between hg pushlog scans (defaults to 24h — historical data).
#[arg(long, env = "HG_POLL_INTERVAL_SECS", default_value = "86400")]
hg_interval_secs: u64,
// ---- Bugzilla ----
// Host is copied into `BugzillaConfig::host`.
#[arg(long, env = "BUGZILLA_HOST", default_value = "bugzilla.mozilla.org")]
bugzilla_host: String,
// Becomes `BugzillaConfig::creator_email`.
#[arg(long, env = "BUGZILLA_EMAIL", default_value = "rthijssen@mozilla.com")]
bugzilla_email: String,
/// Optional bugzilla API key. Without one, only public bugs are returned.
#[arg(long, env = "BUGZILLA_API_KEY")]
bugzilla_api_key: Option<String>,
/// Seconds between bugzilla creator-query polls (defaults to 24h).
#[arg(long, env = "BUGZILLA_POLL_INTERVAL_SECS", default_value = "86400")]
bugzilla_interval_secs: u64,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
init_tracing();
let args = Args::parse();
let store = Arc::new(PgStore::connect(&args.database_url).await?);
store.migrate().await?;
let http = Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let github = Arc::new(GithubSource::new(
http.clone(),
store.clone(),
store.clone(),
GithubConfig {
user: args.github_user.clone(),
token: args.github_token.clone(),
per_page: 100,
},
)) as Arc<dyn EventSource>;
let github_search = Arc::new(GithubSearchSource::new(
http.clone(),
store.clone(),
store.clone(),
GithubSearchConfig {
user: args.github_user.clone(),
token: args.github_token.clone(),
..Default::default()
},
)) as Arc<dyn EventSource>;
let github_repo = Arc::new(GithubRepoSource::new(
http.clone(),
store.clone(),
store.clone(),
GithubRepoConfig {
user: args.github_user.clone(),
token: args.github_token.clone(),
..Default::default()
},
)) as Arc<dyn EventSource>;
let gitea = Arc::new(GiteaSource::new(
http.clone(),
store.clone(),
store.clone(),
GiteaConfig {
host: args.gitea_host.clone(),
user: args.gitea_user.clone(),
token: args.gitea_token.clone(),
..Default::default()
},
)) as Arc<dyn EventSource>;
let hg = Arc::new(HgSource::new(
http.clone(),
store.clone(),
store.clone(),
HgConfig {
host: args.hg_host.clone(),
author_terms: args.hg_author_terms.clone(),
groups: args.hg_groups.clone(),
repos: args.hg_repos.clone(),
},
)) as Arc<dyn EventSource>;
let bugzilla = Arc::new(BugzillaSource::new(
http.clone(),
store.clone(),
store.clone(),
BugzillaConfig {
host: args.bugzilla_host.clone(),
creator_email: args.bugzilla_email.clone(),
api_key: args.bugzilla_api_key.clone(),
..Default::default()
},
)) as Arc<dyn EventSource>;
info!(
github_user = args.github_user,
gitea_host = args.gitea_host,
gitea_user = args.gitea_user,
hg_host = args.hg_host,
hg_groups = ?args.hg_groups,
hg_repos = ?args.hg_repos,
hg_author_terms = ?args.hg_author_terms,
bugzilla_host = args.bugzilla_host,
bugzilla_email = args.bugzilla_email,
events_interval_secs = args.interval_secs,
search_interval_secs = args.search_interval_secs,
repo_interval_secs = args.repo_interval_secs,
gitea_interval_secs = args.gitea_interval_secs,
hg_interval_secs = args.hg_interval_secs,
bugzilla_interval_secs = args.bugzilla_interval_secs,
"worker started"
);
let interval = Duration::from_secs(args.interval_secs);
let search_interval = Duration::from_secs(args.search_interval_secs);
let repo_interval = Duration::from_secs(args.repo_interval_secs);
let gitea_interval = Duration::from_secs(args.gitea_interval_secs);
let hg_interval = Duration::from_secs(args.hg_interval_secs);
let bugzilla_interval = Duration::from_secs(args.bugzilla_interval_secs);
let github_task = tokio::spawn(async move { run_poller(github, interval).await });
let github_search_task =
tokio::spawn(async move { run_poller(github_search, search_interval).await });
let github_repo_task =
tokio::spawn(async move { run_poller(github_repo, repo_interval).await });
let gitea_task = tokio::spawn(async move { run_poller(gitea, gitea_interval).await });
let hg_task = tokio::spawn(async move { run_poller(hg, hg_interval).await });
let bugzilla_task =
tokio::spawn(async move { run_poller(bugzilla, bugzilla_interval).await });
tokio::signal::ctrl_c().await?;
info!("shutdown signal received");
github_task.abort();
github_search_task.abort();
github_repo_task.abort();
gitea_task.abort();
hg_task.abort();
bugzilla_task.abort();
Ok(())
}
/// Installs the process-wide `tracing` subscriber.
///
/// The level filter is taken from the default environment variable read by
/// `EnvFilter::try_from_default_env`; if that fails, the filter falls back to
/// `"info"`. Output is JSON when the `JOURNAL_STREAM` environment variable is
/// readable (presumably meaning stdout/stderr go to the systemd journal —
/// confirm against deployment), otherwise the default human-readable format.
fn init_tracing() {
    use tracing_subscriber::{EnvFilter, fmt};

    let env_filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info"),
    };
    let plain_output = std::env::var("JOURNAL_STREAM").is_err();

    // `.json()` changes the builder's type, so the two branches cannot be
    // merged into a single `init()` call.
    let builder = fmt().with_env_filter(env_filter);
    if plain_output {
        builder.init();
    } else {
        builder.json().init();
    }
}