diff --git a/asset/config/worker.env.tmpl b/asset/config/worker.env.tmpl
index 78c2665..71a506c 100644
--- a/asset/config/worker.env.tmpl
+++ b/asset/config/worker.env.tmpl
@@ -7,6 +7,7 @@ GITHUB_USER=grenade
 GITHUB_TOKEN={{GITHUB_TOKEN}}
 POLL_INTERVAL_SECS=600
 SEARCH_POLL_INTERVAL_SECS=86400
+REPO_POLL_INTERVAL_SECS=604800
 
 GITEA_HOST=git.lair.cafe
 GITEA_USER=grenade
diff --git a/crates/moments-data/src/github_repo.rs b/crates/moments-data/src/github_repo.rs
new file mode 100644
index 0000000..42bbbab
--- /dev/null
+++ b/crates/moments-data/src/github_repo.rs
@@ -0,0 +1,421 @@
+//! Per-repo commit enumeration for full GitHub history.
+//!
+//! The Search API caps at 1000 results; this source enumerates all repos
+//! the user can access via `/user/repos` and walks each repo's commit
+//! history via `/repos/{owner}/{repo}/commits?author={user}`, bounded only
+//! by `MAX_BACKFILL_PAGES` pages per repo.
+//!
+//! Events use `github-commit:{sha}` as their ID, matching the scheme in
+//! `github_search`, so duplicates are resolved via idempotent upsert.
+//!
+//! Per-repo poller state keys (`github-repo:{owner}/{repo}`) track which
+//! repos have already been scanned. First run paginates the history;
+//! subsequent runs fetch only page 1.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
+use moments_entities::{Event, Source};
+use reqwest::{Client, Response, StatusCode, header};
+use serde_json::Value;
+use tracing::{debug, warn};
+
+const SOURCE_NAME: &str = "github-repo";
+const USER_AGENT: &str = concat!(
+    "moments/",
+    env!("CARGO_PKG_VERSION"),
+    " (+https://rob.tn)"
+);
+/// Most commit pages walked per repo during the first-run backfill.
+const MAX_BACKFILL_PAGES: u32 = 100;
+/// Most `/user/repos` pages walked per poll.
+const MAX_DISCOVERY_PAGES: u32 = 50;
+/// Largest page size GitHub list endpoints return.
+const GITHUB_MAX_PER_PAGE: u32 = 100;
+/// Consecutive failed repo scans after which a poll gives up.
+const MAX_CONSECUTIVE_FAILURES: u32 = 5;
+
+/// Settings for [`GithubRepoSource`].
+#[derive(Clone, Debug)]
+pub struct GithubRepoConfig {
+    /// GitHub login whose authored commits are collected.
+    pub user: String,
+    /// API token; without one, repo discovery is skipped entirely.
+    pub token: Option<String>,
+    /// Requested page size; clamped to `1..=100` before use.
+    pub per_page: u32,
+}
+
+impl Default for GithubRepoConfig {
+    fn default() -> Self {
+        Self {
+            user: "grenade".into(),
+            token: None,
+            per_page: 100,
+        }
+    }
+}
+
+/// Event source that enumerates the user's repos and ingests their commits.
+pub struct GithubRepoSource {
+    client: Client,
+    writer: Arc<dyn EventWriter>,
+    state: Arc<dyn PollerStateStore>,
+    config: GithubRepoConfig,
+}
+
+/// How a single repo scan ended.
+enum ScanOutcome {
+    /// Pagination finished; carries the number of events upserted.
+    Completed(usize),
+    /// GitHub rate limiting cut the scan short; carries the number of
+    /// events upserted before that happened.
+    RateLimited(usize),
+}
+
+impl GithubRepoSource {
+    pub fn new(
+        client: Client,
+        writer: Arc<dyn EventWriter>,
+        state: Arc<dyn PollerStateStore>,
+        config: GithubRepoConfig,
+    ) -> Self {
+        Self {
+            client,
+            writer,
+            state,
+            config,
+        }
+    }
+
+    /// Attach the GitHub API headers, plus a bearer token when configured.
+    fn apply_headers(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
+        let req = req
+            .header(header::ACCEPT, "application/vnd.github+json")
+            .header("X-GitHub-Api-Version", "2022-11-28")
+            .header(header::USER_AGENT, USER_AGENT);
+        match &self.config.token {
+            Some(token) => req.header(header::AUTHORIZATION, format!("Bearer {token}")),
+            None => req,
+        }
+    }
+
+    /// Page size actually requested. GitHub serves at most 100 items per
+    /// page, so a larger configured value would make every full page look
+    /// short and end pagination after page 1.
+    fn page_size(&self) -> u32 {
+        self.config.per_page.clamp(1, GITHUB_MAX_PER_PAGE)
+    }
+
+    /// Send a GET with the standard headers, mapping transport errors.
+    async fn get(&self, url: &str) -> Result<Response, SourceError> {
+        self.apply_headers(self.client.get(url))
+            .send()
+            .await
+            .map_err(|e| SourceError::Http(e.to_string()))
+    }
+
+    /// Discover all repos the authenticated user can access.
+    ///
+    /// Returns an empty list when no token is configured.
+    async fn discover_repos(&self) -> Result<Vec<Repo>, SourceError> {
+        if self.config.token.is_none() {
+            debug!("no github token configured; skipping repo discovery");
+            return Ok(Vec::new());
+        }
+        let per_page = self.page_size();
+        let mut repos = Vec::new();
+        for page in 1..=MAX_DISCOVERY_PAGES {
+            let url = format!(
+                "https://api.github.com/user/repos?affiliation=owner,collaborator,organization_member&visibility=all&per_page={per_page}&page={page}"
+            );
+            let resp = self.get(&url).await?;
+            let status = resp.status();
+            if !status.is_success() {
+                return Err(SourceError::Http(format!("{status} GET {url}")));
+            }
+            let items: Vec<Value> = resp
+                .json()
+                .await
+                .map_err(|e| SourceError::Parse(e.to_string()))?;
+            repos.extend(items.iter().filter_map(parse_repo));
+            // A short (or empty) page is the last one.
+            if items.len() < per_page as usize {
+                break;
+            }
+        }
+        Ok(repos)
+    }
+
+    /// Fetch commits for a single repo, paginating fully on first run.
+    ///
+    /// The per-repo state key is touched only when the scan completes, so a
+    /// rate-limited or failed backfill is retried from scratch next poll.
+    async fn scan_repo(&self, repo: &Repo) -> Result<ScanOutcome, SourceError> {
+        let state_key = format!("github-repo:{}", repo.full_name);
+        let first_run = self.state.load(&state_key).await?.is_none();
+        let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 };
+        let per_page = self.page_size();
+
+        let mut total = 0usize;
+        let mut reached_end = false;
+        for page in 1..=max_pages {
+            let url = format!(
+                "https://api.github.com/repos/{}/commits?author={}&per_page={}&page={}",
+                repo.full_name, self.config.user, per_page, page
+            );
+            let resp = self.get(&url).await?;
+            let status = resp.status();
+
+            if is_rate_limited(status, resp.headers()) {
+                warn!(repo = %repo.full_name, status = %status, "rate limited; stopping early");
+                return Ok(ScanOutcome::RateLimited(total));
+            }
+            // 409 = empty repo (no commits at all), not an error
+            if status == StatusCode::CONFLICT {
+                reached_end = true;
+                break;
+            }
+            if status == StatusCode::NOT_FOUND {
+                warn!(repo = %repo.full_name, "repo not found; skipping");
+                reached_end = true;
+                break;
+            }
+            if !status.is_success() {
+                return Err(SourceError::Http(format!("{status} GET {url}")));
+            }
+
+            let items: Vec<Value> = resp
+                .json()
+                .await
+                .map_err(|e| SourceError::Parse(e.to_string()))?;
+            if items.is_empty() {
+                reached_end = true;
+                break;
+            }
+
+            let events: Vec<Event> = items
+                .iter()
+                .filter_map(|item| parse_commit(item, repo))
+                .collect();
+            total += self.writer.upsert_events(&events).await?;
+
+            if items.len() < per_page as usize {
+                reached_end = true;
+                break;
+            }
+        }
+
+        if first_run && !reached_end {
+            warn!(
+                repo = %repo.full_name,
+                pages = MAX_BACKFILL_PAGES,
+                "backfill page cap reached; older commits were not ingested"
+            );
+        }
+
+        self.state.touch(&state_key).await?;
+        Ok(ScanOutcome::Completed(total))
+    }
+}
+
+#[async_trait]
+impl EventSource for GithubRepoSource {
+    fn name(&self) -> &'static str {
+        SOURCE_NAME
+    }
+
+    async fn poll(&self) -> Result<usize, SourceError> {
+        let repos = self.discover_repos().await?;
+        debug!(repos = repos.len(), "discovered github repos");
+
+        let mut total = 0usize;
+        let mut consecutive_failures = 0u32;
+        for repo in &repos {
+            match self.scan_repo(repo).await {
+                Ok(ScanOutcome::Completed(n)) => {
+                    if n > 0 {
+                        debug!(repo = %repo.full_name, ingested = n, "repo commit scan complete");
+                    }
+                    total += n;
+                    consecutive_failures = 0;
+                }
+                Ok(ScanOutcome::RateLimited(n)) => {
+                    total += n;
+                    warn!("rate limited during repo scan; ending poll early");
+                    break;
+                }
+                Err(e) => {
+                    warn!(repo = %repo.full_name, error = %e, "repo scan failed; continuing");
+                    consecutive_failures += 1;
+                    // Back off when failures pile up (outage, or a rate limit
+                    // that carried no recognisable headers).
+                    if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
+                        warn!("too many consecutive repo scan failures; ending poll early");
+                        break;
+                    }
+                }
+            }
+        }
+
+        self.state.touch(SOURCE_NAME).await?;
+        debug!(ingested = total, repos = repos.len(), "github-repo poll complete");
+        Ok(total)
+    }
+}
+
+/// Whether a response is a GitHub rate-limit rejection.
+///
+/// 429 always is. 403 only counts when it carries `retry-after` or
+/// `x-ratelimit-remaining: 0`; any other 403 is treated as a per-repo
+/// failure so one forbidden repo cannot end the whole poll.
+fn is_rate_limited(status: StatusCode, headers: &header::HeaderMap) -> bool {
+    if status == StatusCode::TOO_MANY_REQUESTS {
+        return true;
+    }
+    if status != StatusCode::FORBIDDEN {
+        return false;
+    }
+    let remaining = headers
+        .get("x-ratelimit-remaining")
+        .and_then(|v| v.to_str().ok())
+        .map(str::trim);
+    headers.contains_key(header::RETRY_AFTER) || remaining == Some("0")
+}
+
+/// Minimal repo identity needed to scan commits.
+#[derive(Debug, Clone)]
+struct Repo {
+    full_name: String,
+    private: bool,
+}
+
+/// Extract a [`Repo`] from a `/user/repos` item; `None` without `full_name`.
+fn parse_repo(item: &Value) -> Option<Repo> {
+    let full_name = item.get("full_name").and_then(Value::as_str)?;
+    let private = item.get("private").and_then(Value::as_bool).unwrap_or(false);
+    Some(Repo {
+        full_name: full_name.to_string(),
+        private,
+    })
+}
+
+/// Convert a commit list item into an [`Event`].
+///
+/// Uses the author date, falling back to the committer date. Returns `None`
+/// when the SHA or a parseable RFC 3339 date is missing.
+fn parse_commit(item: &Value, repo: &Repo) -> Option<Event> {
+    let sha = item.get("sha").and_then(Value::as_str)?;
+    let date_str = item
+        .pointer("/commit/author/date")
+        .and_then(Value::as_str)
+        .or_else(|| {
+            item.pointer("/commit/committer/date")
+                .and_then(Value::as_str)
+        })?;
+    let occurred_at = DateTime::parse_from_rfc3339(date_str)
+        .ok()?
+        .with_timezone(&Utc);
+
+    Some(Event {
+        id: format!("github-commit:{sha}"),
+        source: Source::Github,
+        action: "Commit".into(),
+        occurred_at,
+        public: !repo.private,
+        payload: item.clone(),
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json::json;
+
+    #[test]
+    fn parse_commit_uses_sha_as_id() {
+        let repo = Repo {
+            full_name: "grenade/moments".into(),
+            private: false,
+        };
+        let raw = json!({
+            "sha": "abc123",
+            "commit": {
+                "author": { "date": "2024-01-15T10:30:00Z" },
+                "message": "fix something"
+            }
+        });
+        let ev = parse_commit(&raw, &repo).expect("parses");
+        assert_eq!(ev.id, "github-commit:abc123");
+        assert_eq!(ev.action, "Commit");
+        assert!(ev.public);
+    }
+
+    #[test]
+    fn parse_commit_private_repo() {
+        let repo = Repo {
+            full_name: "grenade/secret".into(),
+            private: true,
+        };
+        let raw = json!({
+            "sha": "def456",
+            "commit": {
+                "author": { "date": "2024-01-15T10:30:00Z" },
+                "message": "secret change"
+            }
+        });
+        let ev = parse_commit(&raw, &repo).expect("parses");
+        assert!(!ev.public);
+    }
+
+    #[test]
+    fn parse_commit_falls_back_to_committer_date() {
+        let repo = Repo {
+            full_name: "grenade/moments".into(),
+            private: false,
+        };
+        let raw = json!({
+            "sha": "ghi789",
+            "commit": {
+                "committer": { "date": "2024-02-01T12:00:00Z" },
+                "message": "no author date"
+            }
+        });
+        let ev = parse_commit(&raw, &repo).expect("parses");
+        assert_eq!(ev.id, "github-commit:ghi789");
+    }
+
+    #[test]
+    fn parse_commit_without_date_is_skipped() {
+        let repo = Repo {
+            full_name: "grenade/moments".into(),
+            private: false,
+        };
+        let raw = json!({ "sha": "jkl012", "commit": { "message": "no dates" } });
+        assert!(parse_commit(&raw, &repo).is_none());
+    }
+
+    #[test]
+    fn parse_repo_extracts_fields() {
+        let raw = json!({
+            "full_name": "grenade/moments",
+            "private": false
+        });
+        let repo = parse_repo(&raw).expect("parses");
+        assert_eq!(repo.full_name, "grenade/moments");
+        assert!(!repo.private);
+    }
+
+    #[test]
+    fn forbidden_is_rate_limit_only_with_limit_headers() {
+        let mut headers = header::HeaderMap::new();
+        assert!(!is_rate_limited(StatusCode::FORBIDDEN, &headers));
+        assert!(is_rate_limited(StatusCode::TOO_MANY_REQUESTS, &headers));
+
+        headers.insert("x-ratelimit-remaining", header::HeaderValue::from_static("0"));
+        assert!(is_rate_limited(StatusCode::FORBIDDEN, &headers));
+        assert!(!is_rate_limited(StatusCode::OK, &headers));
+    }
+}
diff --git a/crates/moments-data/src/lib.rs
b/crates/moments-data/src/lib.rs index be9b723..8ca0671 100644 --- a/crates/moments-data/src/lib.rs +++ b/crates/moments-data/src/lib.rs @@ -1,6 +1,7 @@ pub mod bugzilla; pub mod gitea; pub mod github; +pub mod github_repo; pub mod github_search; pub mod hg; diff --git a/crates/moments-worker/src/main.rs b/crates/moments-worker/src/main.rs index 07e0185..4cc5c05 100644 --- a/crates/moments-worker/src/main.rs +++ b/crates/moments-worker/src/main.rs @@ -7,6 +7,7 @@ use moments_data::{ bugzilla::{BugzillaConfig, BugzillaSource}, gitea::{GiteaConfig, GiteaSource}, github::{GithubConfig, GithubSource}, + github_repo::{GithubRepoConfig, GithubRepoSource}, github_search::{GithubSearchConfig, GithubSearchSource}, hg::{HgConfig, HgSource}, }; @@ -35,6 +36,11 @@ struct Args { #[arg(long, env = "SEARCH_POLL_INTERVAL_SECS", default_value = "86400")] search_interval_secs: u64, + /// Seconds between per-repo commit enumeration polls (full history backfill). + /// Defaults to weekly — expensive initial scan, cheap afterwards. 
+ #[arg(long, env = "REPO_POLL_INTERVAL_SECS", default_value = "604800")] + repo_interval_secs: u64, + #[arg(long, env = "GITEA_HOST", default_value = "git.lair.cafe")] gitea_host: String, @@ -132,6 +138,17 @@ async fn main() -> anyhow::Result<()> { }, )) as Arc; + let github_repo = Arc::new(GithubRepoSource::new( + http.clone(), + store.clone(), + store.clone(), + GithubRepoConfig { + user: args.github_user.clone(), + token: args.github_token.clone(), + ..Default::default() + }, + )) as Arc; + let gitea = Arc::new(GiteaSource::new( http.clone(), store.clone(), @@ -180,6 +197,7 @@ async fn main() -> anyhow::Result<()> { bugzilla_email = args.bugzilla_email, events_interval_secs = args.interval_secs, search_interval_secs = args.search_interval_secs, + repo_interval_secs = args.repo_interval_secs, gitea_interval_secs = args.gitea_interval_secs, hg_interval_secs = args.hg_interval_secs, bugzilla_interval_secs = args.bugzilla_interval_secs, @@ -188,6 +206,7 @@ async fn main() -> anyhow::Result<()> { let interval = Duration::from_secs(args.interval_secs); let search_interval = Duration::from_secs(args.search_interval_secs); + let repo_interval = Duration::from_secs(args.repo_interval_secs); let gitea_interval = Duration::from_secs(args.gitea_interval_secs); let hg_interval = Duration::from_secs(args.hg_interval_secs); let bugzilla_interval = Duration::from_secs(args.bugzilla_interval_secs); @@ -195,6 +214,8 @@ async fn main() -> anyhow::Result<()> { let github_task = tokio::spawn(async move { run_poller(github, interval).await }); let github_search_task = tokio::spawn(async move { run_poller(github_search, search_interval).await }); + let github_repo_task = + tokio::spawn(async move { run_poller(github_repo, repo_interval).await }); let gitea_task = tokio::spawn(async move { run_poller(gitea, gitea_interval).await }); let hg_task = tokio::spawn(async move { run_poller(hg, hg_interval).await }); let bugzilla_task = @@ -204,6 +225,7 @@ async fn main() -> 
anyhow::Result<()> { info!("shutdown signal received"); github_task.abort(); github_search_task.abort(); + github_repo_task.abort(); gitea_task.abort(); hg_task.abort(); bugzilla_task.abort(); diff --git a/script/hg-ingest.sh b/script/hg-ingest.sh index 55e68f5..7ad3982 100755 --- a/script/hg-ingest.sh +++ b/script/hg-ingest.sh @@ -19,6 +19,7 @@ WORK_DIR="${HG_WORK_DIR:-$HOME/hg}" # Repos to clone (groups are expanded inline) REPOS=( + mozilla-central integration/mozilla-inbound integration/autoland integration/fx-team