Compare commits
4 Commits
4c8a663288
...
2da9461b44
| Author | SHA1 | Date | |
|---|---|---|---|
|
2da9461b44
|
|||
|
3f3a1fb33e
|
|||
|
88fbbba60b
|
|||
|
1bbe55dc84
|
@@ -14,8 +14,9 @@ GITEA_TOKEN={{GITEA_TOKEN}}
|
|||||||
GITEA_POLL_INTERVAL_SECS=600
|
GITEA_POLL_INTERVAL_SECS=600
|
||||||
|
|
||||||
HG_HOST=hg-edge.mozilla.org
|
HG_HOST=hg-edge.mozilla.org
|
||||||
HG_REPOS=build/puppet,build/tools,build/buildbot-configs
|
HG_GROUPS=build,integration
|
||||||
HG_AUTHOR_TERMS=thijssen,grenade
|
HG_REPOS=mozilla-central
|
||||||
|
HG_AUTHOR_TERMS=rthijssen,grenade
|
||||||
HG_POLL_INTERVAL_SECS=86400
|
HG_POLL_INTERVAL_SECS=86400
|
||||||
|
|
||||||
BUGZILLA_HOST=bugzilla.mozilla.org
|
BUGZILLA_HOST=bugzilla.mozilla.org
|
||||||
|
|||||||
@@ -71,10 +71,17 @@ impl GiteaSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn page_url(&self, page: u32) -> String {
|
fn user_feed_base_url(&self) -> String {
|
||||||
format!(
|
format!(
|
||||||
"https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}&page={}",
|
"https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}",
|
||||||
self.config.host, self.config.user, self.config.per_page, page
|
self.config.host, self.config.user, self.config.per_page
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn org_feed_base_url(&self, org: &str) -> String {
|
||||||
|
format!(
|
||||||
|
"https://{}/api/v1/orgs/{}/activities/feeds?limit={}",
|
||||||
|
self.config.host, org, self.config.per_page
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,22 +94,51 @@ impl GiteaSource {
|
|||||||
}
|
}
|
||||||
req
|
req
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Discover organizations the authenticated user belongs to.
|
||||||
|
/// Returns an empty vec if no token is configured or the request fails.
|
||||||
|
async fn discover_orgs(&self) -> Result<Vec<String>, SourceError> {
|
||||||
|
if self.config.token.is_none() {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let url = format!("https://{}/api/v1/user/orgs", self.config.host);
|
||||||
|
let req = self.apply_headers(self.client.get(&url));
|
||||||
|
let resp = req
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Http(e.to_string()))?;
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
tracing::warn!(status = %resp.status(), "failed to discover gitea orgs");
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let orgs: Vec<Value> = resp
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||||
|
Ok(orgs
|
||||||
|
.iter()
|
||||||
|
.filter_map(|o| o.get("username").and_then(Value::as_str).map(String::from))
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
/// Poll a single activity feed, paginating on first run. When `filter_user`
|
||||||
impl EventSource for GiteaSource {
|
/// is true, only events performed by `self.config.user` are ingested (used
|
||||||
fn name(&self) -> &'static str {
|
/// for org feeds which contain all members' activity).
|
||||||
SOURCE_NAME
|
///
|
||||||
}
|
/// `base_url` should contain everything except the `&page=N` suffix.
|
||||||
|
async fn poll_feed(
|
||||||
async fn poll(&self) -> Result<usize, SourceError> {
|
&self,
|
||||||
let prior = self.state.load(SOURCE_NAME).await?;
|
state_key: &str,
|
||||||
|
base_url: &str,
|
||||||
|
filter_user: bool,
|
||||||
|
) -> Result<usize, SourceError> {
|
||||||
|
let prior = self.state.load(state_key).await?;
|
||||||
let first_run = prior.is_none();
|
let first_run = prior.is_none();
|
||||||
let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 };
|
let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 };
|
||||||
|
|
||||||
let mut total = 0usize;
|
let mut total = 0usize;
|
||||||
for page in 1..=max_pages {
|
for page in 1..=max_pages {
|
||||||
let url = self.page_url(page);
|
let url = format!("{base_url}&page={page}");
|
||||||
let req = self.apply_headers(self.client.get(&url));
|
let req = self.apply_headers(self.client.get(&url));
|
||||||
let resp = req
|
let resp = req
|
||||||
.send()
|
.send()
|
||||||
@@ -121,6 +157,16 @@ impl EventSource for GiteaSource {
|
|||||||
|
|
||||||
let events: Vec<Event> = items
|
let events: Vec<Event> = items
|
||||||
.iter()
|
.iter()
|
||||||
|
.filter(|it| {
|
||||||
|
if !filter_user {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
it.get("act_user")
|
||||||
|
.and_then(|u| u.get("login"))
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.map(|login| login.eq_ignore_ascii_case(&self.config.user))
|
||||||
|
.unwrap_or(false)
|
||||||
|
})
|
||||||
.filter_map(|it| parse_gitea_event(it, &self.config.host))
|
.filter_map(|it| parse_gitea_event(it, &self.config.host))
|
||||||
.collect();
|
.collect();
|
||||||
total += self.writer.upsert_events(&events).await?;
|
total += self.writer.upsert_events(&events).await?;
|
||||||
@@ -130,8 +176,37 @@ impl EventSource for GiteaSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.state.touch(SOURCE_NAME).await?;
|
self.state.touch(state_key).await?;
|
||||||
debug!(ingested = total, "gitea poll complete");
|
Ok(total)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventSource for GiteaSource {
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
SOURCE_NAME
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn poll(&self) -> Result<usize, SourceError> {
|
||||||
|
// Poll user's own activity feed (existing behavior).
|
||||||
|
let user_url = self.user_feed_base_url();
|
||||||
|
let mut total = self.poll_feed(SOURCE_NAME, &user_url, false).await?;
|
||||||
|
|
||||||
|
// Discover orgs and poll each org's activity feed, filtering for
|
||||||
|
// events performed by this user.
|
||||||
|
let orgs = self.discover_orgs().await?;
|
||||||
|
for org in &orgs {
|
||||||
|
let state_key = format!("gitea:org:{org}");
|
||||||
|
let org_url = self.org_feed_base_url(org);
|
||||||
|
match self.poll_feed(&state_key, &org_url, true).await {
|
||||||
|
Ok(n) => total += n,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(org = %org, error = %e, "failed to poll org feed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(ingested = total, orgs = orgs.len(), "gitea poll complete");
|
||||||
Ok(total)
|
Ok(total)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -191,6 +266,37 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn org_event_user_filter_predicate() {
|
||||||
|
let by_user = json!({
|
||||||
|
"id": 500, "op_type": "commit_repo", "is_private": false,
|
||||||
|
"created": "2026-05-03T10:00:00Z",
|
||||||
|
"act_user": { "login": "grenade" },
|
||||||
|
"repo": { "full_name": "myorg/somerepo" }
|
||||||
|
});
|
||||||
|
let by_other = json!({
|
||||||
|
"id": 501, "op_type": "commit_repo", "is_private": false,
|
||||||
|
"created": "2026-05-03T10:01:00Z",
|
||||||
|
"act_user": { "login": "otherperson" },
|
||||||
|
"repo": { "full_name": "myorg/somerepo" }
|
||||||
|
});
|
||||||
|
// Both parse as valid events
|
||||||
|
assert!(parse_gitea_event(&by_user, "git.lair.cafe").is_some());
|
||||||
|
assert!(parse_gitea_event(&by_other, "git.lair.cafe").is_some());
|
||||||
|
// The user-filter predicate used by poll_feed
|
||||||
|
let is_user = |item: &Value, user: &str| -> bool {
|
||||||
|
item.get("act_user")
|
||||||
|
.and_then(|u| u.get("login"))
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.map(|login| login.eq_ignore_ascii_case(user))
|
||||||
|
.unwrap_or(false)
|
||||||
|
};
|
||||||
|
assert!(is_user(&by_user, "grenade"));
|
||||||
|
assert!(!is_user(&by_other, "grenade"));
|
||||||
|
// Case-insensitive match
|
||||||
|
assert!(is_user(&by_user, "Grenade"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn private_event_marked_private() {
|
fn private_event_marked_private() {
|
||||||
let raw = json!({
|
let raw = json!({
|
||||||
|
|||||||
@@ -1,14 +1,14 @@
|
|||||||
//! hg-edge.mozilla.org pushlog ingestion.
|
//! hg-edge.mozilla.org changeset ingestion via `json-log` revset queries.
|
||||||
//!
|
//!
|
||||||
//! mozilla's hg pushlog filter `user=` matches the *pusher*, not the
|
//! Uses the `json-log?rev=author(term)` endpoint which returns changesets
|
||||||
//! changeset author. As a community-level contributor whose code was
|
//! by the *author* (not the pusher), so it captures commits landed by
|
||||||
//! reviewed and pushed by sheriffs/reviewers, the user filter returns 0
|
//! sheriffs on behalf of the contributor.
|
||||||
//! results. So this source pulls full pushlogs from a configured set of
|
|
||||||
//! repos and filters changeset authors client-side by substring match.
|
|
||||||
//!
|
//!
|
||||||
//! The result set is historical (mozilla retired hg) — no new events
|
//! Repos are discovered within configured groups (e.g. `build`) via the
|
||||||
//! expected after the initial backfill. Daily polling keeps the upserts
|
//! `/{group}/?style=json` index, plus any individually listed repos
|
||||||
//! cheap and idempotent.
|
//! (e.g. `mozilla-central`). Once the first successful scan completes
|
||||||
|
//! (poller state is touched), all subsequent polls are skipped — the
|
||||||
|
//! data is historical and will not change.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -26,13 +26,17 @@ const USER_AGENT: &str = concat!(
|
|||||||
env!("CARGO_PKG_VERSION"),
|
env!("CARGO_PKG_VERSION"),
|
||||||
" (+https://rob.tn)"
|
" (+https://rob.tn)"
|
||||||
);
|
);
|
||||||
|
/// Maximum changesets returned per json-log request.
|
||||||
|
const REV_COUNT: u32 = 500;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct HgConfig {
|
pub struct HgConfig {
|
||||||
pub host: String,
|
pub host: String,
|
||||||
/// Case-insensitive substrings matched against changeset author fields.
|
/// Substrings matched via `author(term)` revset queries.
|
||||||
pub author_terms: Vec<String>,
|
pub author_terms: Vec<String>,
|
||||||
/// Repo paths under host, e.g. "build/puppet".
|
/// Repo groups to scan — each is enumerated via `/{group}/?style=json`.
|
||||||
|
pub groups: Vec<String>,
|
||||||
|
/// Individual repos to scan (e.g. `mozilla-central`).
|
||||||
pub repos: Vec<String>,
|
pub repos: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,12 +44,9 @@ impl Default for HgConfig {
|
|||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
host: "hg-edge.mozilla.org".into(),
|
host: "hg-edge.mozilla.org".into(),
|
||||||
author_terms: vec!["thijssen".into(), "grenade".into()],
|
author_terms: vec!["rthijssen".into(), "grenade".into()],
|
||||||
repos: vec![
|
groups: vec!["build".into(), "integration".into()],
|
||||||
"build/puppet".into(),
|
repos: vec!["mozilla-central".into()],
|
||||||
"build/tools".into(),
|
|
||||||
"build/buildbot-configs".into(),
|
|
||||||
],
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -72,23 +73,51 @@ impl HgSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pushlog_url(&self, repo: &str) -> String {
|
/// Discover repos in a group via `/{group}/?style=json`.
|
||||||
|
async fn discover_repos(&self, group: &str) -> Result<Vec<String>, SourceError> {
|
||||||
|
let url = format!("https://{}/{}/?style=json", self.config.host, group);
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.get(&url)
|
||||||
|
.header(header::USER_AGENT, USER_AGENT)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Http(e.to_string()))?;
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
warn!(group, status = %resp.status(), "failed to discover repos in group");
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let body: Value = resp
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||||
|
Ok(body
|
||||||
|
.get("entries")
|
||||||
|
.and_then(Value::as_array)
|
||||||
|
.map(|entries| {
|
||||||
|
entries
|
||||||
|
.iter()
|
||||||
|
.filter_map(|e| {
|
||||||
|
e.get("name")
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.map(|name| format!("{group}/{name}"))
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
.unwrap_or_default())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn log_url(&self, repo: &str, author_term: &str) -> String {
|
||||||
format!(
|
format!(
|
||||||
"https://{}/{}/json-pushes?version=2&full=1",
|
"https://{}/{}/json-log?rev=author({})&style=json&revcount={}",
|
||||||
self.config.host, repo
|
self.config.host, repo, author_term, REV_COUNT
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn matches_author(&self, author: &str) -> bool {
|
|
||||||
let lower = author.to_lowercase();
|
|
||||||
self.config
|
|
||||||
.author_terms
|
|
||||||
.iter()
|
|
||||||
.any(|t| lower.contains(&t.to_lowercase()))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn scan_repo(&self, repo: &str) -> Result<usize, SourceError> {
|
async fn scan_repo(&self, repo: &str) -> Result<usize, SourceError> {
|
||||||
let url = self.pushlog_url(repo);
|
let mut all_events = Vec::new();
|
||||||
|
for term in &self.config.author_terms {
|
||||||
|
let url = self.log_url(repo, term);
|
||||||
let resp = self
|
let resp = self
|
||||||
.client
|
.client
|
||||||
.get(&url)
|
.get(&url)
|
||||||
@@ -104,62 +133,38 @@ impl HgSource {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||||
|
|
||||||
let mut events = Vec::new();
|
if let Some(entries) = body.get("entries").and_then(Value::as_array) {
|
||||||
if let Some(pushes) = body.get("pushes").and_then(Value::as_object) {
|
for entry in entries {
|
||||||
for (pushid, push) in pushes {
|
let node = entry.get("node").and_then(Value::as_str).unwrap_or("");
|
||||||
let pushed_at_secs = push.get("date").and_then(Value::as_i64).unwrap_or(0);
|
|
||||||
let pushed_at = Utc
|
|
||||||
.timestamp_opt(pushed_at_secs, 0)
|
|
||||||
.single()
|
|
||||||
.unwrap_or_else(Utc::now);
|
|
||||||
let pusher = push
|
|
||||||
.get("user")
|
|
||||||
.and_then(Value::as_str)
|
|
||||||
.unwrap_or("")
|
|
||||||
.to_string();
|
|
||||||
if let Some(changesets) = push.get("changesets").and_then(Value::as_array) {
|
|
||||||
for cs in changesets {
|
|
||||||
let author = cs.get("author").and_then(Value::as_str).unwrap_or("");
|
|
||||||
if !self.matches_author(author) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let node = cs.get("node").and_then(Value::as_str).unwrap_or("");
|
|
||||||
if node.is_empty() {
|
if node.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let occurred_at = cs
|
let occurred_at = entry
|
||||||
.get("date")
|
.get("date")
|
||||||
.and_then(Value::as_array)
|
.and_then(Value::as_array)
|
||||||
.and_then(|a| parse_hg_date(a))
|
.and_then(|a| parse_hg_date(a))
|
||||||
.unwrap_or(pushed_at);
|
.unwrap_or_else(Utc::now);
|
||||||
|
|
||||||
let mut payload = cs.clone();
|
let mut payload = entry.clone();
|
||||||
if let Some(obj) = payload.as_object_mut() {
|
if let Some(obj) = payload.as_object_mut() {
|
||||||
obj.insert("_repo".into(), Value::String(repo.into()));
|
obj.insert("_repo".into(), Value::String(repo.into()));
|
||||||
obj.insert(
|
obj.insert(
|
||||||
"_host".into(),
|
"_host".into(),
|
||||||
Value::String(self.config.host.clone()),
|
Value::String(self.config.host.clone()),
|
||||||
);
|
);
|
||||||
obj.insert("_pusher".into(), Value::String(pusher.clone()));
|
|
||||||
obj.insert(
|
|
||||||
"_pushid".into(),
|
|
||||||
Value::String(pushid.clone()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
events.push(Event {
|
all_events.push(Event {
|
||||||
id: format!("hg:{repo}:{node}"),
|
id: format!("hg:{repo}:{node}"),
|
||||||
source: Source::Hg,
|
source: Source::Hg,
|
||||||
action: "Commit".into(),
|
action: "Commit".into(),
|
||||||
occurred_at,
|
occurred_at,
|
||||||
// mozilla hg-edge is exclusively public.
|
|
||||||
public: true,
|
public: true,
|
||||||
payload,
|
payload,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Ok(self.writer.upsert_events(&all_events).await?)
|
||||||
Ok(self.writer.upsert_events(&events).await?)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,11 +175,26 @@ impl EventSource for HgSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn poll(&self) -> Result<usize, SourceError> {
|
async fn poll(&self) -> Result<usize, SourceError> {
|
||||||
|
// hg repos are archived — one complete scan is sufficient.
|
||||||
|
if self.state.load(SOURCE_NAME).await?.is_some() {
|
||||||
|
debug!("hg already backfilled, skipping");
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut repos: Vec<String> = self.config.repos.clone();
|
||||||
|
for group in &self.config.groups {
|
||||||
|
let discovered = self.discover_repos(group).await?;
|
||||||
|
debug!(group, repos = discovered.len(), "discovered hg repos");
|
||||||
|
repos.extend(discovered);
|
||||||
|
}
|
||||||
|
|
||||||
let mut total = 0usize;
|
let mut total = 0usize;
|
||||||
for repo in &self.config.repos {
|
for repo in &repos {
|
||||||
match self.scan_repo(repo).await {
|
match self.scan_repo(repo).await {
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
|
if n > 0 {
|
||||||
debug!(repo, ingested = n, "hg repo scan complete");
|
debug!(repo, ingested = n, "hg repo scan complete");
|
||||||
|
}
|
||||||
total += n;
|
total += n;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -183,11 +203,12 @@ impl EventSource for HgSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.state.touch(SOURCE_NAME).await?;
|
self.state.touch(SOURCE_NAME).await?;
|
||||||
|
debug!(ingested = total, "hg backfill complete");
|
||||||
Ok(total)
|
Ok(total)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse a pushlog date array `[seconds, tz_offset_secs]` into UTC.
|
/// Parse a hgweb date array `[seconds, tz_offset_secs]` into UTC.
|
||||||
fn parse_hg_date(arr: &[Value]) -> Option<DateTime<Utc>> {
|
fn parse_hg_date(arr: &[Value]) -> Option<DateTime<Utc>> {
|
||||||
let secs = arr.first()?.as_f64()? as i64;
|
let secs = arr.first()?.as_f64()? as i64;
|
||||||
Utc.timestamp_opt(secs, 0).single()
|
Utc.timestamp_opt(secs, 0).single()
|
||||||
@@ -197,23 +218,6 @@ fn parse_hg_date(arr: &[Value]) -> Option<DateTime<Utc>> {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn matches_author_substring_case_insensitive() {
|
|
||||||
let s = HgSource {
|
|
||||||
client: Client::new(),
|
|
||||||
writer: Arc::new(NoopWriter),
|
|
||||||
state: Arc::new(NoopState),
|
|
||||||
config: HgConfig {
|
|
||||||
author_terms: vec!["thijssen".into(), "grenade".into()],
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
};
|
|
||||||
assert!(s.matches_author("Rob Thijssen <rob@example.com>"));
|
|
||||||
assert!(s.matches_author("grenade@example"));
|
|
||||||
assert!(s.matches_author("THIJSSEN"));
|
|
||||||
assert!(!s.matches_author("Other Person <other@example>"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn parse_hg_date_handles_seconds() {
|
fn parse_hg_date_handles_seconds() {
|
||||||
let arr = vec![Value::from(1_700_000_000_f64), Value::from(0_f64)];
|
let arr = vec![Value::from(1_700_000_000_f64), Value::from(0_f64)];
|
||||||
@@ -221,6 +225,19 @@ mod tests {
|
|||||||
assert_eq!(dt.timestamp(), 1_700_000_000);
|
assert_eq!(dt.timestamp(), 1_700_000_000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn log_url_uses_revset_author_query() {
|
||||||
|
let src = HgSource {
|
||||||
|
client: Client::new(),
|
||||||
|
writer: Arc::new(NoopWriter),
|
||||||
|
state: Arc::new(NoopState),
|
||||||
|
config: HgConfig::default(),
|
||||||
|
};
|
||||||
|
let url = src.log_url("mozilla-central", "thijssen");
|
||||||
|
assert!(url.contains("json-log?rev=author(thijssen)"));
|
||||||
|
assert!(url.contains("revcount=500"));
|
||||||
|
}
|
||||||
|
|
||||||
// Tiny stub impls just so we can construct an HgSource for unit tests.
|
// Tiny stub impls just so we can construct an HgSource for unit tests.
|
||||||
struct NoopWriter;
|
struct NoopWriter;
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -51,21 +51,31 @@ struct Args {
|
|||||||
#[arg(long, env = "HG_HOST", default_value = "hg-edge.mozilla.org")]
|
#[arg(long, env = "HG_HOST", default_value = "hg-edge.mozilla.org")]
|
||||||
hg_host: String,
|
hg_host: String,
|
||||||
|
|
||||||
/// Comma-separated mozilla hg repo paths to scan, e.g. "build/puppet,build/tools".
|
/// Comma-separated repo groups to scan. Repos within each group are
|
||||||
|
/// discovered via `/{group}/?style=json`.
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
env = "HG_GROUPS",
|
||||||
|
value_delimiter = ',',
|
||||||
|
default_value = "build,integration"
|
||||||
|
)]
|
||||||
|
hg_groups: Vec<String>,
|
||||||
|
|
||||||
|
/// Comma-separated individual repos to scan (e.g. `mozilla-central`).
|
||||||
#[arg(
|
#[arg(
|
||||||
long,
|
long,
|
||||||
env = "HG_REPOS",
|
env = "HG_REPOS",
|
||||||
value_delimiter = ',',
|
value_delimiter = ',',
|
||||||
default_value = "build/puppet,build/tools,build/buildbot-configs"
|
default_value = "mozilla-central"
|
||||||
)]
|
)]
|
||||||
hg_repos: Vec<String>,
|
hg_repos: Vec<String>,
|
||||||
|
|
||||||
/// Comma-separated case-insensitive substrings matched against changeset author fields.
|
/// Comma-separated author substrings for `author()` revset queries.
|
||||||
#[arg(
|
#[arg(
|
||||||
long,
|
long,
|
||||||
env = "HG_AUTHOR_TERMS",
|
env = "HG_AUTHOR_TERMS",
|
||||||
value_delimiter = ',',
|
value_delimiter = ',',
|
||||||
default_value = "thijssen,grenade"
|
default_value = "rthijssen,grenade"
|
||||||
)]
|
)]
|
||||||
hg_author_terms: Vec<String>,
|
hg_author_terms: Vec<String>,
|
||||||
|
|
||||||
@@ -141,6 +151,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
HgConfig {
|
HgConfig {
|
||||||
host: args.hg_host.clone(),
|
host: args.hg_host.clone(),
|
||||||
author_terms: args.hg_author_terms.clone(),
|
author_terms: args.hg_author_terms.clone(),
|
||||||
|
groups: args.hg_groups.clone(),
|
||||||
repos: args.hg_repos.clone(),
|
repos: args.hg_repos.clone(),
|
||||||
},
|
},
|
||||||
)) as Arc<dyn EventSource>;
|
)) as Arc<dyn EventSource>;
|
||||||
@@ -162,7 +173,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
gitea_host = args.gitea_host,
|
gitea_host = args.gitea_host,
|
||||||
gitea_user = args.gitea_user,
|
gitea_user = args.gitea_user,
|
||||||
hg_host = args.hg_host,
|
hg_host = args.hg_host,
|
||||||
|
hg_groups = ?args.hg_groups,
|
||||||
hg_repos = ?args.hg_repos,
|
hg_repos = ?args.hg_repos,
|
||||||
|
hg_author_terms = ?args.hg_author_terms,
|
||||||
bugzilla_host = args.bugzilla_host,
|
bugzilla_host = args.bugzilla_host,
|
||||||
bugzilla_email = args.bugzilla_email,
|
bugzilla_email = args.bugzilla_email,
|
||||||
events_interval_secs = args.interval_secs,
|
events_interval_secs = args.interval_secs,
|
||||||
|
|||||||
140
script/hg-ingest.sh
Executable file
140
script/hg-ingest.sh
Executable file
@@ -0,0 +1,140 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
#
|
||||||
|
# One-shot hg changeset ingestion via local clones.
|
||||||
|
#
|
||||||
|
# Bare-clones each hg repo, extracts changesets matching author terms,
|
||||||
|
# and inserts them into the moments database. Sets poller_state so the
|
||||||
|
# worker won't re-scan.
|
||||||
|
#
|
||||||
|
# Requirements: hg (mercurial), psql, jq
|
||||||
|
#
|
||||||
|
# Usage:
|
||||||
|
# DATABASE_URL="postgres://..." ./script/hg-ingest.sh
|
||||||
|
#
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
DATABASE_URL="${DATABASE_URL:-postgres://moments_rw@magrathea.kosherinata.internal:5432/moments?sslmode=verify-full&sslrootcert=/etc/pki/ca-trust/source/anchors/root-internal.pem&sslcert=/etc/pki/tls/misc/$(hostname -f).pem&sslkey=/etc/pki/tls/private/$(hostname -f).pem}"
|
||||||
|
HG_HOST="${HG_HOST:-hg-edge.mozilla.org}"
|
||||||
|
WORK_DIR="${HG_WORK_DIR:-$HOME/hg}"
|
||||||
|
|
||||||
|
# Repos to clone (groups are expanded inline)
|
||||||
|
REPOS=(
|
||||||
|
integration/mozilla-inbound
|
||||||
|
integration/autoland
|
||||||
|
integration/fx-team
|
||||||
|
integration/b2g-inbound
|
||||||
|
build/puppet
|
||||||
|
build/tools
|
||||||
|
build/buildbot
|
||||||
|
build/buildbot-configs
|
||||||
|
build/slave_health
|
||||||
|
build/mozharness
|
||||||
|
build/braindump
|
||||||
|
build/cloud-tools
|
||||||
|
build/compare-locales
|
||||||
|
build/nagios-core
|
||||||
|
build/partner-repacks
|
||||||
|
build/preproduction
|
||||||
|
build/rpm-sources
|
||||||
|
build/talos
|
||||||
|
build/tupperware
|
||||||
|
build/ash-mozharness
|
||||||
|
build/autoland
|
||||||
|
build/opsi-package-sources
|
||||||
|
)
|
||||||
|
|
||||||
|
# Author terms — matched case-insensitively against changeset author fields
|
||||||
|
AUTHOR_TERMS=("rthijssen" "grenade")
|
||||||
|
|
||||||
|
: "${DATABASE_URL:?DATABASE_URL must be set}"
|
||||||
|
|
||||||
|
mkdir -p "$WORK_DIR"
|
||||||
|
|
||||||
|
total=0
|
||||||
|
|
||||||
|
CLONE_DIR="$WORK_DIR/clone"
|
||||||
|
CACHE_DIR="$WORK_DIR/cache"
|
||||||
|
mkdir -p "$CACHE_DIR"
|
||||||
|
cd "$WORK_DIR"
|
||||||
|
|
||||||
|
for repo in "${REPOS[@]}"; do
|
||||||
|
cache_file="$CACHE_DIR/$(echo "$repo" | tr '/' '_').tsv"
|
||||||
|
|
||||||
|
# Skip repos already cached (re-run safe)
|
||||||
|
if [ -f "$cache_file" ]; then
|
||||||
|
echo "[hg-ingest] $repo: using cached results"
|
||||||
|
else
|
||||||
|
# Remove any previous clone to keep only one on disk
|
||||||
|
rm -rf "$CLONE_DIR"
|
||||||
|
|
||||||
|
echo "[hg-ingest] cloning $repo"
|
||||||
|
if ! hg clone --noupdate "https://$HG_HOST/$repo" "$CLONE_DIR"; then
|
||||||
|
echo "[hg-ingest] clone failed: $repo (skipping)"
|
||||||
|
continue
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Build revset: author(term1) or author(term2) ...
|
||||||
|
revset=""
|
||||||
|
for term in "${AUTHOR_TERMS[@]}"; do
|
||||||
|
if [ -z "$revset" ]; then
|
||||||
|
revset="author('$term')"
|
||||||
|
else
|
||||||
|
revset="$revset or author('$term')"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
# Extract matching changesets to cache file
|
||||||
|
hg log -R "$CLONE_DIR" -r "$revset" \
|
||||||
|
--template '{node}\t{author}\t{date|hgdate}\t{desc|firstline}\n' \
|
||||||
|
> "$cache_file" || true
|
||||||
|
|
||||||
|
# Free disk immediately
|
||||||
|
rm -rf "$CLONE_DIR"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Ingest cached results into the database
|
||||||
|
count=0
|
||||||
|
while IFS=$'\t' read -r node author date_raw desc; do
|
||||||
|
[ -z "$node" ] && continue
|
||||||
|
|
||||||
|
# {date|hgdate} outputs "timestamp offset" — take just the timestamp
|
||||||
|
date_ts="${date_raw%% *}"
|
||||||
|
|
||||||
|
# Build ISO timestamp from unix epoch
|
||||||
|
occurred_at=$(date -u -d "@${date_ts}" '+%Y-%m-%dT%H:%M:%SZ')
|
||||||
|
|
||||||
|
event_id="hg:${repo}:${node}"
|
||||||
|
|
||||||
|
# Build payload JSON (jq handles all escaping)
|
||||||
|
payload=$(jq -n \
|
||||||
|
--arg node "$node" \
|
||||||
|
--arg user "$author" \
|
||||||
|
--arg desc "$desc" \
|
||||||
|
--arg repo "$repo" \
|
||||||
|
--arg host "$HG_HOST" \
|
||||||
|
'{node: $node, user: $user, desc: $desc, _repo: $repo, _host: $host}')
|
||||||
|
|
||||||
|
# Upsert into events table
|
||||||
|
psql "$DATABASE_URL" -q -c "
|
||||||
|
INSERT INTO events (id, source, action, occurred_at, public, payload)
|
||||||
|
VALUES (\$\$${event_id}\$\$, 'hg', 'Commit', '${occurred_at}', true, \$\$${payload}\$\$::jsonb)
|
||||||
|
ON CONFLICT (id) DO NOTHING;
|
||||||
|
"
|
||||||
|
|
||||||
|
count=$((count + 1))
|
||||||
|
done < "$cache_file"
|
||||||
|
|
||||||
|
if [ "$count" -gt 0 ]; then
|
||||||
|
echo "[hg-ingest] $repo: $count changesets ingested"
|
||||||
|
fi
|
||||||
|
total=$((total + count))
|
||||||
|
done
|
||||||
|
|
||||||
|
# Mark poller state so the worker skips hg
|
||||||
|
psql "$DATABASE_URL" -q -c "
|
||||||
|
INSERT INTO poller_state (source, last_fetched)
|
||||||
|
VALUES ('hg', now())
|
||||||
|
ON CONFLICT (source) DO UPDATE SET last_fetched = now();
|
||||||
|
"
|
||||||
|
|
||||||
|
echo "[hg-ingest] done. total: $total changesets"
|
||||||
@@ -34,6 +34,21 @@ a.hot-pink {
|
|||||||
color: #2c3e50;
|
color: #2c3e50;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
.vertical-timeline-element-content h4.vertical-timeline-element-title {
|
||||||
|
font-size: 0.85rem;
|
||||||
|
}
|
||||||
|
|
||||||
|
.vertical-timeline-element-content h5.vertical-timeline-element-subtitle {
|
||||||
|
font-size: 0.75rem;
|
||||||
|
}
|
||||||
|
|
||||||
|
.vertical-timeline-element-content p,
|
||||||
|
.vertical-timeline-element-content ul,
|
||||||
|
.vertical-timeline-element-content li,
|
||||||
|
.vertical-timeline-element-content code {
|
||||||
|
font-size: 0.75rem;
|
||||||
|
}
|
||||||
|
|
||||||
.vertical-timeline-element-content a {
|
.vertical-timeline-element-content a {
|
||||||
color: #1565c0;
|
color: #1565c0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user