feat(worker): add hg-edge and bugzilla pollers

Wires two historical sources for completeness with the 2019 timeline:

- hg-edge.mozilla.org: scans json-pushes for a configured set of
  build/* repos and matches changeset author client-side, since the
  pushlog `user=` filter targets the pusher (sheriffs/reviewers in
  this case) rather than the author. Daily poll cadence — mozilla
  retired hg, no new events expected.
- bugzilla.mozilla.org: queries /rest/bug?creator=<email>. Without
  an api key the unauthenticated endpoint only returns public bugs,
  which is what the public timeline wants anyway.

Reshape renders "<author> committed <short_node> in <repo>" for hg
and "filed bug #<id> in <product>" for bugzilla, both linking back
to the canonical upstream URL via a stamped `_host` payload field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-03 19:55:41 +03:00
parent f750e8de47
commit 7919a2d9ab
7 changed files with 721 additions and 18 deletions

View File

@@ -0,0 +1,176 @@
//! Bugzilla REST API ingestion.
//!
//! Hits `/rest/bug?creator=<email>` to pull bugs the user filed. Without
//! an api key, only public bugs are returned, which is what we want for
//! the public timeline anyway.
//!
//! No incremental story for v1 — each tick refetches the full list and
//! relies on idempotent upsert by `bugzilla:<id>`. Comments and bug
//! changes (status flips, assignment, etc.) aren't ingested for v1
//! because they require per-bug API calls; revisit if that history
//! becomes desirable.
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
use moments_entities::{Event, Source};
use reqwest::{Client, header};
use serde_json::Value;
use tracing::debug;
const SOURCE_NAME: &str = "bugzilla";
const USER_AGENT: &str = concat!(
"moments/",
env!("CARGO_PKG_VERSION"),
" (+https://rob.tn)"
);
#[derive(Clone, Debug)]
pub struct BugzillaConfig {
pub host: String,
pub creator_email: String,
pub api_key: Option<String>,
/// Bugs requested per page; bugzilla allows up to 1000.
pub limit: u32,
}
impl Default for BugzillaConfig {
fn default() -> Self {
Self {
host: "bugzilla.mozilla.org".into(),
creator_email: "rthijssen@mozilla.com".into(),
api_key: None,
limit: 500,
}
}
}
pub struct BugzillaSource {
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: BugzillaConfig,
}
impl BugzillaSource {
pub fn new(
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: BugzillaConfig,
) -> Self {
Self {
client,
writer,
state,
config,
}
}
}
#[async_trait]
impl EventSource for BugzillaSource {
fn name(&self) -> &'static str {
SOURCE_NAME
}
async fn poll(&self) -> Result<usize, SourceError> {
let url = format!(
"https://{}/rest/bug?creator={}&limit={}\
&include_fields=id,summary,creation_time,last_change_time,status,resolution,product,component",
self.config.host, self.config.creator_email, self.config.limit
);
let mut req = self
.client
.get(&url)
.header(header::USER_AGENT, USER_AGENT)
.header(header::ACCEPT, "application/json");
if let Some(key) = &self.config.api_key {
req = req.header("X-BUGZILLA-API-KEY", key);
}
let resp = req
.send()
.await
.map_err(|e| SourceError::Http(e.to_string()))?;
if !resp.status().is_success() {
return Err(SourceError::Http(format!("{} GET {}", resp.status(), url)));
}
let body: Value = resp
.json()
.await
.map_err(|e| SourceError::Parse(e.to_string()))?;
let bugs = body
.get("bugs")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
let events: Vec<Event> = bugs
.iter()
.filter_map(|b| parse_bug(b, &self.config.host))
.collect();
let n = self.writer.upsert_events(&events).await?;
self.state.touch(SOURCE_NAME).await?;
debug!(ingested = n, total = bugs.len(), "bugzilla poll complete");
Ok(n)
}
}
fn parse_bug(bug: &Value, host: &str) -> Option<Event> {
let id = bug.get("id").and_then(Value::as_i64)?;
let creation_time = bug.get("creation_time").and_then(Value::as_str)?;
let occurred_at = DateTime::parse_from_rfc3339(creation_time)
.ok()?
.with_timezone(&Utc);
let mut payload = bug.clone();
if let Some(obj) = payload.as_object_mut() {
obj.insert("_host".into(), Value::String(host.into()));
}
Some(Event {
id: format!("bugzilla:{id}"),
source: Source::Bugzilla,
action: "BugCreate".into(),
occurred_at,
// The unauth REST API only returns publicly visible bugs.
public: true,
payload,
})
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn parses_bug_with_creation_time() {
let raw = json!({
"id": 1158879,
"summary": "Commit Access (Level 1) for Rob Thijssen",
"creation_time": "2015-04-27T16:29:59Z",
"last_change_time": "2015-04-28T14:06:48Z",
"status": "RESOLVED",
"product": "mozilla.org"
});
let ev = parse_bug(&raw, "bugzilla.mozilla.org").expect("parses");
assert_eq!(ev.id, "bugzilla:1158879");
assert_eq!(ev.action, "BugCreate");
assert_eq!(ev.source, Source::Bugzilla);
assert!(ev.public);
assert_eq!(
ev.payload.get("_host").and_then(|v| v.as_str()),
Some("bugzilla.mozilla.org")
);
}
#[test]
fn rejects_bug_missing_id_or_creation_time() {
let raw = json!({ "summary": "x" });
assert!(parse_bug(&raw, "bugzilla.mozilla.org").is_none());
}
}

View File

@@ -0,0 +1,256 @@
//! hg-edge.mozilla.org pushlog ingestion.
//!
//! mozilla's hg pushlog filter `user=` matches the *pusher*, not the
//! changeset author. As a community-level contributor whose code was
//! reviewed and pushed by sheriffs/reviewers, the user filter returns 0
//! results. So this source pulls full pushlogs from a configured set of
//! repos and filters changeset authors client-side by substring match.
//!
//! The result set is historical (mozilla retired hg) — no new events
//! expected after the initial backfill. Daily polling keeps the upserts
//! cheap and idempotent.
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
use moments_entities::{Event, Source};
use reqwest::{Client, header};
use serde_json::Value;
use tracing::{debug, warn};
const SOURCE_NAME: &str = "hg";
const USER_AGENT: &str = concat!(
"moments/",
env!("CARGO_PKG_VERSION"),
" (+https://rob.tn)"
);
#[derive(Clone, Debug)]
pub struct HgConfig {
pub host: String,
/// Case-insensitive substrings matched against changeset author fields.
pub author_terms: Vec<String>,
/// Repo paths under host, e.g. "build/puppet".
pub repos: Vec<String>,
}
impl Default for HgConfig {
fn default() -> Self {
Self {
host: "hg-edge.mozilla.org".into(),
author_terms: vec!["thijssen".into(), "grenade".into()],
repos: vec![
"build/puppet".into(),
"build/tools".into(),
"build/buildbot-configs".into(),
],
}
}
}
pub struct HgSource {
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: HgConfig,
}
impl HgSource {
pub fn new(
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: HgConfig,
) -> Self {
Self {
client,
writer,
state,
config,
}
}
fn pushlog_url(&self, repo: &str) -> String {
format!(
"https://{}/{}/json-pushes?version=2&full=1",
self.config.host, repo
)
}
fn matches_author(&self, author: &str) -> bool {
let lower = author.to_lowercase();
self.config
.author_terms
.iter()
.any(|t| lower.contains(&t.to_lowercase()))
}
async fn scan_repo(&self, repo: &str) -> Result<usize, SourceError> {
let url = self.pushlog_url(repo);
let resp = self
.client
.get(&url)
.header(header::USER_AGENT, USER_AGENT)
.send()
.await
.map_err(|e| SourceError::Http(e.to_string()))?;
if !resp.status().is_success() {
return Err(SourceError::Http(format!("{} GET {}", resp.status(), url)));
}
let body: Value = resp
.json()
.await
.map_err(|e| SourceError::Parse(e.to_string()))?;
let mut events = Vec::new();
if let Some(pushes) = body.get("pushes").and_then(Value::as_object) {
for (pushid, push) in pushes {
let pushed_at_secs = push.get("date").and_then(Value::as_i64).unwrap_or(0);
let pushed_at = Utc
.timestamp_opt(pushed_at_secs, 0)
.single()
.unwrap_or_else(Utc::now);
let pusher = push
.get("user")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
if let Some(changesets) = push.get("changesets").and_then(Value::as_array) {
for cs in changesets {
let author = cs.get("author").and_then(Value::as_str).unwrap_or("");
if !self.matches_author(author) {
continue;
}
let node = cs.get("node").and_then(Value::as_str).unwrap_or("");
if node.is_empty() {
continue;
}
let occurred_at = cs
.get("date")
.and_then(Value::as_array)
.and_then(|a| parse_hg_date(a))
.unwrap_or(pushed_at);
let mut payload = cs.clone();
if let Some(obj) = payload.as_object_mut() {
obj.insert("_repo".into(), Value::String(repo.into()));
obj.insert(
"_host".into(),
Value::String(self.config.host.clone()),
);
obj.insert("_pusher".into(), Value::String(pusher.clone()));
obj.insert(
"_pushid".into(),
Value::String(pushid.clone()),
);
}
events.push(Event {
id: format!("hg:{repo}:{node}"),
source: Source::Hg,
action: "Commit".into(),
occurred_at,
// mozilla hg-edge is exclusively public.
public: true,
payload,
});
}
}
}
}
Ok(self.writer.upsert_events(&events).await?)
}
}
#[async_trait]
impl EventSource for HgSource {
fn name(&self) -> &'static str {
SOURCE_NAME
}
async fn poll(&self) -> Result<usize, SourceError> {
let mut total = 0usize;
for repo in &self.config.repos {
match self.scan_repo(repo).await {
Ok(n) => {
debug!(repo, ingested = n, "hg repo scan complete");
total += n;
}
Err(e) => {
warn!(repo, error = %e, "hg repo scan failed; continuing");
}
}
}
self.state.touch(SOURCE_NAME).await?;
Ok(total)
}
}
/// Parse a pushlog date array `[seconds, tz_offset_secs]` into UTC.
fn parse_hg_date(arr: &[Value]) -> Option<DateTime<Utc>> {
let secs = arr.first()?.as_f64()? as i64;
Utc.timestamp_opt(secs, 0).single()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn matches_author_substring_case_insensitive() {
let s = HgSource {
client: Client::new(),
writer: Arc::new(NoopWriter),
state: Arc::new(NoopState),
config: HgConfig {
author_terms: vec!["thijssen".into(), "grenade".into()],
..Default::default()
},
};
assert!(s.matches_author("Rob Thijssen <rob@example.com>"));
assert!(s.matches_author("grenade@example"));
assert!(s.matches_author("THIJSSEN"));
assert!(!s.matches_author("Other Person <other@example>"));
}
#[test]
fn parse_hg_date_handles_seconds() {
let arr = vec![Value::from(1_700_000_000_f64), Value::from(0_f64)];
let dt = parse_hg_date(&arr).expect("parses");
assert_eq!(dt.timestamp(), 1_700_000_000);
}
// Tiny stub impls just so we can construct an HgSource for unit tests.
struct NoopWriter;
#[async_trait]
impl EventWriter for NoopWriter {
async fn upsert_events(
&self,
_events: &[Event],
) -> Result<usize, moments_core::StoreError> {
Ok(0)
}
}
struct NoopState;
#[async_trait]
impl PollerStateStore for NoopState {
async fn load(
&self,
_source: &str,
) -> Result<Option<moments_core::PollerState>, moments_core::StoreError> {
Ok(None)
}
async fn save(
&self,
_source: &str,
_etag: Option<&str>,
_last_modified: Option<DateTime<Utc>>,
) -> Result<(), moments_core::StoreError> {
Ok(())
}
async fn touch(&self, _source: &str) -> Result<(), moments_core::StoreError> {
Ok(())
}
}
}

View File

@@ -1,6 +1,8 @@
pub mod bugzilla;
pub mod gitea;
pub mod github;
pub mod github_search;
pub mod hg;
use async_trait::async_trait;
use chrono::{DateTime, Utc};