Rewrites the hg worker to use json-log?rev=author() which matches the changeset author (not the pusher), capturing commits landed by sheriffs. Repos are discovered within configured groups plus individually listed repos. The worker skips entirely after the first successful backfill. Adds script/hg-ingest.sh for offline ingestion via local hg clones — clones one repo at a time, caches extracted changesets to .tsv, inserts via psql, and sets poller_state when done. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
223 lines
7.1 KiB
Rust
223 lines
7.1 KiB
Rust
use std::{sync::Arc, time::Duration};
|
|
|
|
use clap::Parser;
|
|
use moments_core::{EventSource, run_poller};
|
|
use moments_data::{
|
|
PgStore,
|
|
bugzilla::{BugzillaConfig, BugzillaSource},
|
|
gitea::{GiteaConfig, GiteaSource},
|
|
github::{GithubConfig, GithubSource},
|
|
github_search::{GithubSearchConfig, GithubSearchSource},
|
|
hg::{HgConfig, HgSource},
|
|
};
|
|
use reqwest::Client;
|
|
use tracing::info;
|
|
|
|
#[derive(Parser, Debug)]
|
|
#[command(version, about = "moments ingestion worker")]
|
|
struct Args {
|
|
#[arg(long, env = "DATABASE_URL")]
|
|
database_url: String,
|
|
|
|
#[arg(long, env = "GITHUB_USER", default_value = "grenade")]
|
|
github_user: String,
|
|
|
|
/// Optional GitHub token. Higher rate limit and access to private events.
|
|
#[arg(long, env = "GITHUB_TOKEN")]
|
|
github_token: Option<String>,
|
|
|
|
/// Seconds between Events-API polls (live feed, last 90 days).
|
|
#[arg(long, env = "POLL_INTERVAL_SECS", default_value = "600")]
|
|
interval_secs: u64,
|
|
|
|
/// Seconds between Search-API polls (historical issue/PR backfill).
|
|
/// Defaults to 24h — this is a backfill, not a live feed.
|
|
#[arg(long, env = "SEARCH_POLL_INTERVAL_SECS", default_value = "86400")]
|
|
search_interval_secs: u64,
|
|
|
|
#[arg(long, env = "GITEA_HOST", default_value = "git.lair.cafe")]
|
|
gitea_host: String,
|
|
|
|
#[arg(long, env = "GITEA_USER", default_value = "grenade")]
|
|
gitea_user: String,
|
|
|
|
#[arg(long, env = "GITEA_TOKEN")]
|
|
gitea_token: Option<String>,
|
|
|
|
/// Seconds between Gitea activity-feed polls.
|
|
#[arg(long, env = "GITEA_POLL_INTERVAL_SECS", default_value = "600")]
|
|
gitea_interval_secs: u64,
|
|
|
|
#[arg(long, env = "HG_HOST", default_value = "hg-edge.mozilla.org")]
|
|
hg_host: String,
|
|
|
|
/// Comma-separated repo groups to scan. Repos within each group are
|
|
/// discovered via `/{group}/?style=json`.
|
|
#[arg(
|
|
long,
|
|
env = "HG_GROUPS",
|
|
value_delimiter = ',',
|
|
default_value = "build,integration"
|
|
)]
|
|
hg_groups: Vec<String>,
|
|
|
|
/// Comma-separated individual repos to scan (e.g. `mozilla-central`).
|
|
#[arg(
|
|
long,
|
|
env = "HG_REPOS",
|
|
value_delimiter = ',',
|
|
default_value = "mozilla-central"
|
|
)]
|
|
hg_repos: Vec<String>,
|
|
|
|
/// Comma-separated author substrings for `author()` revset queries.
|
|
#[arg(
|
|
long,
|
|
env = "HG_AUTHOR_TERMS",
|
|
value_delimiter = ',',
|
|
default_value = "rthijssen,grenade"
|
|
)]
|
|
hg_author_terms: Vec<String>,
|
|
|
|
/// Seconds between hg pushlog scans (defaults to 24h — historical data).
|
|
#[arg(long, env = "HG_POLL_INTERVAL_SECS", default_value = "86400")]
|
|
hg_interval_secs: u64,
|
|
|
|
#[arg(long, env = "BUGZILLA_HOST", default_value = "bugzilla.mozilla.org")]
|
|
bugzilla_host: String,
|
|
|
|
#[arg(long, env = "BUGZILLA_EMAIL", default_value = "rthijssen@mozilla.com")]
|
|
bugzilla_email: String,
|
|
|
|
/// Optional bugzilla API key. Without one, only public bugs are returned.
|
|
#[arg(long, env = "BUGZILLA_API_KEY")]
|
|
bugzilla_api_key: Option<String>,
|
|
|
|
/// Seconds between bugzilla creator-query polls (defaults to 24h).
|
|
#[arg(long, env = "BUGZILLA_POLL_INTERVAL_SECS", default_value = "86400")]
|
|
bugzilla_interval_secs: u64,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
init_tracing();
|
|
let args = Args::parse();
|
|
|
|
let store = Arc::new(PgStore::connect(&args.database_url).await?);
|
|
store.migrate().await?;
|
|
|
|
let http = Client::builder()
|
|
.timeout(Duration::from_secs(30))
|
|
.build()?;
|
|
|
|
let github = Arc::new(GithubSource::new(
|
|
http.clone(),
|
|
store.clone(),
|
|
store.clone(),
|
|
GithubConfig {
|
|
user: args.github_user.clone(),
|
|
token: args.github_token.clone(),
|
|
per_page: 100,
|
|
},
|
|
)) as Arc<dyn EventSource>;
|
|
|
|
let github_search = Arc::new(GithubSearchSource::new(
|
|
http.clone(),
|
|
store.clone(),
|
|
store.clone(),
|
|
GithubSearchConfig {
|
|
user: args.github_user.clone(),
|
|
token: args.github_token.clone(),
|
|
..Default::default()
|
|
},
|
|
)) as Arc<dyn EventSource>;
|
|
|
|
let gitea = Arc::new(GiteaSource::new(
|
|
http.clone(),
|
|
store.clone(),
|
|
store.clone(),
|
|
GiteaConfig {
|
|
host: args.gitea_host.clone(),
|
|
user: args.gitea_user.clone(),
|
|
token: args.gitea_token.clone(),
|
|
..Default::default()
|
|
},
|
|
)) as Arc<dyn EventSource>;
|
|
|
|
let hg = Arc::new(HgSource::new(
|
|
http.clone(),
|
|
store.clone(),
|
|
store.clone(),
|
|
HgConfig {
|
|
host: args.hg_host.clone(),
|
|
author_terms: args.hg_author_terms.clone(),
|
|
groups: args.hg_groups.clone(),
|
|
repos: args.hg_repos.clone(),
|
|
},
|
|
)) as Arc<dyn EventSource>;
|
|
|
|
let bugzilla = Arc::new(BugzillaSource::new(
|
|
http.clone(),
|
|
store.clone(),
|
|
store.clone(),
|
|
BugzillaConfig {
|
|
host: args.bugzilla_host.clone(),
|
|
creator_email: args.bugzilla_email.clone(),
|
|
api_key: args.bugzilla_api_key.clone(),
|
|
..Default::default()
|
|
},
|
|
)) as Arc<dyn EventSource>;
|
|
|
|
info!(
|
|
github_user = args.github_user,
|
|
gitea_host = args.gitea_host,
|
|
gitea_user = args.gitea_user,
|
|
hg_host = args.hg_host,
|
|
hg_groups = ?args.hg_groups,
|
|
hg_repos = ?args.hg_repos,
|
|
hg_author_terms = ?args.hg_author_terms,
|
|
bugzilla_host = args.bugzilla_host,
|
|
bugzilla_email = args.bugzilla_email,
|
|
events_interval_secs = args.interval_secs,
|
|
search_interval_secs = args.search_interval_secs,
|
|
gitea_interval_secs = args.gitea_interval_secs,
|
|
hg_interval_secs = args.hg_interval_secs,
|
|
bugzilla_interval_secs = args.bugzilla_interval_secs,
|
|
"worker started"
|
|
);
|
|
|
|
let interval = Duration::from_secs(args.interval_secs);
|
|
let search_interval = Duration::from_secs(args.search_interval_secs);
|
|
let gitea_interval = Duration::from_secs(args.gitea_interval_secs);
|
|
let hg_interval = Duration::from_secs(args.hg_interval_secs);
|
|
let bugzilla_interval = Duration::from_secs(args.bugzilla_interval_secs);
|
|
|
|
let github_task = tokio::spawn(async move { run_poller(github, interval).await });
|
|
let github_search_task =
|
|
tokio::spawn(async move { run_poller(github_search, search_interval).await });
|
|
let gitea_task = tokio::spawn(async move { run_poller(gitea, gitea_interval).await });
|
|
let hg_task = tokio::spawn(async move { run_poller(hg, hg_interval).await });
|
|
let bugzilla_task =
|
|
tokio::spawn(async move { run_poller(bugzilla, bugzilla_interval).await });
|
|
|
|
tokio::signal::ctrl_c().await?;
|
|
info!("shutdown signal received");
|
|
github_task.abort();
|
|
github_search_task.abort();
|
|
gitea_task.abort();
|
|
hg_task.abort();
|
|
bugzilla_task.abort();
|
|
Ok(())
|
|
}
|
|
|
|
fn init_tracing() {
|
|
use tracing_subscriber::{EnvFilter, fmt};
|
|
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
|
let json = std::env::var("JOURNAL_STREAM").is_ok();
|
|
if json {
|
|
fmt().with_env_filter(filter).json().init();
|
|
} else {
|
|
fmt().with_env_filter(filter).init();
|
|
}
|
|
}
|