Files
moments/crates/moments-data/src/github_search.rs
rob thijssen 818a535903 feat(worker): capture commits on non-default branches and forks
The ingestion paths each had a gap that let non-default-branch work
slip through: /search/commits silently excludes forks, the per-repo
REST commit scan only walked the default branch, and the user events
feed ages out after 90 days. Catch them by enumerating branches per
repo and scanning each (with per-branch state cursors so a brand-new
branch isn't cut off by the default branch's cursor), pre-filtering
branches via a GraphQL HEAD-author check so big upstream forks like
azure-docs don't trigger hundreds of wasted REST calls, treating
GitHub's HTTP 500 on author-filtered empty branches as "no commits"
rather than a server error, and adding fork:true to the search query.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 16:04:58 +03:00

412 lines
14 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! GitHub Search API ingestion for historical backfill.
//!
//! The Events API caps at 90 days; this source uses `/search/issues` and
//! `/search/commits` with `author:<user>` to recover issues, PRs, and
//! commits going back as far as GitHub retains them (1000-result ceiling
//! per query is the Search API's hard cap).
//!
//! Fork duplication on /search/commits — the same commit SHA appears in
//! every fork that still contains it — is handled by:
//! * deduplicating by `id = github-commit:<sha>` within each batch
//! before upsert (postgres ON CONFLICT errors if the same conflict
//! target appears twice in one INSERT);
//! * upserting with last-write-wins across batches and runs (the SHA
//! is the same; the repo association may flip between forks but the
//! commit itself is identical).
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
use moments_entities::{Event, Source};
use reqwest::{Client, header};
use serde_json::Value;
use tracing::{debug, warn};
/// Name this source reports from `EventSource::name` and passes to the
/// poller state store when a poll completes.
const SOURCE_NAME: &str = "github-search";
/// `User-Agent` header value sent with every GitHub request; embeds the
/// crate version captured at compile time.
const USER_AGENT: &str = concat!(
"moments/",
env!("CARGO_PKG_VERSION"),
" (+https://rob.tn)"
);
/// Settings for [`GithubSearchSource`].
#[derive(Clone, Debug)]
pub struct GithubSearchConfig {
/// GitHub login interpolated into the `author:` qualifier of both search
/// queries.
pub user: String,
/// Optional API token. When present it is sent as a `Bearer`
/// `Authorization` header; when absent requests go out unauthenticated.
pub token: Option<String>,
/// Page size sent as the `per_page` query parameter. A page that comes
/// back with fewer items than this is treated as the last page.
pub per_page: u32,
/// Hard cap on pages walked per query. The Search API itself only returns
/// the first 1000 results across pages, so 10 × 100 covers everything.
pub max_pages: u32,
}
impl Default for GithubSearchConfig {
fn default() -> Self {
Self {
user: "grenade".into(),
token: None,
per_page: 100,
max_pages: 10,
}
}
}
/// Event source that backfills commits, issues and pull requests through
/// GitHub's Search API.
pub struct GithubSearchSource {
// HTTP client used for every GitHub request.
client: Client,
// Sink that parsed events are upserted into, one batch per result page.
writer: Arc<dyn EventWriter>,
// Poller state store; touched once after a poll completes successfully.
state: Arc<dyn PollerStateStore>,
// User, token and paging limits.
config: GithubSearchConfig,
}
impl GithubSearchSource {
pub fn new(
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: GithubSearchConfig,
) -> Self {
Self {
client,
writer,
state,
config,
}
}
/// Attach the headers every GitHub request carries: the JSON media type,
/// the pinned API version, our `User-Agent`, and — only when a token is
/// configured — a `Bearer` authorization header.
fn apply_headers(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
    let with_common = req
        .header(header::ACCEPT, "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28")
        .header(header::USER_AGENT, USER_AGENT);
    match &self.config.token {
        Some(token) => with_common.header(header::AUTHORIZATION, format!("Bearer {token}")),
        None => with_common,
    }
}
/// Read repo visibility from `/repos/{full_name}`. Used for results from
/// /search/issues, which don't include the visibility flag inline.
///
/// Returns `Ok(true)` for a private repo and `Ok(false)` for a public one.
///
/// # Errors
///
/// * `SourceError::Http` if the request cannot be sent or GitHub answers
///   with a non-success status (repo deleted, inaccessible, rate-limited…).
/// * `SourceError::Parse` if the body is not JSON or carries no boolean
///   `private` field.
///
/// This function never guesses a visibility: every failure is reported to
/// the caller, which decides how to classify the repo.
async fn fetch_repo_private(&self, full_name: &str) -> Result<bool, SourceError> {
    let url = format!("https://api.github.com/repos/{full_name}");
    let resp = self
        .apply_headers(self.client.get(&url))
        .send()
        .await
        .map_err(|e| SourceError::Http(e.to_string()))?;
    if !resp.status().is_success() {
        // Repo may be deleted / inaccessible. Report the failure rather
        // than inventing an answer here.
        return Err(SourceError::Http(format!("{} GET {}", resp.status(), url)));
    }
    let body: Value = resp
        .json()
        .await
        .map_err(|e| SourceError::Parse(e.to_string()))?;
    // An absent or non-boolean flag must not silently read as "public":
    // we'd rather under-expose than over-expose, so surface it as an error.
    body.get("private")
        .and_then(Value::as_bool)
        .ok_or_else(|| SourceError::Parse(format!("no boolean `private` field in GET {url}")))
}
/// Page through `/search/commits` for the configured author, newest first,
/// upserting one batch of events per page.
///
/// `vis_cache` maps `owner/repo` to "is private" and is filled in from the
/// inline repository data of each accepted commit. Returns the sum of the
/// counts reported by the writer. Any transport failure, non-success status,
/// unparsable body or writer error aborts the walk with an error.
async fn search_commits(
    &self,
    vis_cache: &mut HashMap<String, bool>,
) -> Result<usize, SourceError> {
    let full_page = self.config.per_page as usize;
    let mut upserted = 0usize;
    for page in 1..=self.config.max_pages {
        // `fork:true` opts forks into the search — by default GitHub's
        // search API excludes them entirely, which means commits on a
        // user's fork (regardless of branch) never surface here.
        let url = format!(
            "https://api.github.com/search/commits?q=author:{}+fork:true&sort=author-date&order=desc&per_page={}&page={}",
            self.config.user, self.config.per_page, page
        );
        let resp = self
            .apply_headers(self.client.get(&url))
            .send()
            .await
            .map_err(|e| SourceError::Http(e.to_string()))?;
        if !resp.status().is_success() {
            return Err(SourceError::Http(format!("{} GET {}", resp.status(), url)));
        }
        let body: Value = resp
            .json()
            .await
            .map_err(|e| SourceError::Parse(e.to_string()))?;
        // A missing, non-array or empty `items` all mean "nothing left".
        let items = match body.get("items").and_then(Value::as_array) {
            Some(list) if !list.is_empty() => list,
            _ => break,
        };
        // Dedup within the page by id (same commit in multiple forks
        // returns multiple search items with the same SHA — postgres
        // refuses ON CONFLICT when the conflict target appears twice
        // in one INSERT).
        let mut seen_ids: HashSet<String> = HashSet::new();
        let mut batch = Vec::with_capacity(items.len());
        for item in items {
            let ev = match parse_commit_event(item) {
                Some(ev) => ev,
                None => continue,
            };
            if !seen_ids.insert(ev.id.clone()) {
                continue;
            }
            // Opportunistically populate the visibility cache so
            // search_issues can reuse it for the same repos.
            let repo_name = item
                .get("repository")
                .and_then(|r| r.get("full_name"))
                .and_then(Value::as_str);
            if let Some(name) = repo_name {
                vis_cache.insert(name.to_string(), !ev.public);
            }
            batch.push(ev);
        }
        upserted += self.writer.upsert_events(&batch).await?;
        // A short page is the last page.
        if items.len() < full_page {
            break;
        }
    }
    Ok(upserted)
}
/// Page through `/search/issues` for the configured author, newest first,
/// upserting one batch of events per page.
///
/// Items that cannot be converted into an event are skipped. `vis_cache`
/// is consulted (and extended) while resolving each item's repo visibility.
/// Returns the sum of the counts reported by the writer; any transport
/// failure, non-success status, unparsable body or writer error aborts the
/// walk with an error.
async fn search_issues(
    &self,
    vis_cache: &mut HashMap<String, bool>,
) -> Result<usize, SourceError> {
    let full_page = self.config.per_page as usize;
    let mut upserted = 0usize;
    for page in 1..=self.config.max_pages {
        let url = format!(
            "https://api.github.com/search/issues?q=author:{}&sort=created&order=desc&per_page={}&page={}",
            self.config.user, self.config.per_page, page
        );
        let resp = self
            .apply_headers(self.client.get(&url))
            .send()
            .await
            .map_err(|e| SourceError::Http(e.to_string()))?;
        if !resp.status().is_success() {
            return Err(SourceError::Http(format!("{} GET {}", resp.status(), url)));
        }
        let body: Value = resp
            .json()
            .await
            .map_err(|e| SourceError::Parse(e.to_string()))?;
        // A missing, non-array or empty `items` all mean "nothing left".
        let items = match body.get("items").and_then(Value::as_array) {
            Some(list) if !list.is_empty() => list,
            _ => break,
        };
        let mut batch = Vec::with_capacity(items.len());
        for item in items {
            // Conversion may trigger a per-repo visibility lookup, so the
            // items are processed one at a time, in order.
            match self.search_issue_to_event(item, vis_cache).await {
                Some(ev) => batch.push(ev),
                None => continue,
            }
        }
        upserted += self.writer.upsert_events(&batch).await?;
        // Last page if we got fewer than per_page items.
        if items.len() < full_page {
            break;
        }
    }
    Ok(upserted)
}
/// Turn one `/search/issues` item into an `Event`.
///
/// Returns `None` when the item lacks `number`, `html_url` or a parsable
/// RFC 3339 `created_at`, or when `html_url` is not a github.com repo URL.
/// Visibility comes from `vis_cache` when known; otherwise it is looked up
/// once and remembered. A failed lookup is logged and the repo is recorded
/// as private.
async fn search_issue_to_event(
    &self,
    item: &Value,
    vis_cache: &mut HashMap<String, bool>,
) -> Option<Event> {
    let number = item.get("number").and_then(Value::as_i64)?;
    let html_url = item.get("html_url").and_then(Value::as_str)?;
    let created_raw = item.get("created_at").and_then(Value::as_str)?;
    let occurred_at = DateTime::parse_from_rfc3339(created_raw)
        .ok()?
        .with_timezone(&Utc);
    let repo = repo_from_html_url(html_url)?;

    let cached = vis_cache.get(&repo).copied();
    let private = match cached {
        Some(flag) => flag,
        None => {
            let flag = match self.fetch_repo_private(&repo).await {
                Ok(flag) => flag,
                Err(e) => {
                    warn!(repo = %repo, error = %e, "repo visibility lookup failed; treating as private");
                    true
                }
            };
            // Remember the answer (including the fallback) so each repo
            // costs at most one lookup per poll.
            vis_cache.insert(repo.clone(), flag);
            flag
        }
    };

    // The presence of a `pull_request` key marks the item as a PR.
    let action = match item.get("pull_request") {
        Some(_) => "PullRequest",
        None => "Issue",
    };

    Some(Event {
        id: format!("github-issue:{repo}#{number}"),
        source: Source::Github,
        action: action.into(),
        occurred_at,
        public: !private,
        payload: item.clone(),
    })
}
}
#[async_trait]
impl EventSource for GithubSearchSource {
    /// Identifier of this source.
    fn name(&self) -> &'static str {
        SOURCE_NAME
    }

    /// Run one full pass: commits, then issues/PRs, then touch the poller
    /// state. Returns the combined count reported by the two searches. The
    /// first error from any step aborts the poll, so state is only touched
    /// after both searches succeed.
    async fn poll(&self) -> Result<usize, SourceError> {
        // Commits go first: their results carry the visibility flag inline,
        // which seeds the cache and spares the issue pass some of its (more
        // expensive) per-repo lookups.
        let mut vis_cache = HashMap::<String, bool>::new();
        let commits = self.search_commits(&mut vis_cache).await?;
        let issues = self.search_issues(&mut vis_cache).await?;
        self.state.touch(SOURCE_NAME).await?;

        let unique_repos = vis_cache.len();
        debug!(commits, issues, unique_repos, "github-search poll complete");
        Ok(commits + issues)
    }
}
/// Convert a /search/commits item into our Event row.
///
/// Returns `None` if the item is missing `sha` or `html_url`, if `html_url`
/// does not point at github.com, or if no parsable RFC 3339 commit date is
/// found.
///
/// Visibility is taken from `repository.private`. When that flag is absent
/// or not a boolean the commit is recorded as private: under-exposing is
/// preferred to over-exposing, matching how failed visibility lookups are
/// handled for issues.
fn parse_commit_event(item: &Value) -> Option<Event> {
    /// Fetch `commit.<role>.date` as a string, if present.
    fn commit_date<'a>(item: &'a Value, role: &str) -> Option<&'a str> {
        item.get("commit")?.get(role)?.get("date")?.as_str()
    }

    let sha = item.get("sha").and_then(Value::as_str)?;
    let html_url = item.get("html_url").and_then(Value::as_str)?;
    // Sanity-check the html_url points at github.com so we don't ingest
    // garbage if GitHub ever changes its URL shape. Done up front because
    // it is cheaper than date parsing.
    if !html_url.starts_with("https://github.com/") {
        return None;
    }
    // Prefer author.date — it's when the work was written; committer.date
    // can shift on rebase. Either is RFC3339. The committer date is only
    // consulted when the author date is absent.
    let date_str = commit_date(item, "author").or_else(|| commit_date(item, "committer"))?;
    let occurred_at = DateTime::parse_from_rfc3339(date_str)
        .ok()?
        .with_timezone(&Utc);
    // Fail closed: no usable flag means "private".
    let private = item
        .get("repository")
        .and_then(|r| r.get("private"))
        .and_then(Value::as_bool)
        .unwrap_or(true);
    Some(Event {
        id: format!("github-commit:{sha}"),
        source: Source::Github,
        action: "Commit".into(),
        occurred_at,
        public: !private,
        payload: item.clone(),
    })
}
/// Pull the `owner/repo` slug out of a github.com URL such as
/// `https://github.com/owner/repo/{issues,pull}/42`.
///
/// Returns `None` when the URL does not start with `https://github.com/`
/// or when either of the first two path segments is missing or empty.
/// Anything after the second segment is ignored.
fn repo_from_html_url(url: &str) -> Option<String> {
    let path = url.strip_prefix("https://github.com/")?;
    let mut segments = path.split('/');
    match (segments.next(), segments.next()) {
        (Some(owner), Some(repo)) if !owner.is_empty() && !repo.is_empty() => {
            Some(format!("{owner}/{repo}"))
        }
        _ => None,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Issue and pull-request URLs both reduce to `owner/repo`.
    #[test]
    fn extracts_repo_from_html_url() {
        let cases = [
            ("https://github.com/Nehliin/vortex/issues/125", "Nehliin/vortex"),
            ("https://github.com/grenade/moments/pull/3", "grenade/moments"),
        ];
        for &(url, expected) in &cases {
            let got = repo_from_html_url(url);
            assert_eq!(got.as_deref(), Some(expected));
        }
    }

    /// URLs on other hosts yield nothing.
    #[test]
    fn rejects_non_github_host() {
        let got = repo_from_html_url("https://gitlab.com/x/y/-/issues/1");
        assert_eq!(got, None);
    }

    /// The event id is derived from the commit SHA, and a repo flagged
    /// `private: false` produces a public event.
    #[test]
    fn parse_commit_uses_sha_as_id() {
        let item = serde_json::json!({
            "sha": "a6fcefbe909a97ad5a049b9fa48bc74309af10d9",
            "html_url": "https://github.com/faith1337z/Trade/commit/a6fcefbe909a97ad5a049b9fa48bc74309af10d9",
            "commit": {
                "author": { "name": "rob", "date": "2017-11-13T23:32:31+02:00" },
                "committer": { "name": "rob", "date": "2017-11-13T22:32:31+01:00" },
                "message": "split multiline message into multiple irc messages"
            },
            "repository": { "full_name": "faith1337z/Trade", "private": false }
        });
        let event = parse_commit_event(&item).expect("parses");
        assert_eq!(
            event.id,
            "github-commit:a6fcefbe909a97ad5a049b9fa48bc74309af10d9"
        );
        assert_eq!(event.action, "Commit");
        assert!(event.public);
    }

    /// A repo flagged `private: true` produces a non-public event.
    #[test]
    fn parse_commit_marks_private_repo() {
        let item = serde_json::json!({
            "sha": "deadbeef",
            "html_url": "https://github.com/grenade/private-repo/commit/deadbeef",
            "commit": {
                "author": { "date": "2024-01-01T00:00:00Z" },
                "message": "x"
            },
            "repository": { "full_name": "grenade/private-repo", "private": true }
        });
        let event = parse_commit_event(&item).expect("parses");
        assert!(!event.public);
    }

    /// Items whose `html_url` is not on github.com are dropped.
    #[test]
    fn parse_commit_rejects_non_github_url() {
        let item = serde_json::json!({
            "sha": "abc",
            "html_url": "https://example.com/x/commit/abc",
            "commit": { "author": { "date": "2024-01-01T00:00:00Z" } },
            "repository": { "full_name": "x/y", "private": false }
        });
        let parsed = parse_commit_event(&item);
        assert!(parsed.is_none());
    }
}