//! Per-repo commit enumeration for full GitHub history. //! //! Discovers repos via two sources: //! 1. REST `/user/repos` — repos where the user is owner, collaborator, //! or org member. //! 2. GraphQL `repositoriesContributedTo` — repos the user has committed //! to, opened issues/PRs on, or reviewed, even without collaborator //! status. No result cap (cursor-paginated). //! //! Then walks each repo's commit history via //! `/repos/{owner}/{repo}/commits?author={user}` with a `since` cursor //! to avoid re-fetching known commits. //! //! Events use `github-commit:{sha}` as their ID, matching the scheme in //! `github_search`, so duplicates are resolved via idempotent upsert. use std::collections::HashSet; use std::sync::Arc; use async_trait::async_trait; use chrono::{DateTime, Utc}; use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError}; use moments_entities::{Event, RepoLanguage, Source}; use reqwest::{Client, header}; use serde_json::Value; use tracing::{debug, warn}; const SOURCE_NAME: &str = "github-repo"; const USER_AGENT: &str = concat!( "moments/", env!("CARGO_PKG_VERSION"), " (+https://rob.tn)" ); const MAX_BACKFILL_PAGES: u32 = 100; #[derive(Clone, Debug)] pub struct GithubRepoConfig { pub user: String, pub token: Option, pub per_page: u32, } impl Default for GithubRepoConfig { fn default() -> Self { Self { user: "grenade".into(), token: None, per_page: 100, } } } pub struct GithubRepoSource { client: Client, writer: Arc, state: Arc, config: GithubRepoConfig, } impl GithubRepoSource { pub fn new( client: Client, writer: Arc, state: Arc, config: GithubRepoConfig, ) -> Self { Self { client, writer, state, config, } } fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { req = req .header(header::ACCEPT, "application/vnd.github+json") .header("X-GitHub-Api-Version", "2022-11-28") .header(header::USER_AGENT, USER_AGENT); if let Some(token) = &self.config.token { req = req.header(header::AUTHORIZATION, format!("Bearer {token}")); } req } /// Discover all repos the authenticated user can access. async fn discover_repos(&self) -> Result, SourceError> { if self.config.token.is_none() { return Ok(vec![]); } let mut repos = Vec::new(); for page in 1..=50 { let url = format!( "https://api.github.com/user/repos?affiliation=owner,collaborator,organization_member&visibility=all&per_page={}&page={}", self.config.per_page, page ); let req = self.apply_headers(self.client.get(&url)); let resp = req .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); } let items: Vec = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; if items.is_empty() { break; } for item in &items { if let Some(r) = parse_repo(item) { repos.push(r); } } if items.len() < self.config.per_page as usize { break; } } // Supplement with repos from GraphQL repositoriesContributedTo. // This catches repos where the user contributed via PRs but isn't // an owner, collaborator, or org member — no result cap. let mut known: HashSet = repos.iter().map(|r| r.full_name.clone()).collect(); let contributed = self.discover_contributed_repos().await; match contributed { Ok(extra) => { for r in extra { if known.insert(r.full_name.clone()) { repos.push(r); } } } Err(e) => { warn!(error = %e, "GraphQL contributed-repos discovery failed; continuing with known repos"); } } Ok(repos) } /// Discover repos the user has contributed to via GraphQL. /// Uses cursor-based pagination with no result cap. async fn discover_contributed_repos(&self) -> Result, SourceError> { let token = match &self.config.token { Some(t) => t, None => return Ok(vec![]), }; let mut repos = Vec::new(); let mut cursor: Option = None; loop { let after = match &cursor { Some(c) => format!(", after: \"{}\"", c), None => String::new(), }; let query = format!( r#"{{ user(login: "{}") {{ repositoriesContributedTo(first: 100, contributionTypes: [COMMIT, PULL_REQUEST, ISSUE]{}) {{ pageInfo {{ hasNextPage endCursor }} nodes {{ nameWithOwner isPrivate }} }} }} }}"#, self.config.user, after ); let body = serde_json::json!({ "query": query }); let resp = self .client .post("https://api.github.com/graphql") .header(header::AUTHORIZATION, format!("Bearer {token}")) .header(header::USER_AGENT, USER_AGENT) .header(header::CONTENT_TYPE, "application/json") .json(&body) .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { return Err(SourceError::Http(format!( "{} POST graphql", resp.status() ))); } let data: Value = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; // Check for GraphQL-level errors if let Some(errors) = data.get("errors").and_then(Value::as_array) { if let Some(msg) = errors.first().and_then(|e| e.get("message")).and_then(Value::as_str) { return Err(SourceError::Http(format!("GraphQL error: {msg}"))); } } let contributed = &data["data"]["user"]["repositoriesContributedTo"]; let nodes = contributed["nodes"].as_array(); if let Some(nodes) = nodes { for node in nodes { let full_name = node .get("nameWithOwner") .and_then(Value::as_str); let private = node .get("isPrivate") .and_then(Value::as_bool) .unwrap_or(false); if let Some(name) = full_name { repos.push(Repo { full_name: name.to_string(), private, }); } } } let has_next = contributed["pageInfo"]["hasNextPage"] .as_bool() .unwrap_or(false); if !has_next { break; } cursor = contributed["pageInfo"]["endCursor"] .as_str() .map(String::from); } debug!(repos = repos.len(), "discovered contributed repos via GraphQL"); Ok(repos) } /// Fetch commits for a single repo, paginating fully on first run /// and using `since` on subsequent runs to catch everything new. async fn scan_repo(&self, repo: &Repo) -> Result { let state_key = format!("github-repo:{}", repo.full_name); let prior = self.state.load(&state_key).await?; let since = prior.as_ref().and_then(|s| s.last_modified); let mut total = 0usize; let mut newest: Option> = since; for page in 1..=MAX_BACKFILL_PAGES { let mut url = format!( "https://api.github.com/repos/{}/commits?author={}&per_page={}&page={}", repo.full_name, self.config.user, self.config.per_page, page ); if let Some(since_dt) = since { url.push_str(&format!("&since={}", since_dt.to_rfc3339())); } let req = self.apply_headers(self.client.get(&url)); let resp = req .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; let status = resp.status(); // 409 = empty repo (no commits at all), not an error if status.as_u16() == 409 { break; } if status.as_u16() == 403 || status.as_u16() == 429 { warn!(repo = %repo.full_name, status = %status, "rate limited; stopping early"); return Err(SourceError::Http(format!("{} GET {}", status, url))); } if status.as_u16() == 404 { warn!(repo = %repo.full_name, "repo not found; skipping"); break; } if !status.is_success() { return Err(SourceError::Http(format!("{} GET {}", status, url))); } let items: Vec = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; if items.is_empty() { break; } let events: Vec = items .iter() .filter_map(|item| parse_commit(item, repo)) .collect(); for ev in &events { newest = Some(match newest { Some(n) if ev.occurred_at > n => ev.occurred_at, Some(n) => n, None => ev.occurred_at, }); } total += self.writer.upsert_events(&events).await?; if items.len() < self.config.per_page as usize { break; } } self.state.save(&state_key, None, newest).await?; Ok(total) } /// Batch-fetch language breakdowns for repos via GraphQL, upserting /// into repo_languages. Repos are batched using GraphQL aliases to /// minimise round trips. async fn fetch_languages(&self, repos: &[Repo]) -> Result { let token = match &self.config.token { Some(t) => t, None => return Ok(0), }; let mut total = 0usize; for chunk in repos.chunks(20) { let mut fragments = Vec::with_capacity(chunk.len()); for (i, repo) in chunk.iter().enumerate() { let parts: Vec<&str> = repo.full_name.splitn(2, '/').collect(); if parts.len() != 2 { continue; } fragments.push(format!( r#"r{i}: repository(owner: "{}", name: "{}") {{ languages(first: 20, orderBy: {{field: SIZE, direction: DESC}}) {{ edges {{ size node {{ name color }} }} }} }}"#, parts[0], parts[1] )); } if fragments.is_empty() { continue; } let query = format!("{{ {} }}", fragments.join(" ")); let body = serde_json::json!({ "query": query }); let resp = self .client .post("https://api.github.com/graphql") .header(header::AUTHORIZATION, format!("Bearer {token}")) .header(header::USER_AGENT, USER_AGENT) .header(header::CONTENT_TYPE, "application/json") .json(&body) .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { warn!(status = %resp.status(), "GraphQL language fetch failed"); break; } let data: Value = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; if let Some(errors) = data.get("errors").and_then(Value::as_array) { if let Some(msg) = errors.first().and_then(|e| e.get("message")).and_then(Value::as_str) { warn!(error = %msg, "GraphQL language fetch had errors"); } } let data_obj = match data.get("data") { Some(d) => d, None => continue, }; let mut languages = Vec::new(); for (i, repo) in chunk.iter().enumerate() { let alias = format!("r{i}"); let edges = data_obj .get(&alias) .and_then(|r| r.get("languages")) .and_then(|l| l.get("edges")) .and_then(Value::as_array); if let Some(edges) = edges { for edge in edges { let size = edge.get("size").and_then(Value::as_i64).unwrap_or(0); let name = edge .get("node") .and_then(|n| n.get("name")) .and_then(Value::as_str); let color = edge .get("node") .and_then(|n| n.get("color")) .and_then(Value::as_str); if let Some(name) = name { languages.push(RepoLanguage { source: Source::Github, repo: repo.full_name.clone(), language: name.to_string(), bytes: size, color: color.map(String::from), }); } } } } total += self.writer.upsert_repo_languages(&languages).await?; } debug!(total, "repo languages updated"); Ok(total) } } #[async_trait] impl EventSource for GithubRepoSource { fn name(&self) -> &'static str { SOURCE_NAME } async fn poll(&self) -> Result { let repos = self.discover_repos().await?; debug!(repos = repos.len(), "discovered github repos"); let mut total = 0usize; for repo in &repos { match self.scan_repo(repo).await { Ok(n) => { if n > 0 { debug!(repo = %repo.full_name, ingested = n, "repo commit scan complete"); } total += n; } Err(SourceError::Http(ref msg)) if msg.starts_with("403") || msg.starts_with("429") => { warn!("rate limited during repo scan; ending poll early"); break; } Err(e) => { warn!(repo = %repo.full_name, error = %e, "repo scan failed; continuing"); } } } if let Err(e) = self.fetch_languages(&repos).await { warn!(error = %e, "language fetch failed; continuing"); } self.state.touch(SOURCE_NAME).await?; debug!(ingested = total, repos = repos.len(), "github-repo poll complete"); Ok(total) } } #[derive(Debug, Clone)] struct Repo { full_name: String, private: bool, } fn parse_repo(item: &Value) -> Option { let full_name = item.get("full_name").and_then(Value::as_str)?; let private = item.get("private").and_then(Value::as_bool).unwrap_or(false); Some(Repo { full_name: full_name.to_string(), private, }) } fn parse_commit(item: &Value, repo: &Repo) -> Option { let sha = item.get("sha").and_then(Value::as_str)?; let date_str = item .get("commit") .and_then(|c| c.get("author")) .and_then(|a| a.get("date")) .and_then(Value::as_str) .or_else(|| { item.get("commit") .and_then(|c| c.get("committer")) .and_then(|c| c.get("date")) .and_then(Value::as_str) })?; let occurred_at = DateTime::parse_from_rfc3339(date_str) .ok()? .with_timezone(&Utc); let mut payload = item.clone(); if let Some(obj) = payload.as_object_mut() { obj.insert("_repo".into(), Value::String(repo.full_name.clone())); } Some(Event { id: format!("github-commit:{sha}"), source: Source::Github, action: "Commit".into(), occurred_at, public: !repo.private, payload, }) } #[cfg(test)] mod tests { use super::*; use serde_json::json; #[test] fn parse_commit_uses_sha_as_id() { let repo = Repo { full_name: "grenade/moments".into(), private: false, }; let raw = json!({ "sha": "abc123", "commit": { "author": { "date": "2024-01-15T10:30:00Z" }, "message": "fix something" } }); let ev = parse_commit(&raw, &repo).expect("parses"); assert_eq!(ev.id, "github-commit:abc123"); assert_eq!(ev.action, "Commit"); assert!(ev.public); } #[test] fn parse_commit_private_repo() { let repo = Repo { full_name: "grenade/secret".into(), private: true, }; let raw = json!({ "sha": "def456", "commit": { "author": { "date": "2024-01-15T10:30:00Z" }, "message": "secret change" } }); let ev = parse_commit(&raw, &repo).expect("parses"); assert!(!ev.public); } #[test] fn parse_commit_falls_back_to_committer_date() { let repo = Repo { full_name: "grenade/moments".into(), private: false, }; let raw = json!({ "sha": "ghi789", "commit": { "committer": { "date": "2024-02-01T12:00:00Z" }, "message": "no author date" } }); let ev = parse_commit(&raw, &repo).expect("parses"); assert_eq!(ev.id, "github-commit:ghi789"); } #[test] fn parse_repo_extracts_fields() { let raw = json!({ "full_name": "grenade/moments", "private": false }); let repo = parse_repo(&raw).expect("parses"); assert_eq!(repo.full_name, "grenade/moments"); assert!(!repo.private); } }