Compare commits
4 Commits
4c8a663288
...
2da9461b44
| Author | SHA1 | Date | |
|---|---|---|---|
|
2da9461b44
|
|||
|
3f3a1fb33e
|
|||
|
88fbbba60b
|
|||
|
1bbe55dc84
|
@@ -14,8 +14,9 @@ GITEA_TOKEN={{GITEA_TOKEN}}
|
||||
GITEA_POLL_INTERVAL_SECS=600
|
||||
|
||||
HG_HOST=hg-edge.mozilla.org
|
||||
HG_REPOS=build/puppet,build/tools,build/buildbot-configs
|
||||
HG_AUTHOR_TERMS=thijssen,grenade
|
||||
HG_GROUPS=build,integration
|
||||
HG_REPOS=mozilla-central
|
||||
HG_AUTHOR_TERMS=rthijssen,grenade
|
||||
HG_POLL_INTERVAL_SECS=86400
|
||||
|
||||
BUGZILLA_HOST=bugzilla.mozilla.org
|
||||
|
||||
@@ -71,10 +71,17 @@ impl GiteaSource {
|
||||
}
|
||||
}
|
||||
|
||||
fn page_url(&self, page: u32) -> String {
|
||||
fn user_feed_base_url(&self) -> String {
|
||||
format!(
|
||||
"https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}&page={}",
|
||||
self.config.host, self.config.user, self.config.per_page, page
|
||||
"https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}",
|
||||
self.config.host, self.config.user, self.config.per_page
|
||||
)
|
||||
}
|
||||
|
||||
fn org_feed_base_url(&self, org: &str) -> String {
|
||||
format!(
|
||||
"https://{}/api/v1/orgs/{}/activities/feeds?limit={}",
|
||||
self.config.host, org, self.config.per_page
|
||||
)
|
||||
}
|
||||
|
||||
@@ -87,22 +94,51 @@ impl GiteaSource {
|
||||
}
|
||||
req
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventSource for GiteaSource {
|
||||
fn name(&self) -> &'static str {
|
||||
SOURCE_NAME
|
||||
/// Discover organizations the authenticated user belongs to.
|
||||
/// Returns an empty vec if no token is configured or the request fails.
|
||||
async fn discover_orgs(&self) -> Result<Vec<String>, SourceError> {
|
||||
if self.config.token.is_none() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let url = format!("https://{}/api/v1/user/orgs", self.config.host);
|
||||
let req = self.apply_headers(self.client.get(&url));
|
||||
let resp = req
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| SourceError::Http(e.to_string()))?;
|
||||
if !resp.status().is_success() {
|
||||
tracing::warn!(status = %resp.status(), "failed to discover gitea orgs");
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let orgs: Vec<Value> = resp
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||
Ok(orgs
|
||||
.iter()
|
||||
.filter_map(|o| o.get("username").and_then(Value::as_str).map(String::from))
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn poll(&self) -> Result<usize, SourceError> {
|
||||
let prior = self.state.load(SOURCE_NAME).await?;
|
||||
/// Poll a single activity feed, paginating on first run. When `filter_user`
|
||||
/// is true, only events performed by `self.config.user` are ingested (used
|
||||
/// for org feeds which contain all members' activity).
|
||||
///
|
||||
/// `base_url` should contain everything except the `&page=N` suffix.
|
||||
async fn poll_feed(
|
||||
&self,
|
||||
state_key: &str,
|
||||
base_url: &str,
|
||||
filter_user: bool,
|
||||
) -> Result<usize, SourceError> {
|
||||
let prior = self.state.load(state_key).await?;
|
||||
let first_run = prior.is_none();
|
||||
let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 };
|
||||
|
||||
let mut total = 0usize;
|
||||
for page in 1..=max_pages {
|
||||
let url = self.page_url(page);
|
||||
let url = format!("{base_url}&page={page}");
|
||||
let req = self.apply_headers(self.client.get(&url));
|
||||
let resp = req
|
||||
.send()
|
||||
@@ -121,6 +157,16 @@ impl EventSource for GiteaSource {
|
||||
|
||||
let events: Vec<Event> = items
|
||||
.iter()
|
||||
.filter(|it| {
|
||||
if !filter_user {
|
||||
return true;
|
||||
}
|
||||
it.get("act_user")
|
||||
.and_then(|u| u.get("login"))
|
||||
.and_then(Value::as_str)
|
||||
.map(|login| login.eq_ignore_ascii_case(&self.config.user))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.filter_map(|it| parse_gitea_event(it, &self.config.host))
|
||||
.collect();
|
||||
total += self.writer.upsert_events(&events).await?;
|
||||
@@ -130,8 +176,37 @@ impl EventSource for GiteaSource {
|
||||
}
|
||||
}
|
||||
|
||||
self.state.touch(SOURCE_NAME).await?;
|
||||
debug!(ingested = total, "gitea poll complete");
|
||||
self.state.touch(state_key).await?;
|
||||
Ok(total)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventSource for GiteaSource {
|
||||
fn name(&self) -> &'static str {
|
||||
SOURCE_NAME
|
||||
}
|
||||
|
||||
async fn poll(&self) -> Result<usize, SourceError> {
|
||||
// Poll user's own activity feed (existing behavior).
|
||||
let user_url = self.user_feed_base_url();
|
||||
let mut total = self.poll_feed(SOURCE_NAME, &user_url, false).await?;
|
||||
|
||||
// Discover orgs and poll each org's activity feed, filtering for
|
||||
// events performed by this user.
|
||||
let orgs = self.discover_orgs().await?;
|
||||
for org in &orgs {
|
||||
let state_key = format!("gitea:org:{org}");
|
||||
let org_url = self.org_feed_base_url(org);
|
||||
match self.poll_feed(&state_key, &org_url, true).await {
|
||||
Ok(n) => total += n,
|
||||
Err(e) => {
|
||||
tracing::warn!(org = %org, error = %e, "failed to poll org feed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!(ingested = total, orgs = orgs.len(), "gitea poll complete");
|
||||
Ok(total)
|
||||
}
|
||||
}
|
||||
@@ -191,6 +266,37 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn org_event_user_filter_predicate() {
|
||||
let by_user = json!({
|
||||
"id": 500, "op_type": "commit_repo", "is_private": false,
|
||||
"created": "2026-05-03T10:00:00Z",
|
||||
"act_user": { "login": "grenade" },
|
||||
"repo": { "full_name": "myorg/somerepo" }
|
||||
});
|
||||
let by_other = json!({
|
||||
"id": 501, "op_type": "commit_repo", "is_private": false,
|
||||
"created": "2026-05-03T10:01:00Z",
|
||||
"act_user": { "login": "otherperson" },
|
||||
"repo": { "full_name": "myorg/somerepo" }
|
||||
});
|
||||
// Both parse as valid events
|
||||
assert!(parse_gitea_event(&by_user, "git.lair.cafe").is_some());
|
||||
assert!(parse_gitea_event(&by_other, "git.lair.cafe").is_some());
|
||||
// The user-filter predicate used by poll_feed
|
||||
let is_user = |item: &Value, user: &str| -> bool {
|
||||
item.get("act_user")
|
||||
.and_then(|u| u.get("login"))
|
||||
.and_then(Value::as_str)
|
||||
.map(|login| login.eq_ignore_ascii_case(user))
|
||||
.unwrap_or(false)
|
||||
};
|
||||
assert!(is_user(&by_user, "grenade"));
|
||||
assert!(!is_user(&by_other, "grenade"));
|
||||
// Case-insensitive match
|
||||
assert!(is_user(&by_user, "Grenade"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn private_event_marked_private() {
|
||||
let raw = json!({
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
//! hg-edge.mozilla.org pushlog ingestion.
|
||||
//! hg-edge.mozilla.org changeset ingestion via `json-log` revset queries.
|
||||
//!
|
||||
//! mozilla's hg pushlog filter `user=` matches the *pusher*, not the
|
||||
//! changeset author. As a community-level contributor whose code was
|
||||
//! reviewed and pushed by sheriffs/reviewers, the user filter returns 0
|
||||
//! results. So this source pulls full pushlogs from a configured set of
|
||||
//! repos and filters changeset authors client-side by substring match.
|
||||
//! Uses the `json-log?rev=author(term)` endpoint which returns changesets
|
||||
//! by the *author* (not the pusher), so it captures commits landed by
|
||||
//! sheriffs on behalf of the contributor.
|
||||
//!
|
||||
//! The result set is historical (mozilla retired hg) — no new events
|
||||
//! expected after the initial backfill. Daily polling keeps the upserts
|
||||
//! cheap and idempotent.
|
||||
//! Repos are discovered within configured groups (e.g. `build`) via the
|
||||
//! `/{group}/?style=json` index, plus any individually listed repos
|
||||
//! (e.g. `mozilla-central`). Once the first successful scan completes
|
||||
//! (poller state is touched), all subsequent polls are skipped — the
|
||||
//! data is historical and will not change.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -26,13 +26,17 @@ const USER_AGENT: &str = concat!(
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
" (+https://rob.tn)"
|
||||
);
|
||||
/// Maximum changesets returned per json-log request.
|
||||
const REV_COUNT: u32 = 500;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct HgConfig {
|
||||
pub host: String,
|
||||
/// Case-insensitive substrings matched against changeset author fields.
|
||||
/// Substrings matched via `author(term)` revset queries.
|
||||
pub author_terms: Vec<String>,
|
||||
/// Repo paths under host, e.g. "build/puppet".
|
||||
/// Repo groups to scan — each is enumerated via `/{group}/?style=json`.
|
||||
pub groups: Vec<String>,
|
||||
/// Individual repos to scan (e.g. `mozilla-central`).
|
||||
pub repos: Vec<String>,
|
||||
}
|
||||
|
||||
@@ -40,12 +44,9 @@ impl Default for HgConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
host: "hg-edge.mozilla.org".into(),
|
||||
author_terms: vec!["thijssen".into(), "grenade".into()],
|
||||
repos: vec![
|
||||
"build/puppet".into(),
|
||||
"build/tools".into(),
|
||||
"build/buildbot-configs".into(),
|
||||
],
|
||||
author_terms: vec!["rthijssen".into(), "grenade".into()],
|
||||
groups: vec!["build".into(), "integration".into()],
|
||||
repos: vec!["mozilla-central".into()],
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -72,23 +73,51 @@ impl HgSource {
|
||||
}
|
||||
}
|
||||
|
||||
fn pushlog_url(&self, repo: &str) -> String {
|
||||
/// Discover repos in a group via `/{group}/?style=json`.
|
||||
async fn discover_repos(&self, group: &str) -> Result<Vec<String>, SourceError> {
|
||||
let url = format!("https://{}/{}/?style=json", self.config.host, group);
|
||||
let resp = self
|
||||
.client
|
||||
.get(&url)
|
||||
.header(header::USER_AGENT, USER_AGENT)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| SourceError::Http(e.to_string()))?;
|
||||
if !resp.status().is_success() {
|
||||
warn!(group, status = %resp.status(), "failed to discover repos in group");
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let body: Value = resp
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||
Ok(body
|
||||
.get("entries")
|
||||
.and_then(Value::as_array)
|
||||
.map(|entries| {
|
||||
entries
|
||||
.iter()
|
||||
.filter_map(|e| {
|
||||
e.get("name")
|
||||
.and_then(Value::as_str)
|
||||
.map(|name| format!("{group}/{name}"))
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default())
|
||||
}
|
||||
|
||||
fn log_url(&self, repo: &str, author_term: &str) -> String {
|
||||
format!(
|
||||
"https://{}/{}/json-pushes?version=2&full=1",
|
||||
self.config.host, repo
|
||||
"https://{}/{}/json-log?rev=author({})&style=json&revcount={}",
|
||||
self.config.host, repo, author_term, REV_COUNT
|
||||
)
|
||||
}
|
||||
|
||||
fn matches_author(&self, author: &str) -> bool {
|
||||
let lower = author.to_lowercase();
|
||||
self.config
|
||||
.author_terms
|
||||
.iter()
|
||||
.any(|t| lower.contains(&t.to_lowercase()))
|
||||
}
|
||||
|
||||
async fn scan_repo(&self, repo: &str) -> Result<usize, SourceError> {
|
||||
let url = self.pushlog_url(repo);
|
||||
let mut all_events = Vec::new();
|
||||
for term in &self.config.author_terms {
|
||||
let url = self.log_url(repo, term);
|
||||
let resp = self
|
||||
.client
|
||||
.get(&url)
|
||||
@@ -104,62 +133,38 @@ impl HgSource {
|
||||
.await
|
||||
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||
|
||||
let mut events = Vec::new();
|
||||
if let Some(pushes) = body.get("pushes").and_then(Value::as_object) {
|
||||
for (pushid, push) in pushes {
|
||||
let pushed_at_secs = push.get("date").and_then(Value::as_i64).unwrap_or(0);
|
||||
let pushed_at = Utc
|
||||
.timestamp_opt(pushed_at_secs, 0)
|
||||
.single()
|
||||
.unwrap_or_else(Utc::now);
|
||||
let pusher = push
|
||||
.get("user")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
if let Some(changesets) = push.get("changesets").and_then(Value::as_array) {
|
||||
for cs in changesets {
|
||||
let author = cs.get("author").and_then(Value::as_str).unwrap_or("");
|
||||
if !self.matches_author(author) {
|
||||
continue;
|
||||
}
|
||||
let node = cs.get("node").and_then(Value::as_str).unwrap_or("");
|
||||
if let Some(entries) = body.get("entries").and_then(Value::as_array) {
|
||||
for entry in entries {
|
||||
let node = entry.get("node").and_then(Value::as_str).unwrap_or("");
|
||||
if node.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let occurred_at = cs
|
||||
let occurred_at = entry
|
||||
.get("date")
|
||||
.and_then(Value::as_array)
|
||||
.and_then(|a| parse_hg_date(a))
|
||||
.unwrap_or(pushed_at);
|
||||
.unwrap_or_else(Utc::now);
|
||||
|
||||
let mut payload = cs.clone();
|
||||
let mut payload = entry.clone();
|
||||
if let Some(obj) = payload.as_object_mut() {
|
||||
obj.insert("_repo".into(), Value::String(repo.into()));
|
||||
obj.insert(
|
||||
"_host".into(),
|
||||
Value::String(self.config.host.clone()),
|
||||
);
|
||||
obj.insert("_pusher".into(), Value::String(pusher.clone()));
|
||||
obj.insert(
|
||||
"_pushid".into(),
|
||||
Value::String(pushid.clone()),
|
||||
);
|
||||
}
|
||||
events.push(Event {
|
||||
all_events.push(Event {
|
||||
id: format!("hg:{repo}:{node}"),
|
||||
source: Source::Hg,
|
||||
action: "Commit".into(),
|
||||
occurred_at,
|
||||
// mozilla hg-edge is exclusively public.
|
||||
public: true,
|
||||
payload,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(self.writer.upsert_events(&events).await?)
|
||||
Ok(self.writer.upsert_events(&all_events).await?)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,11 +175,26 @@ impl EventSource for HgSource {
|
||||
}
|
||||
|
||||
async fn poll(&self) -> Result<usize, SourceError> {
|
||||
// hg repos are archived — one complete scan is sufficient.
|
||||
if self.state.load(SOURCE_NAME).await?.is_some() {
|
||||
debug!("hg already backfilled, skipping");
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mut repos: Vec<String> = self.config.repos.clone();
|
||||
for group in &self.config.groups {
|
||||
let discovered = self.discover_repos(group).await?;
|
||||
debug!(group, repos = discovered.len(), "discovered hg repos");
|
||||
repos.extend(discovered);
|
||||
}
|
||||
|
||||
let mut total = 0usize;
|
||||
for repo in &self.config.repos {
|
||||
for repo in &repos {
|
||||
match self.scan_repo(repo).await {
|
||||
Ok(n) => {
|
||||
if n > 0 {
|
||||
debug!(repo, ingested = n, "hg repo scan complete");
|
||||
}
|
||||
total += n;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -183,11 +203,12 @@ impl EventSource for HgSource {
|
||||
}
|
||||
}
|
||||
self.state.touch(SOURCE_NAME).await?;
|
||||
debug!(ingested = total, "hg backfill complete");
|
||||
Ok(total)
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a pushlog date array `[seconds, tz_offset_secs]` into UTC.
|
||||
/// Parse a hgweb date array `[seconds, tz_offset_secs]` into UTC.
|
||||
fn parse_hg_date(arr: &[Value]) -> Option<DateTime<Utc>> {
|
||||
let secs = arr.first()?.as_f64()? as i64;
|
||||
Utc.timestamp_opt(secs, 0).single()
|
||||
@@ -197,23 +218,6 @@ fn parse_hg_date(arr: &[Value]) -> Option<DateTime<Utc>> {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn matches_author_substring_case_insensitive() {
|
||||
let s = HgSource {
|
||||
client: Client::new(),
|
||||
writer: Arc::new(NoopWriter),
|
||||
state: Arc::new(NoopState),
|
||||
config: HgConfig {
|
||||
author_terms: vec!["thijssen".into(), "grenade".into()],
|
||||
..Default::default()
|
||||
},
|
||||
};
|
||||
assert!(s.matches_author("Rob Thijssen <rob@example.com>"));
|
||||
assert!(s.matches_author("grenade@example"));
|
||||
assert!(s.matches_author("THIJSSEN"));
|
||||
assert!(!s.matches_author("Other Person <other@example>"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_hg_date_handles_seconds() {
|
||||
let arr = vec![Value::from(1_700_000_000_f64), Value::from(0_f64)];
|
||||
@@ -221,6 +225,19 @@ mod tests {
|
||||
assert_eq!(dt.timestamp(), 1_700_000_000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn log_url_uses_revset_author_query() {
|
||||
let src = HgSource {
|
||||
client: Client::new(),
|
||||
writer: Arc::new(NoopWriter),
|
||||
state: Arc::new(NoopState),
|
||||
config: HgConfig::default(),
|
||||
};
|
||||
let url = src.log_url("mozilla-central", "thijssen");
|
||||
assert!(url.contains("json-log?rev=author(thijssen)"));
|
||||
assert!(url.contains("revcount=500"));
|
||||
}
|
||||
|
||||
// Tiny stub impls just so we can construct an HgSource for unit tests.
|
||||
struct NoopWriter;
|
||||
#[async_trait]
|
||||
|
||||
@@ -51,21 +51,31 @@ struct Args {
|
||||
#[arg(long, env = "HG_HOST", default_value = "hg-edge.mozilla.org")]
|
||||
hg_host: String,
|
||||
|
||||
/// Comma-separated mozilla hg repo paths to scan, e.g. "build/puppet,build/tools".
|
||||
/// Comma-separated repo groups to scan. Repos within each group are
|
||||
/// discovered via `/{group}/?style=json`.
|
||||
#[arg(
|
||||
long,
|
||||
env = "HG_GROUPS",
|
||||
value_delimiter = ',',
|
||||
default_value = "build,integration"
|
||||
)]
|
||||
hg_groups: Vec<String>,
|
||||
|
||||
/// Comma-separated individual repos to scan (e.g. `mozilla-central`).
|
||||
#[arg(
|
||||
long,
|
||||
env = "HG_REPOS",
|
||||
value_delimiter = ',',
|
||||
default_value = "build/puppet,build/tools,build/buildbot-configs"
|
||||
default_value = "mozilla-central"
|
||||
)]
|
||||
hg_repos: Vec<String>,
|
||||
|
||||
/// Comma-separated case-insensitive substrings matched against changeset author fields.
|
||||
/// Comma-separated author substrings for `author()` revset queries.
|
||||
#[arg(
|
||||
long,
|
||||
env = "HG_AUTHOR_TERMS",
|
||||
value_delimiter = ',',
|
||||
default_value = "thijssen,grenade"
|
||||
default_value = "rthijssen,grenade"
|
||||
)]
|
||||
hg_author_terms: Vec<String>,
|
||||
|
||||
@@ -141,6 +151,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
HgConfig {
|
||||
host: args.hg_host.clone(),
|
||||
author_terms: args.hg_author_terms.clone(),
|
||||
groups: args.hg_groups.clone(),
|
||||
repos: args.hg_repos.clone(),
|
||||
},
|
||||
)) as Arc<dyn EventSource>;
|
||||
@@ -162,7 +173,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
gitea_host = args.gitea_host,
|
||||
gitea_user = args.gitea_user,
|
||||
hg_host = args.hg_host,
|
||||
hg_groups = ?args.hg_groups,
|
||||
hg_repos = ?args.hg_repos,
|
||||
hg_author_terms = ?args.hg_author_terms,
|
||||
bugzilla_host = args.bugzilla_host,
|
||||
bugzilla_email = args.bugzilla_email,
|
||||
events_interval_secs = args.interval_secs,
|
||||
|
||||
140
script/hg-ingest.sh
Executable file
140
script/hg-ingest.sh
Executable file
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# One-shot hg changeset ingestion via local clones.
|
||||
#
|
||||
# Bare-clones each hg repo, extracts changesets matching author terms,
|
||||
# and inserts them into the moments database. Sets poller_state so the
|
||||
# worker won't re-scan.
|
||||
#
|
||||
# Requirements: hg (mercurial), psql, jq
|
||||
#
|
||||
# Usage:
|
||||
# DATABASE_URL="postgres://..." ./script/hg-ingest.sh
|
||||
#
|
||||
set -euo pipefail
|
||||
|
||||
DATABASE_URL="${DATABASE_URL:-postgres://moments_rw@magrathea.kosherinata.internal:5432/moments?sslmode=verify-full&sslrootcert=/etc/pki/ca-trust/source/anchors/root-internal.pem&sslcert=/etc/pki/tls/misc/$(hostname -f).pem&sslkey=/etc/pki/tls/private/$(hostname -f).pem}"
|
||||
HG_HOST="${HG_HOST:-hg-edge.mozilla.org}"
|
||||
WORK_DIR="${HG_WORK_DIR:-$HOME/hg}"
|
||||
|
||||
# Repos to clone (groups are expanded inline)
|
||||
REPOS=(
|
||||
integration/mozilla-inbound
|
||||
integration/autoland
|
||||
integration/fx-team
|
||||
integration/b2g-inbound
|
||||
build/puppet
|
||||
build/tools
|
||||
build/buildbot
|
||||
build/buildbot-configs
|
||||
build/slave_health
|
||||
build/mozharness
|
||||
build/braindump
|
||||
build/cloud-tools
|
||||
build/compare-locales
|
||||
build/nagios-core
|
||||
build/partner-repacks
|
||||
build/preproduction
|
||||
build/rpm-sources
|
||||
build/talos
|
||||
build/tupperware
|
||||
build/ash-mozharness
|
||||
build/autoland
|
||||
build/opsi-package-sources
|
||||
)
|
||||
|
||||
# Author terms — matched case-insensitively against changeset author fields
|
||||
AUTHOR_TERMS=("rthijssen" "grenade")
|
||||
|
||||
: "${DATABASE_URL:?DATABASE_URL must be set}"
|
||||
|
||||
mkdir -p "$WORK_DIR"
|
||||
|
||||
total=0
|
||||
|
||||
CLONE_DIR="$WORK_DIR/clone"
|
||||
CACHE_DIR="$WORK_DIR/cache"
|
||||
mkdir -p "$CACHE_DIR"
|
||||
cd "$WORK_DIR"
|
||||
|
||||
for repo in "${REPOS[@]}"; do
|
||||
cache_file="$CACHE_DIR/$(echo "$repo" | tr '/' '_').tsv"
|
||||
|
||||
# Skip repos already cached (re-run safe)
|
||||
if [ -f "$cache_file" ]; then
|
||||
echo "[hg-ingest] $repo: using cached results"
|
||||
else
|
||||
# Remove any previous clone to keep only one on disk
|
||||
rm -rf "$CLONE_DIR"
|
||||
|
||||
echo "[hg-ingest] cloning $repo"
|
||||
if ! hg clone --noupdate "https://$HG_HOST/$repo" "$CLONE_DIR"; then
|
||||
echo "[hg-ingest] clone failed: $repo (skipping)"
|
||||
continue
|
||||
fi
|
||||
|
||||
# Build revset: author(term1) or author(term2) ...
|
||||
revset=""
|
||||
for term in "${AUTHOR_TERMS[@]}"; do
|
||||
if [ -z "$revset" ]; then
|
||||
revset="author('$term')"
|
||||
else
|
||||
revset="$revset or author('$term')"
|
||||
fi
|
||||
done
|
||||
|
||||
# Extract matching changesets to cache file
|
||||
hg log -R "$CLONE_DIR" -r "$revset" \
|
||||
--template '{node}\t{author}\t{date|hgdate}\t{desc|firstline}\n' \
|
||||
> "$cache_file" || true
|
||||
|
||||
# Free disk immediately
|
||||
rm -rf "$CLONE_DIR"
|
||||
fi
|
||||
|
||||
# Ingest cached results into the database
|
||||
count=0
|
||||
while IFS=$'\t' read -r node author date_raw desc; do
|
||||
[ -z "$node" ] && continue
|
||||
|
||||
# {date|hgdate} outputs "timestamp offset" — take just the timestamp
|
||||
date_ts="${date_raw%% *}"
|
||||
|
||||
# Build ISO timestamp from unix epoch
|
||||
occurred_at=$(date -u -d "@${date_ts}" '+%Y-%m-%dT%H:%M:%SZ')
|
||||
|
||||
event_id="hg:${repo}:${node}"
|
||||
|
||||
# Build payload JSON (jq handles all escaping)
|
||||
payload=$(jq -n \
|
||||
--arg node "$node" \
|
||||
--arg user "$author" \
|
||||
--arg desc "$desc" \
|
||||
--arg repo "$repo" \
|
||||
--arg host "$HG_HOST" \
|
||||
'{node: $node, user: $user, desc: $desc, _repo: $repo, _host: $host}')
|
||||
|
||||
# Upsert into events table
|
||||
psql "$DATABASE_URL" -q -c "
|
||||
INSERT INTO events (id, source, action, occurred_at, public, payload)
|
||||
VALUES (\$\$${event_id}\$\$, 'hg', 'Commit', '${occurred_at}', true, \$\$${payload}\$\$::jsonb)
|
||||
ON CONFLICT (id) DO NOTHING;
|
||||
"
|
||||
|
||||
count=$((count + 1))
|
||||
done < "$cache_file"
|
||||
|
||||
if [ "$count" -gt 0 ]; then
|
||||
echo "[hg-ingest] $repo: $count changesets ingested"
|
||||
fi
|
||||
total=$((total + count))
|
||||
done
|
||||
|
||||
# Mark poller state so the worker skips hg
|
||||
psql "$DATABASE_URL" -q -c "
|
||||
INSERT INTO poller_state (source, last_fetched)
|
||||
VALUES ('hg', now())
|
||||
ON CONFLICT (source) DO UPDATE SET last_fetched = now();
|
||||
"
|
||||
|
||||
echo "[hg-ingest] done. total: $total changesets"
|
||||
@@ -34,6 +34,21 @@ a.hot-pink {
|
||||
color: #2c3e50;
|
||||
}
|
||||
|
||||
.vertical-timeline-element-content h4.vertical-timeline-element-title {
|
||||
font-size: 0.85rem;
|
||||
}
|
||||
|
||||
.vertical-timeline-element-content h5.vertical-timeline-element-subtitle {
|
||||
font-size: 0.75rem;
|
||||
}
|
||||
|
||||
.vertical-timeline-element-content p,
|
||||
.vertical-timeline-element-content ul,
|
||||
.vertical-timeline-element-content li,
|
||||
.vertical-timeline-element-content code {
|
||||
font-size: 0.75rem;
|
||||
}
|
||||
|
||||
.vertical-timeline-element-content a {
|
||||
color: #1565c0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user