//! hg-edge.mozilla.org changeset ingestion via `json-log` revset queries. //! //! Uses the `json-log?rev=author(term)` endpoint which returns changesets //! by the *author* (not the pusher), so it captures commits landed by //! sheriffs on behalf of the contributor. //! //! Repos are discovered within configured groups (e.g. `build`) via the //! `/{group}/?style=json` index, plus any individually listed repos //! (e.g. `mozilla-central`). Once the first successful scan completes //! (poller state is touched), all subsequent polls are skipped — the //! data is historical and will not change. use std::sync::Arc; use async_trait::async_trait; use chrono::{DateTime, TimeZone, Utc}; use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError}; use moments_entities::{Event, Source}; use reqwest::{Client, header}; use serde_json::Value; use tracing::{debug, warn}; const SOURCE_NAME: &str = "hg"; const USER_AGENT: &str = concat!( "moments/", env!("CARGO_PKG_VERSION"), " (+https://rob.tn)" ); /// Maximum changesets returned per json-log request. const REV_COUNT: u32 = 500; #[derive(Clone, Debug)] pub struct HgConfig { pub host: String, /// Substrings matched via `author(term)` revset queries. pub author_terms: Vec, /// Repo groups to scan — each is enumerated via `/{group}/?style=json`. pub groups: Vec, /// Individual repos to scan (e.g. `mozilla-central`). pub repos: Vec, } impl Default for HgConfig { fn default() -> Self { Self { host: "hg-edge.mozilla.org".into(), author_terms: vec!["rthijssen".into(), "grenade".into()], groups: vec!["build".into(), "integration".into()], repos: vec!["mozilla-central".into()], } } } pub struct HgSource { client: Client, writer: Arc, state: Arc, config: HgConfig, } impl HgSource { pub fn new( client: Client, writer: Arc, state: Arc, config: HgConfig, ) -> Self { Self { client, writer, state, config, } } /// Discover repos in a group via `/{group}/?style=json`. async fn discover_repos(&self, group: &str) -> Result, SourceError> { let url = format!("https://{}/{}/?style=json", self.config.host, group); let resp = self .client .get(&url) .header(header::USER_AGENT, USER_AGENT) .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { warn!(group, status = %resp.status(), "failed to discover repos in group"); return Ok(vec![]); } let body: Value = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; Ok(body .get("entries") .and_then(Value::as_array) .map(|entries| { entries .iter() .filter_map(|e| { e.get("name") .and_then(Value::as_str) .map(|name| format!("{group}/{name}")) }) .collect() }) .unwrap_or_default()) } fn log_url(&self, repo: &str, author_term: &str) -> String { format!( "https://{}/{}/json-log?rev=author({})&style=json&revcount={}", self.config.host, repo, author_term, REV_COUNT ) } async fn scan_repo(&self, repo: &str) -> Result { let mut all_events = Vec::new(); for term in &self.config.author_terms { let url = self.log_url(repo, term); let resp = self .client .get(&url) .header(header::USER_AGENT, USER_AGENT) .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); } let body: Value = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; if let Some(entries) = body.get("entries").and_then(Value::as_array) { for entry in entries { let node = entry.get("node").and_then(Value::as_str).unwrap_or(""); if node.is_empty() { continue; } let occurred_at = entry .get("date") .and_then(Value::as_array) .and_then(|a| parse_hg_date(a)) .unwrap_or_else(Utc::now); let mut payload = entry.clone(); if let Some(obj) = payload.as_object_mut() { obj.insert("_repo".into(), Value::String(repo.into())); obj.insert( "_host".into(), Value::String(self.config.host.clone()), ); } all_events.push(Event { id: format!("hg:{repo}:{node}"), source: Source::Hg, action: "Commit".into(), occurred_at, public: true, payload, }); } } } Ok(self.writer.upsert_events(&all_events).await?) } } #[async_trait] impl EventSource for HgSource { fn name(&self) -> &'static str { SOURCE_NAME } async fn poll(&self) -> Result { // hg repos are archived — one complete scan is sufficient. if self.state.load(SOURCE_NAME).await?.is_some() { debug!("hg already backfilled, skipping"); return Ok(0); } let mut repos: Vec = self.config.repos.clone(); for group in &self.config.groups { let discovered = self.discover_repos(group).await?; debug!(group, repos = discovered.len(), "discovered hg repos"); repos.extend(discovered); } let mut total = 0usize; for repo in &repos { match self.scan_repo(repo).await { Ok(n) => { if n > 0 { debug!(repo, ingested = n, "hg repo scan complete"); } total += n; } Err(e) => { warn!(repo, error = %e, "hg repo scan failed; continuing"); } } } self.state.touch(SOURCE_NAME).await?; debug!(ingested = total, "hg backfill complete"); Ok(total) } } /// Parse a hgweb date array `[seconds, tz_offset_secs]` into UTC. fn parse_hg_date(arr: &[Value]) -> Option> { let secs = arr.first()?.as_f64()? as i64; Utc.timestamp_opt(secs, 0).single() } #[cfg(test)] mod tests { use super::*; #[test] fn parse_hg_date_handles_seconds() { let arr = vec![Value::from(1_700_000_000_f64), Value::from(0_f64)]; let dt = parse_hg_date(&arr).expect("parses"); assert_eq!(dt.timestamp(), 1_700_000_000); } #[test] fn log_url_uses_revset_author_query() { let src = HgSource { client: Client::new(), writer: Arc::new(NoopWriter), state: Arc::new(NoopState), config: HgConfig::default(), }; let url = src.log_url("mozilla-central", "thijssen"); assert!(url.contains("json-log?rev=author(thijssen)")); assert!(url.contains("revcount=500")); } // Tiny stub impls just so we can construct an HgSource for unit tests. struct NoopWriter; #[async_trait] impl EventWriter for NoopWriter { async fn upsert_events( &self, _events: &[Event], ) -> Result { Ok(0) } async fn upsert_repo_languages( &self, _languages: &[moments_entities::RepoLanguage], ) -> Result { Ok(0) } } struct NoopState; #[async_trait] impl PollerStateStore for NoopState { async fn load( &self, _source: &str, ) -> Result, moments_core::StoreError> { Ok(None) } async fn save( &self, _source: &str, _etag: Option<&str>, _last_modified: Option>, ) -> Result<(), moments_core::StoreError> { Ok(()) } async fn touch(&self, _source: &str) -> Result<(), moments_core::StoreError> { Ok(()) } } }