diff --git a/Cargo.lock b/Cargo.lock index 9f7a1e8..12e4f79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,6 +1307,7 @@ dependencies = [ "chrono", "moments-core", "moments-entities", + "percent-encoding", "reqwest", "serde", "serde_json", diff --git a/crates/moments-data/Cargo.toml b/crates/moments-data/Cargo.toml index 7505022..1578b90 100644 --- a/crates/moments-data/Cargo.toml +++ b/crates/moments-data/Cargo.toml @@ -17,3 +17,4 @@ tracing.workspace = true async-trait.workspace = true reqwest.workspace = true serde.workspace = true +percent-encoding = "2" diff --git a/crates/moments-data/src/github_repo.rs b/crates/moments-data/src/github_repo.rs index dffa2e0..02511c1 100644 --- a/crates/moments-data/src/github_repo.rs +++ b/crates/moments-data/src/github_repo.rs @@ -7,12 +7,17 @@ //! to, opened issues/PRs on, or reviewed, even without collaborator //! status. No result cap (cursor-paginated). //! -//! Then walks each repo's commit history via -//! `/repos/{owner}/{repo}/commits?author={user}` with a `since` cursor -//! to avoid re-fetching known commits. +//! Then walks each branch's commit history via +//! `/repos/{owner}/{repo}/commits?author={user}&sha={branch}` with a +//! per-branch `since` cursor to avoid re-fetching known commits. Walking +//! every branch (not just the default) is what catches work-in-progress +//! on feature branches and pushes to fork branches that never get merged +//! upstream — neither the user events feed nor /search/commits surface +//! those reliably. //! //! Events use `github-commit:{sha}` as their ID, matching the scheme in -//! `github_search`, so duplicates are resolved via idempotent upsert. +//! `github_search`, so duplicates are resolved via idempotent upsert +//! (the same commit reached via two branches just upserts twice). use std::collections::HashSet; use std::sync::Arc; @@ -21,10 +26,30 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError}; use moments_entities::{Event, RepoLanguage, Source}; +use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode}; use reqwest::{Client, header}; use serde_json::Value; use tracing::{debug, warn}; +/// Encode characters that have meaning in a URL query — branch names can +/// contain `/`, `#`, `?`, etc. Whitelisting is too fragile; encode anything +/// outside the unreserved set plus a few safe characters. +const BRANCH_ENCODE_SET: &AsciiSet = &CONTROLS + .add(b' ') + .add(b'"') + .add(b'#') + .add(b'<') + .add(b'>') + .add(b'?') + .add(b'`') + .add(b'{') + .add(b'}') + .add(b'/') + .add(b'&') + .add(b'=') + .add(b'+') + .add(b'%'); + const SOURCE_NAME: &str = "github-repo"; const USER_AGENT: &str = concat!( "moments/", @@ -227,19 +252,217 @@ impl GithubRepoSource { Ok(repos) } - /// Fetch commits for a single repo, paginating fully on first run - /// and using `since` on subsequent runs to catch everything new. + /// Branch discovery via GraphQL, filtered to branches whose HEAD + /// commit was authored by the user. Skips the long tail of + /// upstream-contributor branches in large forks (e.g. azure-docs). + /// + /// Why HEAD author and not `history(author:).totalCount`: the latter + /// forces GraphQL to walk full commit history per branch looking for + /// matches, which times out (502) on forks with thousands of branches. + /// Checking the HEAD commit's author is O(1) per branch. The blind + /// spot — branches with the user's older commits but a different + /// HEAD author — is rare in practice for forks/feature branches. + /// + /// On any GraphQL failure, callers should fall back to `list_branches` + /// (REST, walks everything; 500s from empty branches are silenced + /// inside `scan_repo_branch`). + async fn list_branches_with_commits( + &self, + repo: &Repo, + user_login: &str, + ) -> Result, SourceError> { + let token = match &self.config.token { + Some(t) => t, + None => return Err(SourceError::Http("no token; graphql unavailable".into())), + }; + let parts: Vec<&str> = repo.full_name.splitn(2, '/').collect(); + if parts.len() != 2 { + return Ok(Vec::new()); + } + let (owner, name) = (parts[0], parts[1]); + + let mut branches = Vec::new(); + let mut cursor: Option = None; + // Cap pages to bound cost on pathological repos. 50 pages × 100 + // branches = 5000; well past anything plausible for a human user. + for _ in 0..50u32 { + let after = match &cursor { + Some(c) => format!(", after: \"{}\"", c), + None => String::new(), + }; + // `author.user.login` resolves the commit's GitHub user (may + // differ from the raw commit author name); falling back to + // `author.email` is intentionally omitted to keep the query + // shape minimal — false negatives there are caught by the + // REST fallback on the next poll cycle. + let query = format!( + r#"{{ repository(owner: "{owner}", name: "{name}") {{ refs(refPrefix: "refs/heads/", first: 100{after}) {{ pageInfo {{ hasNextPage endCursor }} nodes {{ name target {{ ... on Commit {{ author {{ user {{ login }} }} }} }} }} }} }} }}"#, + ); + let body = serde_json::json!({ "query": query }); + let resp = self + .client + .post("https://api.github.com/graphql") + .header(header::AUTHORIZATION, format!("Bearer {token}")) + .header(header::USER_AGENT, USER_AGENT) + .header(header::CONTENT_TYPE, "application/json") + .json(&body) + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + return Err(SourceError::Http(format!( + "{} POST graphql (branches {}/{})", + resp.status(), + owner, + name + ))); + } + let data: Value = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + if let Some(errors) = data.get("errors").and_then(Value::as_array) { + if let Some(msg) = errors.first().and_then(|e| e.get("message")).and_then(Value::as_str) { + return Err(SourceError::Http(format!("GraphQL error listing branches: {msg}"))); + } + } + let refs = &data["data"]["repository"]["refs"]; + if refs.is_null() { + // Repo may be deleted or inaccessible — treat as empty. + return Ok(Vec::new()); + } + if let Some(nodes) = refs["nodes"].as_array() { + for node in nodes { + let branch = node["name"].as_str(); + let head_login = node["target"]["author"]["user"]["login"].as_str(); + if let (Some(b), Some(login)) = (branch, head_login) { + if login.eq_ignore_ascii_case(user_login) { + branches.push(b.to_string()); + } + } + } + } + let has_next = refs["pageInfo"]["hasNextPage"].as_bool().unwrap_or(false); + if !has_next { + break; + } + cursor = refs["pageInfo"]["endCursor"].as_str().map(String::from); + } + Ok(branches) + } + + /// List every branch in a repo. Returns an empty vec for empty (409) + /// or missing (404) repos; surfaces rate-limit / transport errors so the + /// caller can decide whether to bail. + async fn list_branches(&self, repo: &Repo) -> Result, SourceError> { + let mut branches = Vec::new(); + for page in 1..=10u32 { + let url = format!( + "https://api.github.com/repos/{}/branches?per_page={}&page={}", + repo.full_name, self.config.per_page, page + ); + let req = self.apply_headers(self.client.get(&url)); + let resp = req + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + + let status = resp.status(); + if status.as_u16() == 404 || status.as_u16() == 409 { + return Ok(Vec::new()); + } + if status.as_u16() == 403 || status.as_u16() == 429 { + return Err(SourceError::Http(format!("{} GET {}", status, url))); + } + if !status.is_success() { + return Err(SourceError::Http(format!("{} GET {}", status, url))); + } + + let items: Vec = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + if items.is_empty() { + break; + } + for item in &items { + if let Some(name) = item.get("name").and_then(Value::as_str) { + branches.push(name.to_string()); + } + } + if items.len() < self.config.per_page as usize { + break; + } + } + Ok(branches) + } + + /// Fetch commits for a single repo across all branches the user has + /// touched. Per-branch state keys (`github-repo:{full_name}@{branch}`) + /// hold the newest seen commit timestamp so each branch can be + /// incremented independently — important because a brand new branch's + /// `since` cursor must start unset even when the default branch has + /// been polled many times already. + /// + /// When `user_id` is supplied, branches are pre-filtered via GraphQL + /// to those with at least one commit by the user — vastly cheaper for + /// large upstream forks where most branches were never touched. On + /// GraphQL failure (or no token), falls back to the REST branch list + /// and relies on the per-branch 500-as-empty handling to discard the + /// noise. async fn scan_repo(&self, repo: &Repo) -> Result { - let state_key = format!("github-repo:{}", repo.full_name); + let branches = if self.config.token.is_some() { + match self.list_branches_with_commits(repo, &self.config.user).await { + Ok(b) => b, + Err(e) => { + warn!(repo = %repo.full_name, error = %e, "graphql branch filter failed; falling back to REST"); + self.list_branches(repo).await? + } + } + } else { + self.list_branches(repo).await? + }; + if branches.is_empty() { + return Ok(0); + } + + let mut total = 0usize; + // Dedup commits seen via multiple branches in one tick. Without this + // the same SHA appears in the upsert batch twice (postgres rejects + // duplicate conflict targets in a single INSERT). + let mut seen_in_tick: HashSet = HashSet::new(); + for branch in &branches { + match self.scan_repo_branch(repo, branch, &mut seen_in_tick).await { + Ok(n) => total += n, + Err(SourceError::Http(ref msg)) if msg.starts_with("403") || msg.starts_with("429") => { + return Err(SourceError::Http(msg.clone())); + } + Err(e) => { + warn!(repo = %repo.full_name, branch = %branch, error = %e, "branch scan failed; continuing"); + } + } + } + Ok(total) + } + + async fn scan_repo_branch( + &self, + repo: &Repo, + branch: &str, + seen_in_tick: &mut HashSet, + ) -> Result { + let state_key = format!("github-repo:{}@{}", repo.full_name, branch); let prior = self.state.load(&state_key).await?; let since = prior.as_ref().and_then(|s| s.last_modified); + let encoded_branch = utf8_percent_encode(branch, BRANCH_ENCODE_SET).to_string(); + let mut total = 0usize; let mut newest: Option> = since; for page in 1..=MAX_BACKFILL_PAGES { let mut url = format!( - "https://api.github.com/repos/{}/commits?author={}&per_page={}&page={}", - repo.full_name, self.config.user, self.config.per_page, page + "https://api.github.com/repos/{}/commits?author={}&sha={}&per_page={}&page={}", + repo.full_name, self.config.user, encoded_branch, self.config.per_page, page ); if let Some(since_dt) = since { url.push_str(&format!("&since={}", since_dt.to_rfc3339())); @@ -256,11 +479,20 @@ impl GithubRepoSource { break; } if status.as_u16() == 403 || status.as_u16() == 429 { - warn!(repo = %repo.full_name, status = %status, "rate limited; stopping early"); + warn!(repo = %repo.full_name, branch = %branch, status = %status, "rate limited; stopping early"); return Err(SourceError::Http(format!("{} GET {}", status, url))); } if status.as_u16() == 404 { - warn!(repo = %repo.full_name, "repo not found; skipping"); + warn!(repo = %repo.full_name, branch = %branch, "repo or branch not found; skipping"); + break; + } + // GitHub's `/repos/.../commits?author=X&sha=branch` returns 500 + // (not an empty array) when the user has zero commits on the + // specified branch. Treat it as "no commits on this branch" + // rather than a server error — surfacing it as a warning floods + // logs on forks whose branches were all authored by upstream. + if status.as_u16() == 500 { + debug!(repo = %repo.full_name, branch = %branch, "no commits by author on branch (500)"); break; } if !status.is_success() { @@ -275,16 +507,32 @@ impl GithubRepoSource { break; } - let events: Vec = items - .iter() - .filter_map(|item| parse_commit(item, repo)) - .collect(); - for ev in &events { - newest = Some(match newest { - Some(n) if ev.occurred_at > n => ev.occurred_at, - Some(n) => n, - None => ev.occurred_at, - }); + let mut events = Vec::with_capacity(items.len()); + for item in &items { + if let Some(ev) = parse_commit(item, repo) { + if seen_in_tick.insert(ev.id.clone()) { + if let Some(n) = newest { + if ev.occurred_at > n { + newest = Some(ev.occurred_at); + } + } else { + newest = Some(ev.occurred_at); + } + events.push(ev); + } else { + // Already ingested via another branch this tick; + // still advance `newest` so the per-branch cursor + // doesn't get stuck behind shared history. + let occurred = parse_commit_date(item); + if let Some(t) = occurred { + newest = Some(match newest { + Some(n) if t > n => t, + Some(n) => n, + None => t, + }); + } + } + } } total += self.writer.upsert_events(&events).await?; @@ -451,8 +699,7 @@ fn parse_repo(item: &Value) -> Option { }) } -fn parse_commit(item: &Value, repo: &Repo) -> Option { - let sha = item.get("sha").and_then(Value::as_str)?; +fn parse_commit_date(item: &Value) -> Option> { let date_str = item .get("commit") .and_then(|c| c.get("author")) @@ -464,9 +711,16 @@ fn parse_commit(item: &Value, repo: &Repo) -> Option { .and_then(|c| c.get("date")) .and_then(Value::as_str) })?; - let occurred_at = DateTime::parse_from_rfc3339(date_str) - .ok()? - .with_timezone(&Utc); + Some( + DateTime::parse_from_rfc3339(date_str) + .ok()? + .with_timezone(&Utc), + ) +} + +fn parse_commit(item: &Value, repo: &Repo) -> Option { + let sha = item.get("sha").and_then(Value::as_str)?; + let occurred_at = parse_commit_date(item)?; let mut payload = item.clone(); if let Some(obj) = payload.as_object_mut() { diff --git a/crates/moments-data/src/github_search.rs b/crates/moments-data/src/github_search.rs index dc6a43b..3a20bc6 100644 --- a/crates/moments-data/src/github_search.rs +++ b/crates/moments-data/src/github_search.rs @@ -113,8 +113,11 @@ impl GithubSearchSource { ) -> Result { let mut total = 0usize; for page in 1..=self.config.max_pages { + // `fork:true` opts forks into the search — by default GitHub's + // search API excludes them entirely, which means commits on a + // user's fork (regardless of branch) never surface here. let url = format!( - "https://api.github.com/search/commits?q=author:{}&sort=author-date&order=desc&per_page={}&page={}", + "https://api.github.com/search/commits?q=author:{}+fork:true&sort=author-date&order=desc&per_page={}&page={}", self.config.user, self.config.per_page, page ); let req = self.apply_headers(self.client.get(&url));