feat(worker): capture commits on non-default branches and forks
The ingestion paths each had a gap that let non-default-branch work slip through: /search/commits silently excludes forks, the per-repo REST commit scan only walked the default branch, and the user events feed ages out after 90 days. Catch them by enumerating branches per repo and scanning each (with per-branch state cursors so a brand-new branch isn't cut off by the default branch's cursor), pre-filtering branches via a GraphQL HEAD-author check so big upstream forks like azure-docs don't trigger hundreds of wasted REST calls, treating GitHub's HTTP 500 on author-filtered empty branches as "no commits" rather than a server error, and adding fork:true to the search query. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1307,6 +1307,7 @@ dependencies = [
|
|||||||
"chrono",
|
"chrono",
|
||||||
"moments-core",
|
"moments-core",
|
||||||
"moments-entities",
|
"moments-entities",
|
||||||
|
"percent-encoding",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
|||||||
@@ -17,3 +17,4 @@ tracing.workspace = true
|
|||||||
async-trait.workspace = true
|
async-trait.workspace = true
|
||||||
reqwest.workspace = true
|
reqwest.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
|
percent-encoding = "2"
|
||||||
|
|||||||
@@ -7,12 +7,17 @@
|
|||||||
//! to, opened issues/PRs on, or reviewed, even without collaborator
|
//! to, opened issues/PRs on, or reviewed, even without collaborator
|
||||||
//! status. No result cap (cursor-paginated).
|
//! status. No result cap (cursor-paginated).
|
||||||
//!
|
//!
|
||||||
//! Then walks each repo's commit history via
|
//! Then walks each branch's commit history via
|
||||||
//! `/repos/{owner}/{repo}/commits?author={user}` with a `since` cursor
|
//! `/repos/{owner}/{repo}/commits?author={user}&sha={branch}` with a
|
||||||
//! to avoid re-fetching known commits.
|
//! per-branch `since` cursor to avoid re-fetching known commits. Walking
|
||||||
|
//! every branch (not just the default) is what catches work-in-progress
|
||||||
|
//! on feature branches and pushes to fork branches that never get merged
|
||||||
|
//! upstream — neither the user events feed nor /search/commits surface
|
||||||
|
//! those reliably.
|
||||||
//!
|
//!
|
||||||
//! Events use `github-commit:{sha}` as their ID, matching the scheme in
|
//! Events use `github-commit:{sha}` as their ID, matching the scheme in
|
||||||
//! `github_search`, so duplicates are resolved via idempotent upsert.
|
//! `github_search`, so duplicates are resolved via idempotent upsert
|
||||||
|
//! (the same commit reached via two branches just upserts twice).
|
||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -21,10 +26,30 @@ use async_trait::async_trait;
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
|
use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
|
||||||
use moments_entities::{Event, RepoLanguage, Source};
|
use moments_entities::{Event, RepoLanguage, Source};
|
||||||
|
use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
|
||||||
use reqwest::{Client, header};
|
use reqwest::{Client, header};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use tracing::{debug, warn};
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
|
/// Encode characters that have meaning in a URL query — branch names can
|
||||||
|
/// contain `/`, `#`, `?`, etc. Whitelisting is too fragile; encode anything
|
||||||
|
/// outside the unreserved set plus a few safe characters.
|
||||||
|
const BRANCH_ENCODE_SET: &AsciiSet = &CONTROLS
|
||||||
|
.add(b' ')
|
||||||
|
.add(b'"')
|
||||||
|
.add(b'#')
|
||||||
|
.add(b'<')
|
||||||
|
.add(b'>')
|
||||||
|
.add(b'?')
|
||||||
|
.add(b'`')
|
||||||
|
.add(b'{')
|
||||||
|
.add(b'}')
|
||||||
|
.add(b'/')
|
||||||
|
.add(b'&')
|
||||||
|
.add(b'=')
|
||||||
|
.add(b'+')
|
||||||
|
.add(b'%');
|
||||||
|
|
||||||
const SOURCE_NAME: &str = "github-repo";
|
const SOURCE_NAME: &str = "github-repo";
|
||||||
const USER_AGENT: &str = concat!(
|
const USER_AGENT: &str = concat!(
|
||||||
"moments/",
|
"moments/",
|
||||||
@@ -227,19 +252,217 @@ impl GithubRepoSource {
|
|||||||
Ok(repos)
|
Ok(repos)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch commits for a single repo, paginating fully on first run
|
/// Branch discovery via GraphQL, filtered to branches whose HEAD
|
||||||
/// and using `since` on subsequent runs to catch everything new.
|
/// commit was authored by the user. Skips the long tail of
|
||||||
|
/// upstream-contributor branches in large forks (e.g. azure-docs).
|
||||||
|
///
|
||||||
|
/// Why HEAD author and not `history(author:).totalCount`: the latter
|
||||||
|
/// forces GraphQL to walk full commit history per branch looking for
|
||||||
|
/// matches, which times out (502) on forks with thousands of branches.
|
||||||
|
/// Checking the HEAD commit's author is O(1) per branch. The blind
|
||||||
|
/// spot — branches with the user's older commits but a different
|
||||||
|
/// HEAD author — is rare in practice for forks/feature branches.
|
||||||
|
///
|
||||||
|
/// On any GraphQL failure, callers should fall back to `list_branches`
|
||||||
|
/// (REST, walks everything; 500s from empty branches are silenced
|
||||||
|
/// inside `scan_repo_branch`).
|
||||||
|
async fn list_branches_with_commits(
|
||||||
|
&self,
|
||||||
|
repo: &Repo,
|
||||||
|
user_login: &str,
|
||||||
|
) -> Result<Vec<String>, SourceError> {
|
||||||
|
let token = match &self.config.token {
|
||||||
|
Some(t) => t,
|
||||||
|
None => return Err(SourceError::Http("no token; graphql unavailable".into())),
|
||||||
|
};
|
||||||
|
let parts: Vec<&str> = repo.full_name.splitn(2, '/').collect();
|
||||||
|
if parts.len() != 2 {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
let (owner, name) = (parts[0], parts[1]);
|
||||||
|
|
||||||
|
let mut branches = Vec::new();
|
||||||
|
let mut cursor: Option<String> = None;
|
||||||
|
// Cap pages to bound cost on pathological repos. 50 pages × 100
|
||||||
|
// branches = 5000; well past anything plausible for a human user.
|
||||||
|
for _ in 0..50u32 {
|
||||||
|
let after = match &cursor {
|
||||||
|
Some(c) => format!(", after: \"{}\"", c),
|
||||||
|
None => String::new(),
|
||||||
|
};
|
||||||
|
// `author.user.login` resolves the commit's GitHub user (may
|
||||||
|
// differ from the raw commit author name); falling back to
|
||||||
|
// `author.email` is intentionally omitted to keep the query
|
||||||
|
// shape minimal — false negatives there are caught by the
|
||||||
|
// REST fallback on the next poll cycle.
|
||||||
|
let query = format!(
|
||||||
|
r#"{{ repository(owner: "{owner}", name: "{name}") {{ refs(refPrefix: "refs/heads/", first: 100{after}) {{ pageInfo {{ hasNextPage endCursor }} nodes {{ name target {{ ... on Commit {{ author {{ user {{ login }} }} }} }} }} }} }} }}"#,
|
||||||
|
);
|
||||||
|
let body = serde_json::json!({ "query": query });
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.post("https://api.github.com/graphql")
|
||||||
|
.header(header::AUTHORIZATION, format!("Bearer {token}"))
|
||||||
|
.header(header::USER_AGENT, USER_AGENT)
|
||||||
|
.header(header::CONTENT_TYPE, "application/json")
|
||||||
|
.json(&body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Http(e.to_string()))?;
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
return Err(SourceError::Http(format!(
|
||||||
|
"{} POST graphql (branches {}/{})",
|
||||||
|
resp.status(),
|
||||||
|
owner,
|
||||||
|
name
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let data: Value = resp
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||||
|
if let Some(errors) = data.get("errors").and_then(Value::as_array) {
|
||||||
|
if let Some(msg) = errors.first().and_then(|e| e.get("message")).and_then(Value::as_str) {
|
||||||
|
return Err(SourceError::Http(format!("GraphQL error listing branches: {msg}")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let refs = &data["data"]["repository"]["refs"];
|
||||||
|
if refs.is_null() {
|
||||||
|
// Repo may be deleted or inaccessible — treat as empty.
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
if let Some(nodes) = refs["nodes"].as_array() {
|
||||||
|
for node in nodes {
|
||||||
|
let branch = node["name"].as_str();
|
||||||
|
let head_login = node["target"]["author"]["user"]["login"].as_str();
|
||||||
|
if let (Some(b), Some(login)) = (branch, head_login) {
|
||||||
|
if login.eq_ignore_ascii_case(user_login) {
|
||||||
|
branches.push(b.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let has_next = refs["pageInfo"]["hasNextPage"].as_bool().unwrap_or(false);
|
||||||
|
if !has_next {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
cursor = refs["pageInfo"]["endCursor"].as_str().map(String::from);
|
||||||
|
}
|
||||||
|
Ok(branches)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List every branch in a repo. Returns an empty vec for empty (409)
|
||||||
|
/// or missing (404) repos; surfaces rate-limit / transport errors so the
|
||||||
|
/// caller can decide whether to bail.
|
||||||
|
async fn list_branches(&self, repo: &Repo) -> Result<Vec<String>, SourceError> {
|
||||||
|
let mut branches = Vec::new();
|
||||||
|
for page in 1..=10u32 {
|
||||||
|
let url = format!(
|
||||||
|
"https://api.github.com/repos/{}/branches?per_page={}&page={}",
|
||||||
|
repo.full_name, self.config.per_page, page
|
||||||
|
);
|
||||||
|
let req = self.apply_headers(self.client.get(&url));
|
||||||
|
let resp = req
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Http(e.to_string()))?;
|
||||||
|
|
||||||
|
let status = resp.status();
|
||||||
|
if status.as_u16() == 404 || status.as_u16() == 409 {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
if status.as_u16() == 403 || status.as_u16() == 429 {
|
||||||
|
return Err(SourceError::Http(format!("{} GET {}", status, url)));
|
||||||
|
}
|
||||||
|
if !status.is_success() {
|
||||||
|
return Err(SourceError::Http(format!("{} GET {}", status, url)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let items: Vec<Value> = resp
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||||
|
if items.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
for item in &items {
|
||||||
|
if let Some(name) = item.get("name").and_then(Value::as_str) {
|
||||||
|
branches.push(name.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if items.len() < self.config.per_page as usize {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(branches)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch commits for a single repo across all branches the user has
|
||||||
|
/// touched. Per-branch state keys (`github-repo:{full_name}@{branch}`)
|
||||||
|
/// hold the newest seen commit timestamp so each branch can be
|
||||||
|
/// incremented independently — important because a brand new branch's
|
||||||
|
/// `since` cursor must start unset even when the default branch has
|
||||||
|
/// been polled many times already.
|
||||||
|
///
|
||||||
|
/// When `user_id` is supplied, branches are pre-filtered via GraphQL
|
||||||
|
/// to those with at least one commit by the user — vastly cheaper for
|
||||||
|
/// large upstream forks where most branches were never touched. On
|
||||||
|
/// GraphQL failure (or no token), falls back to the REST branch list
|
||||||
|
/// and relies on the per-branch 500-as-empty handling to discard the
|
||||||
|
/// noise.
|
||||||
async fn scan_repo(&self, repo: &Repo) -> Result<usize, SourceError> {
|
async fn scan_repo(&self, repo: &Repo) -> Result<usize, SourceError> {
|
||||||
let state_key = format!("github-repo:{}", repo.full_name);
|
let branches = if self.config.token.is_some() {
|
||||||
|
match self.list_branches_with_commits(repo, &self.config.user).await {
|
||||||
|
Ok(b) => b,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(repo = %repo.full_name, error = %e, "graphql branch filter failed; falling back to REST");
|
||||||
|
self.list_branches(repo).await?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.list_branches(repo).await?
|
||||||
|
};
|
||||||
|
if branches.is_empty() {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut total = 0usize;
|
||||||
|
// Dedup commits seen via multiple branches in one tick. Without this
|
||||||
|
// the same SHA appears in the upsert batch twice (postgres rejects
|
||||||
|
// duplicate conflict targets in a single INSERT).
|
||||||
|
let mut seen_in_tick: HashSet<String> = HashSet::new();
|
||||||
|
for branch in &branches {
|
||||||
|
match self.scan_repo_branch(repo, branch, &mut seen_in_tick).await {
|
||||||
|
Ok(n) => total += n,
|
||||||
|
Err(SourceError::Http(ref msg)) if msg.starts_with("403") || msg.starts_with("429") => {
|
||||||
|
return Err(SourceError::Http(msg.clone()));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(repo = %repo.full_name, branch = %branch, error = %e, "branch scan failed; continuing");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(total)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn scan_repo_branch(
|
||||||
|
&self,
|
||||||
|
repo: &Repo,
|
||||||
|
branch: &str,
|
||||||
|
seen_in_tick: &mut HashSet<String>,
|
||||||
|
) -> Result<usize, SourceError> {
|
||||||
|
let state_key = format!("github-repo:{}@{}", repo.full_name, branch);
|
||||||
let prior = self.state.load(&state_key).await?;
|
let prior = self.state.load(&state_key).await?;
|
||||||
let since = prior.as_ref().and_then(|s| s.last_modified);
|
let since = prior.as_ref().and_then(|s| s.last_modified);
|
||||||
|
|
||||||
|
let encoded_branch = utf8_percent_encode(branch, BRANCH_ENCODE_SET).to_string();
|
||||||
|
|
||||||
let mut total = 0usize;
|
let mut total = 0usize;
|
||||||
let mut newest: Option<DateTime<Utc>> = since;
|
let mut newest: Option<DateTime<Utc>> = since;
|
||||||
for page in 1..=MAX_BACKFILL_PAGES {
|
for page in 1..=MAX_BACKFILL_PAGES {
|
||||||
let mut url = format!(
|
let mut url = format!(
|
||||||
"https://api.github.com/repos/{}/commits?author={}&per_page={}&page={}",
|
"https://api.github.com/repos/{}/commits?author={}&sha={}&per_page={}&page={}",
|
||||||
repo.full_name, self.config.user, self.config.per_page, page
|
repo.full_name, self.config.user, encoded_branch, self.config.per_page, page
|
||||||
);
|
);
|
||||||
if let Some(since_dt) = since {
|
if let Some(since_dt) = since {
|
||||||
url.push_str(&format!("&since={}", since_dt.to_rfc3339()));
|
url.push_str(&format!("&since={}", since_dt.to_rfc3339()));
|
||||||
@@ -256,11 +479,20 @@ impl GithubRepoSource {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if status.as_u16() == 403 || status.as_u16() == 429 {
|
if status.as_u16() == 403 || status.as_u16() == 429 {
|
||||||
warn!(repo = %repo.full_name, status = %status, "rate limited; stopping early");
|
warn!(repo = %repo.full_name, branch = %branch, status = %status, "rate limited; stopping early");
|
||||||
return Err(SourceError::Http(format!("{} GET {}", status, url)));
|
return Err(SourceError::Http(format!("{} GET {}", status, url)));
|
||||||
}
|
}
|
||||||
if status.as_u16() == 404 {
|
if status.as_u16() == 404 {
|
||||||
warn!(repo = %repo.full_name, "repo not found; skipping");
|
warn!(repo = %repo.full_name, branch = %branch, "repo or branch not found; skipping");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// GitHub's `/repos/.../commits?author=X&sha=branch` returns 500
|
||||||
|
// (not an empty array) when the user has zero commits on the
|
||||||
|
// specified branch. Treat it as "no commits on this branch"
|
||||||
|
// rather than a server error — surfacing it as a warning floods
|
||||||
|
// logs on forks whose branches were all authored by upstream.
|
||||||
|
if status.as_u16() == 500 {
|
||||||
|
debug!(repo = %repo.full_name, branch = %branch, "no commits by author on branch (500)");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
@@ -275,16 +507,32 @@ impl GithubRepoSource {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
let events: Vec<Event> = items
|
let mut events = Vec::with_capacity(items.len());
|
||||||
.iter()
|
for item in &items {
|
||||||
.filter_map(|item| parse_commit(item, repo))
|
if let Some(ev) = parse_commit(item, repo) {
|
||||||
.collect();
|
if seen_in_tick.insert(ev.id.clone()) {
|
||||||
for ev in &events {
|
if let Some(n) = newest {
|
||||||
newest = Some(match newest {
|
if ev.occurred_at > n {
|
||||||
Some(n) if ev.occurred_at > n => ev.occurred_at,
|
newest = Some(ev.occurred_at);
|
||||||
Some(n) => n,
|
}
|
||||||
None => ev.occurred_at,
|
} else {
|
||||||
});
|
newest = Some(ev.occurred_at);
|
||||||
|
}
|
||||||
|
events.push(ev);
|
||||||
|
} else {
|
||||||
|
// Already ingested via another branch this tick;
|
||||||
|
// still advance `newest` so the per-branch cursor
|
||||||
|
// doesn't get stuck behind shared history.
|
||||||
|
let occurred = parse_commit_date(item);
|
||||||
|
if let Some(t) = occurred {
|
||||||
|
newest = Some(match newest {
|
||||||
|
Some(n) if t > n => t,
|
||||||
|
Some(n) => n,
|
||||||
|
None => t,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
total += self.writer.upsert_events(&events).await?;
|
total += self.writer.upsert_events(&events).await?;
|
||||||
|
|
||||||
@@ -451,8 +699,7 @@ fn parse_repo(item: &Value) -> Option<Repo> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_commit(item: &Value, repo: &Repo) -> Option<Event> {
|
fn parse_commit_date(item: &Value) -> Option<DateTime<Utc>> {
|
||||||
let sha = item.get("sha").and_then(Value::as_str)?;
|
|
||||||
let date_str = item
|
let date_str = item
|
||||||
.get("commit")
|
.get("commit")
|
||||||
.and_then(|c| c.get("author"))
|
.and_then(|c| c.get("author"))
|
||||||
@@ -464,9 +711,16 @@ fn parse_commit(item: &Value, repo: &Repo) -> Option<Event> {
|
|||||||
.and_then(|c| c.get("date"))
|
.and_then(|c| c.get("date"))
|
||||||
.and_then(Value::as_str)
|
.and_then(Value::as_str)
|
||||||
})?;
|
})?;
|
||||||
let occurred_at = DateTime::parse_from_rfc3339(date_str)
|
Some(
|
||||||
.ok()?
|
DateTime::parse_from_rfc3339(date_str)
|
||||||
.with_timezone(&Utc);
|
.ok()?
|
||||||
|
.with_timezone(&Utc),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_commit(item: &Value, repo: &Repo) -> Option<Event> {
|
||||||
|
let sha = item.get("sha").and_then(Value::as_str)?;
|
||||||
|
let occurred_at = parse_commit_date(item)?;
|
||||||
|
|
||||||
let mut payload = item.clone();
|
let mut payload = item.clone();
|
||||||
if let Some(obj) = payload.as_object_mut() {
|
if let Some(obj) = payload.as_object_mut() {
|
||||||
|
|||||||
@@ -113,8 +113,11 @@ impl GithubSearchSource {
|
|||||||
) -> Result<usize, SourceError> {
|
) -> Result<usize, SourceError> {
|
||||||
let mut total = 0usize;
|
let mut total = 0usize;
|
||||||
for page in 1..=self.config.max_pages {
|
for page in 1..=self.config.max_pages {
|
||||||
|
// `fork:true` opts forks into the search — by default GitHub's
|
||||||
|
// search API excludes them entirely, which means commits on a
|
||||||
|
// user's fork (regardless of branch) never surface here.
|
||||||
let url = format!(
|
let url = format!(
|
||||||
"https://api.github.com/search/commits?q=author:{}&sort=author-date&order=desc&per_page={}&page={}",
|
"https://api.github.com/search/commits?q=author:{}+fork:true&sort=author-date&order=desc&per_page={}&page={}",
|
||||||
self.config.user, self.config.per_page, page
|
self.config.user, self.config.per_page, page
|
||||||
);
|
);
|
||||||
let req = self.apply_headers(self.client.get(&url));
|
let req = self.apply_headers(self.client.get(&url));
|
||||||
|
|||||||
Reference in New Issue
Block a user