From 88fbbba60bd1b74311da7cdde4f8ade7c2f80762 Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Tue, 5 May 2026 13:45:33 +0300 Subject: [PATCH] feat(hg): revset-based author query, group discovery, one-shot ingest script MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrites the hg worker to use json-log?rev=author() which matches the changeset author (not the pusher), capturing commits landed by sheriffs. Repos are discovered within configured groups plus individually listed repos. The worker skips entirely after the first successful backfill. Adds script/hg-ingest.sh for offline ingestion via local hg clones — clones one repo at a time, caches extracted changesets to .tsv, inserts via psql, and sets poller_state when done. Co-Authored-By: Claude Opus 4.6 (1M context) --- asset/config/worker.env.tmpl | 5 +- crates/moments-data/src/hg.rs | 229 ++++++++++++++++-------------- crates/moments-worker/src/main.rs | 21 ++- script/hg-ingest.sh | 141 ++++++++++++++++++ 4 files changed, 284 insertions(+), 112 deletions(-) create mode 100755 script/hg-ingest.sh diff --git a/asset/config/worker.env.tmpl b/asset/config/worker.env.tmpl index 0e0d5d9..78c2665 100644 --- a/asset/config/worker.env.tmpl +++ b/asset/config/worker.env.tmpl @@ -14,8 +14,9 @@ GITEA_TOKEN={{GITEA_TOKEN}} GITEA_POLL_INTERVAL_SECS=600 HG_HOST=hg-edge.mozilla.org -HG_REPOS=build/puppet,build/tools,build/buildbot-configs -HG_AUTHOR_TERMS=thijssen,grenade +HG_GROUPS=build,integration +HG_REPOS=mozilla-central +HG_AUTHOR_TERMS=rthijssen,grenade HG_POLL_INTERVAL_SECS=86400 BUGZILLA_HOST=bugzilla.mozilla.org diff --git a/crates/moments-data/src/hg.rs b/crates/moments-data/src/hg.rs index bc3c952..a71d9dc 100644 --- a/crates/moments-data/src/hg.rs +++ b/crates/moments-data/src/hg.rs @@ -1,14 +1,14 @@ -//! hg-edge.mozilla.org pushlog ingestion. +//! hg-edge.mozilla.org changeset ingestion via `json-log` revset queries. //! -//! mozilla's hg pushlog filter `user=` matches the *pusher*, not the -//! changeset author. As a community-level contributor whose code was -//! reviewed and pushed by sheriffs/reviewers, the user filter returns 0 -//! results. So this source pulls full pushlogs from a configured set of -//! repos and filters changeset authors client-side by substring match. +//! Uses the `json-log?rev=author(term)` endpoint which returns changesets +//! by the *author* (not the pusher), so it captures commits landed by +//! sheriffs on behalf of the contributor. //! -//! The result set is historical (mozilla retired hg) — no new events -//! expected after the initial backfill. Daily polling keeps the upserts -//! cheap and idempotent. +//! Repos are discovered within configured groups (e.g. `build`) via the +//! `/{group}/?style=json` index, plus any individually listed repos +//! (e.g. `mozilla-central`). Once the first successful scan completes +//! (poller state is touched), all subsequent polls are skipped — the +//! data is historical and will not change. use std::sync::Arc; @@ -26,13 +26,17 @@ const USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION"), " (+https://rob.tn)" ); +/// Maximum changesets returned per json-log request. +const REV_COUNT: u32 = 500; #[derive(Clone, Debug)] pub struct HgConfig { pub host: String, - /// Case-insensitive substrings matched against changeset author fields. + /// Substrings matched via `author(term)` revset queries. pub author_terms: Vec, - /// Repo paths under host, e.g. "build/puppet". + /// Repo groups to scan — each is enumerated via `/{group}/?style=json`. + pub groups: Vec, + /// Individual repos to scan (e.g. `mozilla-central`). pub repos: Vec, } @@ -40,12 +44,9 @@ impl Default for HgConfig { fn default() -> Self { Self { host: "hg-edge.mozilla.org".into(), - author_terms: vec!["thijssen".into(), "grenade".into()], - repos: vec![ - "build/puppet".into(), - "build/tools".into(), - "build/buildbot-configs".into(), - ], + author_terms: vec!["rthijssen".into(), "grenade".into()], + groups: vec!["build".into(), "integration".into()], + repos: vec!["mozilla-central".into()], } } } @@ -72,23 +73,9 @@ impl HgSource { } } - fn pushlog_url(&self, repo: &str) -> String { - format!( - "https://{}/{}/json-pushes?version=2&full=1", - self.config.host, repo - ) - } - - fn matches_author(&self, author: &str) -> bool { - let lower = author.to_lowercase(); - self.config - .author_terms - .iter() - .any(|t| lower.contains(&t.to_lowercase())) - } - - async fn scan_repo(&self, repo: &str) -> Result { - let url = self.pushlog_url(repo); + /// Discover repos in a group via `/{group}/?style=json`. + async fn discover_repos(&self, group: &str) -> Result, SourceError> { + let url = format!("https://{}/{}/?style=json", self.config.host, group); let resp = self .client .get(&url) @@ -97,69 +84,87 @@ impl HgSource { .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { - return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + warn!(group, status = %resp.status(), "failed to discover repos in group"); + return Ok(vec![]); } let body: Value = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; + Ok(body + .get("entries") + .and_then(Value::as_array) + .map(|entries| { + entries + .iter() + .filter_map(|e| { + e.get("name") + .and_then(Value::as_str) + .map(|name| format!("{group}/{name}")) + }) + .collect() + }) + .unwrap_or_default()) + } - let mut events = Vec::new(); - if let Some(pushes) = body.get("pushes").and_then(Value::as_object) { - for (pushid, push) in pushes { - let pushed_at_secs = push.get("date").and_then(Value::as_i64).unwrap_or(0); - let pushed_at = Utc - .timestamp_opt(pushed_at_secs, 0) - .single() - .unwrap_or_else(Utc::now); - let pusher = push - .get("user") - .and_then(Value::as_str) - .unwrap_or("") - .to_string(); - if let Some(changesets) = push.get("changesets").and_then(Value::as_array) { - for cs in changesets { - let author = cs.get("author").and_then(Value::as_str).unwrap_or(""); - if !self.matches_author(author) { - continue; - } - let node = cs.get("node").and_then(Value::as_str).unwrap_or(""); - if node.is_empty() { - continue; - } - let occurred_at = cs - .get("date") - .and_then(Value::as_array) - .and_then(|a| parse_hg_date(a)) - .unwrap_or(pushed_at); + fn log_url(&self, repo: &str, author_term: &str) -> String { + format!( + "https://{}/{}/json-log?rev=author({})&style=json&revcount={}", + self.config.host, repo, author_term, REV_COUNT + ) + } - let mut payload = cs.clone(); - if let Some(obj) = payload.as_object_mut() { - obj.insert("_repo".into(), Value::String(repo.into())); - obj.insert( - "_host".into(), - Value::String(self.config.host.clone()), - ); - obj.insert("_pusher".into(), Value::String(pusher.clone())); - obj.insert( - "_pushid".into(), - Value::String(pushid.clone()), - ); - } - events.push(Event { - id: format!("hg:{repo}:{node}"), - source: Source::Hg, - action: "Commit".into(), - occurred_at, - // mozilla hg-edge is exclusively public. - public: true, - payload, - }); + async fn scan_repo(&self, repo: &str) -> Result { + let mut all_events = Vec::new(); + for term in &self.config.author_terms { + let url = self.log_url(repo, term); + let resp = self + .client + .get(&url) + .header(header::USER_AGENT, USER_AGENT) + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + } + let body: Value = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + + if let Some(entries) = body.get("entries").and_then(Value::as_array) { + for entry in entries { + let node = entry.get("node").and_then(Value::as_str).unwrap_or(""); + if node.is_empty() { + continue; } + let occurred_at = entry + .get("date") + .and_then(Value::as_array) + .and_then(|a| parse_hg_date(a)) + .unwrap_or_else(Utc::now); + + let mut payload = entry.clone(); + if let Some(obj) = payload.as_object_mut() { + obj.insert("_repo".into(), Value::String(repo.into())); + obj.insert( + "_host".into(), + Value::String(self.config.host.clone()), + ); + } + all_events.push(Event { + id: format!("hg:{repo}:{node}"), + source: Source::Hg, + action: "Commit".into(), + occurred_at, + public: true, + payload, + }); } } } - Ok(self.writer.upsert_events(&events).await?) + Ok(self.writer.upsert_events(&all_events).await?) } } @@ -170,11 +175,26 @@ impl EventSource for HgSource { } async fn poll(&self) -> Result { + // hg repos are archived — one complete scan is sufficient. + if self.state.load(SOURCE_NAME).await?.is_some() { + debug!("hg already backfilled, skipping"); + return Ok(0); + } + + let mut repos: Vec = self.config.repos.clone(); + for group in &self.config.groups { + let discovered = self.discover_repos(group).await?; + debug!(group, repos = discovered.len(), "discovered hg repos"); + repos.extend(discovered); + } + let mut total = 0usize; - for repo in &self.config.repos { + for repo in &repos { match self.scan_repo(repo).await { Ok(n) => { - debug!(repo, ingested = n, "hg repo scan complete"); + if n > 0 { + debug!(repo, ingested = n, "hg repo scan complete"); + } total += n; } Err(e) => { @@ -183,11 +203,12 @@ impl EventSource for HgSource { } } self.state.touch(SOURCE_NAME).await?; + debug!(ingested = total, "hg backfill complete"); Ok(total) } } -/// Parse a pushlog date array `[seconds, tz_offset_secs]` into UTC. +/// Parse a hgweb date array `[seconds, tz_offset_secs]` into UTC. fn parse_hg_date(arr: &[Value]) -> Option> { let secs = arr.first()?.as_f64()? as i64; Utc.timestamp_opt(secs, 0).single() @@ -197,23 +218,6 @@ fn parse_hg_date(arr: &[Value]) -> Option> { mod tests { use super::*; - #[test] - fn matches_author_substring_case_insensitive() { - let s = HgSource { - client: Client::new(), - writer: Arc::new(NoopWriter), - state: Arc::new(NoopState), - config: HgConfig { - author_terms: vec!["thijssen".into(), "grenade".into()], - ..Default::default() - }, - }; - assert!(s.matches_author("Rob Thijssen ")); - assert!(s.matches_author("grenade@example")); - assert!(s.matches_author("THIJSSEN")); - assert!(!s.matches_author("Other Person ")); - } - #[test] fn parse_hg_date_handles_seconds() { let arr = vec![Value::from(1_700_000_000_f64), Value::from(0_f64)]; @@ -221,6 +225,19 @@ mod tests { assert_eq!(dt.timestamp(), 1_700_000_000); } + #[test] + fn log_url_uses_revset_author_query() { + let src = HgSource { + client: Client::new(), + writer: Arc::new(NoopWriter), + state: Arc::new(NoopState), + config: HgConfig::default(), + }; + let url = src.log_url("mozilla-central", "thijssen"); + assert!(url.contains("json-log?rev=author(thijssen)")); + assert!(url.contains("revcount=500")); + } + // Tiny stub impls just so we can construct an HgSource for unit tests. struct NoopWriter; #[async_trait] diff --git a/crates/moments-worker/src/main.rs b/crates/moments-worker/src/main.rs index 2b4c740..07e0185 100644 --- a/crates/moments-worker/src/main.rs +++ b/crates/moments-worker/src/main.rs @@ -51,21 +51,31 @@ struct Args { #[arg(long, env = "HG_HOST", default_value = "hg-edge.mozilla.org")] hg_host: String, - /// Comma-separated mozilla hg repo paths to scan, e.g. "build/puppet,build/tools". + /// Comma-separated repo groups to scan. Repos within each group are + /// discovered via `/{group}/?style=json`. + #[arg( + long, + env = "HG_GROUPS", + value_delimiter = ',', + default_value = "build,integration" + )] + hg_groups: Vec, + + /// Comma-separated individual repos to scan (e.g. `mozilla-central`). #[arg( long, env = "HG_REPOS", value_delimiter = ',', - default_value = "build/puppet,build/tools,build/buildbot-configs" + default_value = "mozilla-central" )] hg_repos: Vec, - /// Comma-separated case-insensitive substrings matched against changeset author fields. + /// Comma-separated author substrings for `author()` revset queries. #[arg( long, env = "HG_AUTHOR_TERMS", value_delimiter = ',', - default_value = "thijssen,grenade" + default_value = "rthijssen,grenade" )] hg_author_terms: Vec, @@ -141,6 +151,7 @@ async fn main() -> anyhow::Result<()> { HgConfig { host: args.hg_host.clone(), author_terms: args.hg_author_terms.clone(), + groups: args.hg_groups.clone(), repos: args.hg_repos.clone(), }, )) as Arc; @@ -162,7 +173,9 @@ async fn main() -> anyhow::Result<()> { gitea_host = args.gitea_host, gitea_user = args.gitea_user, hg_host = args.hg_host, + hg_groups = ?args.hg_groups, hg_repos = ?args.hg_repos, + hg_author_terms = ?args.hg_author_terms, bugzilla_host = args.bugzilla_host, bugzilla_email = args.bugzilla_email, events_interval_secs = args.interval_secs, diff --git a/script/hg-ingest.sh b/script/hg-ingest.sh new file mode 100755 index 0000000..d5f1aa4 --- /dev/null +++ b/script/hg-ingest.sh @@ -0,0 +1,141 @@ +#!/usr/bin/env bash +# +# One-shot hg changeset ingestion via local clones. +# +# Bare-clones each hg repo, extracts changesets matching author terms, +# and inserts them into the moments database. Sets poller_state so the +# worker won't re-scan. +# +# Requirements: hg (mercurial), psql, jq +# +# Usage: +# DATABASE_URL="postgres://..." ./script/hg-ingest.sh +# +set -euo pipefail + +DATABASE_URL="${DATABASE_URL:-postgres://moments_rw@magrathea.kosherinata.internal/moments:5432?sslmode=verify-full&sslrootcert=/etc/pki/ca-trust/source/anchors/root-internal.pem&sslcert=/etc/pki/tls/misc/$(hostname -f).pem&sslkey=/etc/pki/tls/private/$(hostname -f).pem}" +HG_HOST="${HG_HOST:-hg-edge.mozilla.org}" +WORK_DIR="${HG_WORK_DIR:-~/hg}" + +# Repos to clone (groups are expanded inline) +REPOS=( + mozilla-central + integration/mozilla-inbound + integration/autoland + integration/fx-team + integration/b2g-inbound + build/puppet + build/tools + build/buildbot + build/buildbot-configs + build/slave_health + build/mozharness + build/braindump + build/cloud-tools + build/compare-locales + build/nagios-core + build/partner-repacks + build/preproduction + build/rpm-sources + build/talos + build/tupperware + build/ash-mozharness + build/autoland + build/opsi-package-sources +) + +# Author terms — matched case-insensitively against changeset author fields +AUTHOR_TERMS=("rthijssen" "grenade") + +: "${DATABASE_URL:?DATABASE_URL must be set}" + +mkdir -p "$WORK_DIR" + +total=0 + +CLONE_DIR="$WORK_DIR/clone" +CACHE_DIR="$WORK_DIR/cache" +mkdir -p "$CACHE_DIR" + +for repo in "${REPOS[@]}"; do + cache_file="$CACHE_DIR/$(echo "$repo" | tr '/' '_').tsv" + + # Skip repos already cached (re-run safe) + if [ -f "$cache_file" ]; then + echo "[hg-ingest] $repo: using cached results" + else + # Remove any previous clone to keep only one on disk + rm -rf "$CLONE_DIR" + + echo "[hg-ingest] cloning $repo" + if ! hg clone --noupdate "https://$HG_HOST/$repo" "$CLONE_DIR" 2>/dev/null; then + echo "[hg-ingest] clone failed: $repo (skipping)" + continue + fi + + # Build revset: author(term1) or author(term2) ... + revset="" + for term in "${AUTHOR_TERMS[@]}"; do + if [ -z "$revset" ]; then + revset="author('$term')" + else + revset="$revset or author('$term')" + fi + done + + # Extract matching changesets to cache file + hg log -R "$CLONE_DIR" -r "$revset" \ + --template '{node}\t{author}\t{date|hgdate}\t{desc|firstline}\n' \ + > "$cache_file" 2>/dev/null || true + + # Free disk immediately + rm -rf "$CLONE_DIR" + fi + + # Ingest cached results into the database + count=0 + while IFS=$'\t' read -r node author date_raw desc; do + [ -z "$node" ] && continue + + # {date|hgdate} outputs "timestamp offset" — take just the timestamp + date_ts="${date_raw%% *}" + + # Build ISO timestamp from unix epoch + occurred_at=$(date -u -d "@${date_ts}" '+%Y-%m-%dT%H:%M:%SZ' 2>/dev/null || \ + date -u -r "${date_ts}" '+%Y-%m-%dT%H:%M:%SZ' 2>/dev/null) + + event_id="hg:${repo}:${node}" + + # Build payload JSON (jq handles all escaping) + payload=$(jq -n \ + --arg node "$node" \ + --arg user "$author" \ + --arg desc "$desc" \ + --arg repo "$repo" \ + --arg host "$HG_HOST" \ + '{node: $node, user: $user, desc: $desc, _repo: $repo, _host: $host}') + + # Upsert into events table + psql "$DATABASE_URL" -q -c " + INSERT INTO events (id, source, action, occurred_at, public, payload) + VALUES (\$\$${event_id}\$\$, 'hg', 'Commit', '${occurred_at}', true, \$\$${payload}\$\$::jsonb) + ON CONFLICT (id) DO NOTHING; + " + + count=$((count + 1)) + done < "$cache_file" + + if [ "$count" -gt 0 ]; then + echo "[hg-ingest] $repo: $count changesets ingested" + fi + total=$((total + count)) +done + +# Mark poller state so the worker skips hg +psql "$DATABASE_URL" -q -c " + INSERT INTO poller_state (source, last_fetched) + VALUES ('hg', now()) + ON CONFLICT (source) DO UPDATE SET last_fetched = now(); +" + +echo "[hg-ingest] done. total: $total changesets"