feat(hg): revset-based author query, group discovery, one-shot ingest script

Rewrites the hg worker to use json-log?rev=author() which matches the
changeset author (not the pusher), capturing commits landed by sheriffs.
Repos are discovered within configured groups plus individually listed
repos. The worker skips entirely after the first successful backfill.

Adds script/hg-ingest.sh for offline ingestion via local hg clones —
clones one repo at a time, caches extracted changesets to .tsv, inserts
via psql, and sets poller_state when done.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-05 13:45:33 +03:00
parent 1bbe55dc84
commit 88fbbba60b
4 changed files with 284 additions and 112 deletions

View File

@@ -1,14 +1,14 @@
//! hg-edge.mozilla.org pushlog ingestion.
//! hg-edge.mozilla.org changeset ingestion via `json-log` revset queries.
//!
//! mozilla's hg pushlog filter `user=` matches the *pusher*, not the
//! changeset author. As a community-level contributor whose code was
//! reviewed and pushed by sheriffs/reviewers, the user filter returns 0
//! results. So this source pulls full pushlogs from a configured set of
//! repos and filters changeset authors client-side by substring match.
//! Uses the `json-log?rev=author(term)` endpoint which returns changesets
//! by the *author* (not the pusher), so it captures commits landed by
//! sheriffs on behalf of the contributor.
//!
//! The result set is historical (mozilla retired hg) — no new events
//! expected after the initial backfill. Daily polling keeps the upserts
//! cheap and idempotent.
//! Repos are discovered within configured groups (e.g. `build`) via the
//! `/{group}/?style=json` index, plus any individually listed repos
//! (e.g. `mozilla-central`). Once the first successful scan completes
//! (poller state is touched), all subsequent polls are skipped — the
//! data is historical and will not change.
use std::sync::Arc;
@@ -26,13 +26,17 @@ const USER_AGENT: &str = concat!(
env!("CARGO_PKG_VERSION"),
" (+https://rob.tn)"
);
/// Maximum changesets returned per json-log request.
const REV_COUNT: u32 = 500;
#[derive(Clone, Debug)]
pub struct HgConfig {
pub host: String,
/// Case-insensitive substrings matched against changeset author fields.
/// Substrings matched via `author(term)` revset queries.
pub author_terms: Vec<String>,
/// Repo paths under host, e.g. "build/puppet".
/// Repo groups to scan — each is enumerated via `/{group}/?style=json`.
pub groups: Vec<String>,
/// Individual repos to scan (e.g. `mozilla-central`).
pub repos: Vec<String>,
}
@@ -40,12 +44,9 @@ impl Default for HgConfig {
fn default() -> Self {
Self {
host: "hg-edge.mozilla.org".into(),
author_terms: vec!["thijssen".into(), "grenade".into()],
repos: vec![
"build/puppet".into(),
"build/tools".into(),
"build/buildbot-configs".into(),
],
author_terms: vec!["rthijssen".into(), "grenade".into()],
groups: vec!["build".into(), "integration".into()],
repos: vec!["mozilla-central".into()],
}
}
}
@@ -72,23 +73,9 @@ impl HgSource {
}
}
fn pushlog_url(&self, repo: &str) -> String {
format!(
"https://{}/{}/json-pushes?version=2&full=1",
self.config.host, repo
)
}
fn matches_author(&self, author: &str) -> bool {
let lower = author.to_lowercase();
self.config
.author_terms
.iter()
.any(|t| lower.contains(&t.to_lowercase()))
}
async fn scan_repo(&self, repo: &str) -> Result<usize, SourceError> {
let url = self.pushlog_url(repo);
/// Discover repos in a group via `/{group}/?style=json`.
async fn discover_repos(&self, group: &str) -> Result<Vec<String>, SourceError> {
let url = format!("https://{}/{}/?style=json", self.config.host, group);
let resp = self
.client
.get(&url)
@@ -97,69 +84,87 @@ impl HgSource {
.await
.map_err(|e| SourceError::Http(e.to_string()))?;
if !resp.status().is_success() {
return Err(SourceError::Http(format!("{} GET {}", resp.status(), url)));
warn!(group, status = %resp.status(), "failed to discover repos in group");
return Ok(vec![]);
}
let body: Value = resp
.json()
.await
.map_err(|e| SourceError::Parse(e.to_string()))?;
Ok(body
.get("entries")
.and_then(Value::as_array)
.map(|entries| {
entries
.iter()
.filter_map(|e| {
e.get("name")
.and_then(Value::as_str)
.map(|name| format!("{group}/{name}"))
})
.collect()
})
.unwrap_or_default())
}
let mut events = Vec::new();
if let Some(pushes) = body.get("pushes").and_then(Value::as_object) {
for (pushid, push) in pushes {
let pushed_at_secs = push.get("date").and_then(Value::as_i64).unwrap_or(0);
let pushed_at = Utc
.timestamp_opt(pushed_at_secs, 0)
.single()
.unwrap_or_else(Utc::now);
let pusher = push
.get("user")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
if let Some(changesets) = push.get("changesets").and_then(Value::as_array) {
for cs in changesets {
let author = cs.get("author").and_then(Value::as_str).unwrap_or("");
if !self.matches_author(author) {
continue;
}
let node = cs.get("node").and_then(Value::as_str).unwrap_or("");
if node.is_empty() {
continue;
}
let occurred_at = cs
.get("date")
.and_then(Value::as_array)
.and_then(|a| parse_hg_date(a))
.unwrap_or(pushed_at);
fn log_url(&self, repo: &str, author_term: &str) -> String {
format!(
"https://{}/{}/json-log?rev=author({})&style=json&revcount={}",
self.config.host, repo, author_term, REV_COUNT
)
}
let mut payload = cs.clone();
if let Some(obj) = payload.as_object_mut() {
obj.insert("_repo".into(), Value::String(repo.into()));
obj.insert(
"_host".into(),
Value::String(self.config.host.clone()),
);
obj.insert("_pusher".into(), Value::String(pusher.clone()));
obj.insert(
"_pushid".into(),
Value::String(pushid.clone()),
);
}
events.push(Event {
id: format!("hg:{repo}:{node}"),
source: Source::Hg,
action: "Commit".into(),
occurred_at,
// mozilla hg-edge is exclusively public.
public: true,
payload,
});
async fn scan_repo(&self, repo: &str) -> Result<usize, SourceError> {
let mut all_events = Vec::new();
for term in &self.config.author_terms {
let url = self.log_url(repo, term);
let resp = self
.client
.get(&url)
.header(header::USER_AGENT, USER_AGENT)
.send()
.await
.map_err(|e| SourceError::Http(e.to_string()))?;
if !resp.status().is_success() {
return Err(SourceError::Http(format!("{} GET {}", resp.status(), url)));
}
let body: Value = resp
.json()
.await
.map_err(|e| SourceError::Parse(e.to_string()))?;
if let Some(entries) = body.get("entries").and_then(Value::as_array) {
for entry in entries {
let node = entry.get("node").and_then(Value::as_str).unwrap_or("");
if node.is_empty() {
continue;
}
let occurred_at = entry
.get("date")
.and_then(Value::as_array)
.and_then(|a| parse_hg_date(a))
.unwrap_or_else(Utc::now);
let mut payload = entry.clone();
if let Some(obj) = payload.as_object_mut() {
obj.insert("_repo".into(), Value::String(repo.into()));
obj.insert(
"_host".into(),
Value::String(self.config.host.clone()),
);
}
all_events.push(Event {
id: format!("hg:{repo}:{node}"),
source: Source::Hg,
action: "Commit".into(),
occurred_at,
public: true,
payload,
});
}
}
}
Ok(self.writer.upsert_events(&events).await?)
Ok(self.writer.upsert_events(&all_events).await?)
}
}
@@ -170,11 +175,26 @@ impl EventSource for HgSource {
}
async fn poll(&self) -> Result<usize, SourceError> {
// hg repos are archived — one complete scan is sufficient.
if self.state.load(SOURCE_NAME).await?.is_some() {
debug!("hg already backfilled, skipping");
return Ok(0);
}
let mut repos: Vec<String> = self.config.repos.clone();
for group in &self.config.groups {
let discovered = self.discover_repos(group).await?;
debug!(group, repos = discovered.len(), "discovered hg repos");
repos.extend(discovered);
}
let mut total = 0usize;
for repo in &self.config.repos {
for repo in &repos {
match self.scan_repo(repo).await {
Ok(n) => {
debug!(repo, ingested = n, "hg repo scan complete");
if n > 0 {
debug!(repo, ingested = n, "hg repo scan complete");
}
total += n;
}
Err(e) => {
@@ -183,11 +203,12 @@ impl EventSource for HgSource {
}
}
self.state.touch(SOURCE_NAME).await?;
debug!(ingested = total, "hg backfill complete");
Ok(total)
}
}
/// Parse a pushlog date array `[seconds, tz_offset_secs]` into UTC.
/// Parse a hgweb date array `[seconds, tz_offset_secs]` into UTC.
fn parse_hg_date(arr: &[Value]) -> Option<DateTime<Utc>> {
let secs = arr.first()?.as_f64()? as i64;
Utc.timestamp_opt(secs, 0).single()
@@ -197,23 +218,6 @@ fn parse_hg_date(arr: &[Value]) -> Option<DateTime<Utc>> {
mod tests {
use super::*;
#[test]
fn matches_author_substring_case_insensitive() {
let s = HgSource {
client: Client::new(),
writer: Arc::new(NoopWriter),
state: Arc::new(NoopState),
config: HgConfig {
author_terms: vec!["thijssen".into(), "grenade".into()],
..Default::default()
},
};
assert!(s.matches_author("Rob Thijssen <rob@example.com>"));
assert!(s.matches_author("grenade@example"));
assert!(s.matches_author("THIJSSEN"));
assert!(!s.matches_author("Other Person <other@example>"));
}
#[test]
fn parse_hg_date_handles_seconds() {
let arr = vec![Value::from(1_700_000_000_f64), Value::from(0_f64)];
@@ -221,6 +225,19 @@ mod tests {
assert_eq!(dt.timestamp(), 1_700_000_000);
}
#[test]
fn log_url_uses_revset_author_query() {
let src = HgSource {
client: Client::new(),
writer: Arc::new(NoopWriter),
state: Arc::new(NoopState),
config: HgConfig::default(),
};
let url = src.log_url("mozilla-central", "thijssen");
assert!(url.contains("json-log?rev=author(thijssen)"));
assert!(url.contains("revcount=500"));
}
// Tiny stub impls just so we can construct an HgSource for unit tests.
struct NoopWriter;
#[async_trait]