diff --git a/asset/config/worker.env.tmpl b/asset/config/worker.env.tmpl index 0e0d5d9..78c2665 100644 --- a/asset/config/worker.env.tmpl +++ b/asset/config/worker.env.tmpl @@ -14,8 +14,9 @@ GITEA_TOKEN={{GITEA_TOKEN}} GITEA_POLL_INTERVAL_SECS=600 HG_HOST=hg-edge.mozilla.org -HG_REPOS=build/puppet,build/tools,build/buildbot-configs -HG_AUTHOR_TERMS=thijssen,grenade +HG_GROUPS=build,integration +HG_REPOS=mozilla-central +HG_AUTHOR_TERMS=rthijssen,grenade HG_POLL_INTERVAL_SECS=86400 BUGZILLA_HOST=bugzilla.mozilla.org diff --git a/crates/moments-data/src/hg.rs b/crates/moments-data/src/hg.rs index bc3c952..a71d9dc 100644 --- a/crates/moments-data/src/hg.rs +++ b/crates/moments-data/src/hg.rs @@ -1,14 +1,14 @@ -//! hg-edge.mozilla.org pushlog ingestion. +//! hg-edge.mozilla.org changeset ingestion via `json-log` revset queries. //! -//! mozilla's hg pushlog filter `user=` matches the *pusher*, not the -//! changeset author. As a community-level contributor whose code was -//! reviewed and pushed by sheriffs/reviewers, the user filter returns 0 -//! results. So this source pulls full pushlogs from a configured set of -//! repos and filters changeset authors client-side by substring match. +//! Uses the `json-log?rev=author(term)` endpoint which returns changesets +//! by the *author* (not the pusher), so it captures commits landed by +//! sheriffs on behalf of the contributor. //! -//! The result set is historical (mozilla retired hg) — no new events -//! expected after the initial backfill. Daily polling keeps the upserts -//! cheap and idempotent. +//! Repos are discovered within configured groups (e.g. `build`) via the +//! `/{group}/?style=json` index, plus any individually listed repos +//! (e.g. `mozilla-central`). Once the first successful scan completes +//! (poller state is touched), all subsequent polls are skipped — the +//! data is historical and will not change. use std::sync::Arc; @@ -26,13 +26,17 @@ const USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), " (+https://rob.tn)" ); +/// Maximum changesets returned per json-log request. +const REV_COUNT: u32 = 500; #[derive(Clone, Debug)] pub struct HgConfig { pub host: String, - /// Case-insensitive substrings matched against changeset author fields. + /// Substrings matched via `author(term)` revset queries. pub author_terms: Vec, - /// Repo paths under host, e.g. "build/puppet". + /// Repo groups to scan — each is enumerated via `/{group}/?style=json`. + pub groups: Vec, + /// Individual repos to scan (e.g. `mozilla-central`). pub repos: Vec, } @@ -40,12 +44,9 @@ impl Default for HgConfig { fn default() -> Self { Self { host: "hg-edge.mozilla.org".into(), - author_terms: vec!["thijssen".into(), "grenade".into()], - repos: vec![ - "build/puppet".into(), - "build/tools".into(), - "build/buildbot-configs".into(), - ], + author_terms: vec!["rthijssen".into(), "grenade".into()], + groups: vec!["build".into(), "integration".into()], + repos: vec!["mozilla-central".into()], } } } @@ -72,23 +73,9 @@ impl HgSource { } } - fn pushlog_url(&self, repo: &str) -> String { - format!( - "https://{}/{}/json-pushes?version=2&full=1", - self.config.host, repo - ) - } - - fn matches_author(&self, author: &str) -> bool { - let lower = author.to_lowercase(); - self.config - .author_terms - .iter() - .any(|t| lower.contains(&t.to_lowercase())) - } - - async fn scan_repo(&self, repo: &str) -> Result { - let url = self.pushlog_url(repo); + /// Discover repos in a group via `/{group}/?style=json`. + async fn discover_repos(&self, group: &str) -> Result, SourceError> { + let url = format!("https://{}/{}/?style=json", self.config.host, group); let resp = self .client .get(&url) @@ -97,69 +84,87 @@ impl HgSource { .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { - return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + warn!(group, status = %resp.status(), "failed to discover repos in group"); + return Ok(vec![]); } let body: Value = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; + Ok(body + .get("entries") + .and_then(Value::as_array) + .map(|entries| { + entries + .iter() + .filter_map(|e| { + e.get("name") + .and_then(Value::as_str) + .map(|name| format!("{group}/{name}")) + }) + .collect() + }) + .unwrap_or_default()) + } - let mut events = Vec::new(); - if let Some(pushes) = body.get("pushes").and_then(Value::as_object) { - for (pushid, push) in pushes { - let pushed_at_secs = push.get("date").and_then(Value::as_i64).unwrap_or(0); - let pushed_at = Utc - .timestamp_opt(pushed_at_secs, 0) - .single() - .unwrap_or_else(Utc::now); - let pusher = push - .get("user") - .and_then(Value::as_str) - .unwrap_or("") - .to_string(); - if let Some(changesets) = push.get("changesets").and_then(Value::as_array) { - for cs in changesets { - let author = cs.get("author").and_then(Value::as_str).unwrap_or(""); - if !self.matches_author(author) { - continue; - } - let node = cs.get("node").and_then(Value::as_str).unwrap_or(""); - if node.is_empty() { - continue; - } - let occurred_at = cs - .get("date") - .and_then(Value::as_array) - .and_then(|a| parse_hg_date(a)) - .unwrap_or(pushed_at); + fn log_url(&self, repo: &str, author_term: &str) -> String { + format!( + "https://{}/{}/json-log?rev=author({})&style=json&revcount={}", + self.config.host, repo, author_term, REV_COUNT + ) + } - let mut payload = cs.clone(); - if let Some(obj) = payload.as_object_mut() { - obj.insert("_repo".into(), Value::String(repo.into())); - obj.insert( - "_host".into(), - Value::String(self.config.host.clone()), - ); - obj.insert("_pusher".into(), Value::String(pusher.clone())); - obj.insert( - "_pushid".into(), - Value::String(pushid.clone()), - ); - } - events.push(Event { - id: format!("hg:{repo}:{node}"), - source: Source::Hg, - action: "Commit".into(), - occurred_at, - // mozilla hg-edge is exclusively public. - public: true, - payload, - }); + async fn scan_repo(&self, repo: &str) -> Result { + let mut all_events = Vec::new(); + for term in &self.config.author_terms { + let url = self.log_url(repo, term); + let resp = self + .client + .get(&url) + .header(header::USER_AGENT, USER_AGENT) + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + } + let body: Value = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + + if let Some(entries) = body.get("entries").and_then(Value::as_array) { + for entry in entries { + let node = entry.get("node").and_then(Value::as_str).unwrap_or(""); + if node.is_empty() { + continue; } + let occurred_at = entry + .get("date") + .and_then(Value::as_array) + .and_then(|a| parse_hg_date(a)) + .unwrap_or_else(Utc::now); + + let mut payload = entry.clone(); + if let Some(obj) = payload.as_object_mut() { + obj.insert("_repo".into(), Value::String(repo.into())); + obj.insert( + "_host".into(), + Value::String(self.config.host.clone()), + ); + } + all_events.push(Event { + id: format!("hg:{repo}:{node}"), + source: Source::Hg, + action: "Commit".into(), + occurred_at, + public: true, + payload, + }); } } } - Ok(self.writer.upsert_events(&events).await?) + Ok(self.writer.upsert_events(&all_events).await?) } } @@ -170,11 +175,26 @@ impl EventSource for HgSource { } async fn poll(&self) -> Result { + // hg repos are archived — one complete scan is sufficient. + if self.state.load(SOURCE_NAME).await?.is_some() { + debug!("hg already backfilled, skipping"); + return Ok(0); + } + + let mut repos: Vec = self.config.repos.clone(); + for group in &self.config.groups { + let discovered = self.discover_repos(group).await?; + debug!(group, repos = discovered.len(), "discovered hg repos"); + repos.extend(discovered); + } + let mut total = 0usize; - for repo in &self.config.repos { + for repo in &repos { match self.scan_repo(repo).await { Ok(n) => { - debug!(repo, ingested = n, "hg repo scan complete"); + if n > 0 { + debug!(repo, ingested = n, "hg repo scan complete"); + } total += n; } Err(e) => { @@ -183,11 +203,12 @@ impl EventSource for HgSource { } } self.state.touch(SOURCE_NAME).await?; + debug!(ingested = total, "hg backfill complete"); Ok(total) } } -/// Parse a pushlog date array `[seconds, tz_offset_secs]` into UTC. +/// Parse a hgweb date array `[seconds, tz_offset_secs]` into UTC. fn parse_hg_date(arr: &[Value]) -> Option> { let secs = arr.first()?.as_f64()? as i64; Utc.timestamp_opt(secs, 0).single() @@ -197,23 +218,6 @@ fn parse_hg_date(arr: &[Value]) -> Option> { mod tests { use super::*; - #[test] - fn matches_author_substring_case_insensitive() { - let s = HgSource { - client: Client::new(), - writer: Arc::new(NoopWriter), - state: Arc::new(NoopState), - config: HgConfig { - author_terms: vec!["thijssen".into(), "grenade".into()], - ..Default::default() - }, - }; - assert!(s.matches_author("Rob Thijssen ")); - assert!(s.matches_author("grenade@example")); - assert!(s.matches_author("THIJSSEN")); - assert!(!s.matches_author("Other Person ")); - } - #[test] fn parse_hg_date_handles_seconds() { let arr = vec![Value::from(1_700_000_000_f64), Value::from(0_f64)]; @@ -221,6 +225,19 @@ mod tests { assert_eq!(dt.timestamp(), 1_700_000_000); } + #[test] + fn log_url_uses_revset_author_query() { + let src = HgSource { + client: Client::new(), + writer: Arc::new(NoopWriter), + state: Arc::new(NoopState), + config: HgConfig::default(), + }; + let url = src.log_url("mozilla-central", "thijssen"); + assert!(url.contains("json-log?rev=author(thijssen)")); + assert!(url.contains("revcount=500")); + } + // Tiny stub impls just so we can construct an HgSource for unit tests. struct NoopWriter; #[async_trait] diff --git a/crates/moments-worker/src/main.rs b/crates/moments-worker/src/main.rs index 2b4c740..07e0185 100644 --- a/crates/moments-worker/src/main.rs +++ b/crates/moments-worker/src/main.rs @@ -51,21 +51,31 @@ struct Args { #[arg(long, env = "HG_HOST", default_value = "hg-edge.mozilla.org")] hg_host: String, - /// Comma-separated mozilla hg repo paths to scan, e.g. "build/puppet,build/tools". + /// Comma-separated repo groups to scan. Repos within each group are + /// discovered via `/{group}/?style=json`. + #[arg( + long, + env = "HG_GROUPS", + value_delimiter = ',', + default_value = "build,integration" + )] + hg_groups: Vec, + + /// Comma-separated individual repos to scan (e.g. `mozilla-central`). #[arg( long, env = "HG_REPOS", value_delimiter = ',', - default_value = "build/puppet,build/tools,build/buildbot-configs" + default_value = "mozilla-central" )] hg_repos: Vec, - /// Comma-separated case-insensitive substrings matched against changeset author fields. + /// Comma-separated author substrings for `author()` revset queries. #[arg( long, env = "HG_AUTHOR_TERMS", value_delimiter = ',', - default_value = "thijssen,grenade" + default_value = "rthijssen,grenade" )] hg_author_terms: Vec, @@ -141,6 +151,7 @@ async fn main() -> anyhow::Result<()> { HgConfig { host: args.hg_host.clone(), author_terms: args.hg_author_terms.clone(), + groups: args.hg_groups.clone(), repos: args.hg_repos.clone(), }, )) as Arc; @@ -162,7 +173,9 @@ async fn main() -> anyhow::Result<()> { gitea_host = args.gitea_host, gitea_user = args.gitea_user, hg_host = args.hg_host, + hg_groups = ?args.hg_groups, hg_repos = ?args.hg_repos, + hg_author_terms = ?args.hg_author_terms, bugzilla_host = args.bugzilla_host, bugzilla_email = args.bugzilla_email, events_interval_secs = args.interval_secs, diff --git a/script/hg-ingest.sh b/script/hg-ingest.sh new file mode 100755 index 0000000..d5f1aa4 --- /dev/null +++ b/script/hg-ingest.sh @@ -0,0 +1,141 @@ +#!/usr/bin/env bash +# +# One-shot hg changeset ingestion via local clones. +# +# Bare-clones each hg repo, extracts changesets matching author terms, +# and inserts them into the moments database. Sets poller_state so the +# worker won't re-scan. +# +# Requirements: hg (mercurial), psql, jq +# +# Usage: +# DATABASE_URL="postgres://..." ./script/hg-ingest.sh +# +set -euo pipefail + +DATABASE_URL="${DATABASE_URL:-postgres://moments_rw@magrathea.kosherinata.internal/moments:5432?sslmode=verify-full&sslrootcert=/etc/pki/ca-trust/source/anchors/root-internal.pem&sslcert=/etc/pki/tls/misc/$(hostname -f).pem&sslkey=/etc/pki/tls/private/$(hostname -f).pem}" +HG_HOST="${HG_HOST:-hg-edge.mozilla.org}" +WORK_DIR="${HG_WORK_DIR:-~/hg}" + +# Repos to clone (groups are expanded inline) +REPOS=( + mozilla-central + integration/mozilla-inbound + integration/autoland + integration/fx-team + integration/b2g-inbound + build/puppet + build/tools + build/buildbot + build/buildbot-configs + build/slave_health + build/mozharness + build/braindump + build/cloud-tools + build/compare-locales + build/nagios-core + build/partner-repacks + build/preproduction + build/rpm-sources + build/talos + build/tupperware + build/ash-mozharness + build/autoland + build/opsi-package-sources +) + +# Author terms — matched case-insensitively against changeset author fields +AUTHOR_TERMS=("rthijssen" "grenade") + +: "${DATABASE_URL:?DATABASE_URL must be set}" + +mkdir -p "$WORK_DIR" + +total=0 + +CLONE_DIR="$WORK_DIR/clone" +CACHE_DIR="$WORK_DIR/cache" +mkdir -p "$CACHE_DIR" + +for repo in "${REPOS[@]}"; do + cache_file="$CACHE_DIR/$(echo "$repo" | tr '/' '_').tsv" + + # Skip repos already cached (re-run safe) + if [ -f "$cache_file" ]; then + echo "[hg-ingest] $repo: using cached results" + else + # Remove any previous clone to keep only one on disk + rm -rf "$CLONE_DIR" + + echo "[hg-ingest] cloning $repo" + if ! hg clone --noupdate "https://$HG_HOST/$repo" "$CLONE_DIR" 2>/dev/null; then + echo "[hg-ingest] clone failed: $repo (skipping)" + continue + fi + + # Build revset: author(term1) or author(term2) ... + revset="" + for term in "${AUTHOR_TERMS[@]}"; do + if [ -z "$revset" ]; then + revset="author('$term')" + else + revset="$revset or author('$term')" + fi + done + + # Extract matching changesets to cache file + hg log -R "$CLONE_DIR" -r "$revset" \ + --template '{node}\t{author}\t{date|hgdate}\t{desc|firstline}\n' \ + > "$cache_file" 2>/dev/null || true + + # Free disk immediately + rm -rf "$CLONE_DIR" + fi + + # Ingest cached results into the database + count=0 + while IFS=$'\t' read -r node author date_raw desc; do + [ -z "$node" ] && continue + + # {date|hgdate} outputs "timestamp offset" — take just the timestamp + date_ts="${date_raw%% *}" + + # Build ISO timestamp from unix epoch + occurred_at=$(date -u -d "@${date_ts}" '+%Y-%m-%dT%H:%M:%SZ' 2>/dev/null || \ + date -u -r "${date_ts}" '+%Y-%m-%dT%H:%M:%SZ' 2>/dev/null) + + event_id="hg:${repo}:${node}" + + # Build payload JSON (jq handles all escaping) + payload=$(jq -n \ + --arg node "$node" \ + --arg user "$author" \ + --arg desc "$desc" \ + --arg repo "$repo" \ + --arg host "$HG_HOST" \ + '{node: $node, user: $user, desc: $desc, _repo: $repo, _host: $host}') + + # Upsert into events table + psql "$DATABASE_URL" -q -c " + INSERT INTO events (id, source, action, occurred_at, public, payload) + VALUES (\$\$${event_id}\$\$, 'hg', 'Commit', '${occurred_at}', true, \$\$${payload}\$\$::jsonb) + ON CONFLICT (id) DO NOTHING; + " + + count=$((count + 1)) + done < "$cache_file" + + if [ "$count" -gt 0 ]; then + echo "[hg-ingest] $repo: $count changesets ingested" + fi + total=$((total + count)) +done + +# Mark poller state so the worker skips hg +psql "$DATABASE_URL" -q -c " + INSERT INTO poller_state (source, last_fetched) + VALUES ('hg', now()) + ON CONFLICT (source) DO UPDATE SET last_fetched = now(); +" + +echo "[hg-ingest] done. total: $total changesets"