From e4052c4c9a3e8470ab57690467a17a262e00f160 Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Sun, 3 May 2026 18:49:06 +0300 Subject: [PATCH] feat(worker): add github search api source for historical backfill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Events API is hard-capped at 90 days (15 events for grenade right now). The Search API has its own 1000-result-per-query cap but reaches the start of the user's GitHub history — for grenade, 430 issues/PRs going back to 2012-08-08. GET /search/issues?q=author:&sort=created&order=desc Polled hourly by default but defaults to 24h interval since this is backfill, not a live feed. After the first run most upserts are no-ops. Stored as Source::Github with action "Issue" or "PullRequest" (distinguished by the .pull_request field on the search item), keyed `github-issue:/#`. /search/commits is deliberately not used: GitHub matches the same commit across every fork that contains it, so 275k of grenade's "commits" are mostly duplicated fork hits in repos he never authored to. If commit history becomes valuable we should enumerate his repos and walk per-repo /commits?author= instead. Visibility: search/issues items don't carry .private, so we lookup /repos/{full_name} once per unique repo encountered (cached for the duration of the poll). Failure to resolve is treated as private — better to under-expose than over-expose on the public timeline. Reshape: presentation/github.rs gains an Issue/PullRequest path that extracts from the search item shape (html_url, number, title, state, .pull_request.merged_at) rather than the events-API wrapper. Merged PRs use the GitMerge icon, mirroring the events-API path. Worker now spawns two tokio tasks (events + search), aborts both on SIGINT. New env: SEARCH_POLL_INTERVAL_SECS (default 86400). Tests: +2 in moments-data (URL parsing), +2 in moments-core (search Issue + merged-PR reshape) — 14 total green. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../moments-core/src/presentation/github.rs | 125 +++++++++ crates/moments-data/src/github_search.rs | 250 ++++++++++++++++++ crates/moments-data/src/lib.rs | 1 + crates/moments-worker/src/main.rs | 25 +- 4 files changed, 400 insertions(+), 1 deletion(-) create mode 100644 crates/moments-data/src/github_search.rs diff --git a/crates/moments-core/src/presentation/github.rs b/crates/moments-core/src/presentation/github.rs index f8324d2..ec27e44 100644 --- a/crates/moments-core/src/presentation/github.rs +++ b/crates/moments-core/src/presentation/github.rs @@ -4,6 +4,12 @@ use moments_entities::{ use serde_json::Value; pub(crate) fn reshape(event: &Event) -> TimelineItem { + // Search-API items have a different payload shape (the search item itself + // rather than a wrapped event), so dispatch them through a separate path. + if matches!(event.action.as_str(), "Issue" | "PullRequest") { + return search_reshape(event); + } + let p = &event.payload; let repo_name = p.get("repo").and_then(|r| r.get("name")).and_then(Value::as_str); let actor_login = p @@ -360,6 +366,74 @@ fn public(repo: Option<&str>) -> Reshaped { (TimelineIcon::Generic, title, None, None) } +fn search_reshape(event: &Event) -> TimelineItem { + let p = &event.payload; + let html_url = p.get("html_url").and_then(Value::as_str).unwrap_or(""); + let number = p.get("number").and_then(Value::as_i64).unwrap_or(0); + let issue_title = p.get("title").and_then(Value::as_str).unwrap_or(""); + let state = p.get("state").and_then(Value::as_str).unwrap_or(""); + let pr_obj = p.get("pull_request"); + let is_pr = pr_obj.is_some(); + let merged = pr_obj + .and_then(|pr| pr.get("merged_at")) + .map(|v| !v.is_null()) + .unwrap_or(false); + let user_login = p + .get("user") + .and_then(|u| u.get("login")) + .and_then(Value::as_str); + + let repo = repo_from_url(html_url).unwrap_or_else(|| "(unknown repo)".into()); + + let verb = match (is_pr, state, merged) { + (true, "closed", true) => "merged", + (true, "closed", false) => "closed", + (true, _, _) => "opened", + (false, "closed", _) => "closed", + (false, _, _) => "opened", + }; + let kind = if is_pr { "pull request" } else { "issue" }; + let icon = match (is_pr, verb) { + (true, "merged") => TimelineIcon::GitMerge, + (true, _) => TimelineIcon::PullRequest, + (false, _) => TimelineIcon::Issue, + }; + + let mut title = Vec::new(); + if let Some(actor) = user_login { + title.push(TitleSegment::link( + actor.to_string(), + format!("https://github.com/{actor}"), + )); + title.push(TitleSegment::text(" ")); + } + title.push(TitleSegment::text(format!("{verb} {kind} "))); + title.push(TitleSegment::link(format!("#{number}"), html_url.to_string())); + title.push(TitleSegment::text(" in ")); + title.push(repo_link(&repo)); + + let subtitle = (!issue_title.is_empty()).then(|| vec![TitleSegment::text(issue_title.to_string())]); + + TimelineItem { + id: event.id.clone(), + source: Source::Github, + action: event.action.clone(), + occurred_at: event.occurred_at, + icon, + title, + subtitle, + body: None, + } +} + +fn repo_from_url(url: &str) -> Option { + let stripped = url.strip_prefix("https://github.com/")?; + let mut parts = stripped.splitn(3, '/'); + let owner = parts.next()?; + let repo = parts.next()?; + (!owner.is_empty() && !repo.is_empty()).then(|| format!("{owner}/{repo}")) +} + fn fallback(repo: Option<&str>, action: &str) -> Reshaped { let title = match repo { Some(r) => vec![ @@ -475,6 +549,57 @@ mod tests { } } + #[test] + fn search_issue_reshape_open() { + let raw = json!({ + "number": 125, + "title": "Feature: peer blocklist", + "state": "open", + "html_url": "https://github.com/Nehliin/vortex/issues/125", + "user": { "login": "grenade" } + }); + let item = reshape(&ev("Issue", raw)); + assert_eq!(item.icon, TimelineIcon::Issue); + let rendered: String = item + .title + .iter() + .map(|s| match s { + TitleSegment::Text { text } => text.clone(), + TitleSegment::Link { text, .. } => text.clone(), + }) + .collect(); + assert!( + rendered.contains("opened issue #125 in Nehliin/vortex"), + "got: {rendered}" + ); + } + + #[test] + fn search_pr_reshape_merged_uses_merge_icon() { + let raw = json!({ + "number": 42, + "title": "wire it up", + "state": "closed", + "html_url": "https://github.com/grenade/moments/pull/42", + "user": { "login": "grenade" }, + "pull_request": { "merged_at": "2026-04-15T10:00:00Z" } + }); + let item = reshape(&ev("PullRequest", raw)); + assert_eq!(item.icon, TimelineIcon::GitMerge); + let rendered: String = item + .title + .iter() + .map(|s| match s { + TitleSegment::Text { text } => text.clone(), + TitleSegment::Link { text, .. } => text.clone(), + }) + .collect(); + assert!( + rendered.contains("merged pull request #42 in grenade/moments"), + "got: {rendered}" + ); + } + #[test] fn unknown_event_falls_back() { let raw = json!({ diff --git a/crates/moments-data/src/github_search.rs b/crates/moments-data/src/github_search.rs new file mode 100644 index 0000000..4abb93c --- /dev/null +++ b/crates/moments-data/src/github_search.rs @@ -0,0 +1,250 @@ +//! GitHub Search API ingestion for historical backfill. +//! +//! The Events API caps at 90 days; this source uses `/search/issues` with +//! `author:` to recover issues and PRs going back as far as GitHub +//! retains them (1000-result ceiling per the Search API's hard cap). +//! +//! `/search/commits` is deliberately not used: GitHub matches the same commit +//! across every fork that contains it, inflating result counts and surfacing +//! commits in repos the user never authored to. If commit history becomes +//! desirable we should enumerate the user's repos and walk per-repo +//! `/repos/{o}/{r}/commits?author=...` instead. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError}; +use moments_entities::{Event, Source}; +use reqwest::{Client, header}; +use serde_json::Value; +use tracing::{debug, warn}; + +const SOURCE_NAME: &str = "github-search"; +const USER_AGENT: &str = concat!( + "moments/", + env!("CARGO_PKG_VERSION"), + " (+https://rob.tn)" +); + +#[derive(Clone, Debug)] +pub struct GithubSearchConfig { + pub user: String, + pub token: Option, + pub per_page: u32, + /// Hard cap on pages walked per query. The Search API itself only returns + /// the first 1000 results across pages, so 10 × 100 covers everything. + pub max_pages: u32, +} + +impl Default for GithubSearchConfig { + fn default() -> Self { + Self { + user: "grenade".into(), + token: None, + per_page: 100, + max_pages: 10, + } + } +} + +pub struct GithubSearchSource { + client: Client, + writer: Arc, + state: Arc, + config: GithubSearchConfig, +} + +impl GithubSearchSource { + pub fn new( + client: Client, + writer: Arc, + state: Arc, + config: GithubSearchConfig, + ) -> Self { + Self { + client, + writer, + state, + config, + } + } + + fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { + req = req + .header(header::ACCEPT, "application/vnd.github+json") + .header("X-GitHub-Api-Version", "2022-11-28") + .header(header::USER_AGENT, USER_AGENT); + if let Some(token) = &self.config.token { + req = req.header(header::AUTHORIZATION, format!("Bearer {token}")); + } + req + } + + /// Read repo visibility from `/repos/{full_name}`. Used for results from + /// /search/issues, which don't include the visibility flag inline. + async fn fetch_repo_private(&self, full_name: &str) -> Result { + let url = format!("https://api.github.com/repos/{full_name}"); + let req = self.apply_headers(self.client.get(&url)); + let resp = req + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + // Repo may be deleted / inaccessible. Treat as private (safer: + // we'd rather under-expose than over-expose). + return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + } + let v: Value = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + Ok(v.get("private").and_then(Value::as_bool).unwrap_or(false)) + } + + async fn search_issues( + &self, + vis_cache: &mut HashMap, + ) -> Result { + let mut total = 0usize; + for page in 1..=self.config.max_pages { + let url = format!( + "https://api.github.com/search/issues?q=author:{}&sort=created&order=desc&per_page={}&page={}", + self.config.user, self.config.per_page, page + ); + let req = self.apply_headers(self.client.get(&url)); + let resp = req + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + } + let body: Value = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + let items = body + .get("items") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + if items.is_empty() { + break; + } + + let mut events = Vec::with_capacity(items.len()); + for item in &items { + if let Some(ev) = self.search_issue_to_event(item, vis_cache).await { + events.push(ev); + } + } + total += self.writer.upsert_events(&events).await?; + + // Last page if we got fewer than per_page items. + if items.len() < self.config.per_page as usize { + break; + } + } + Ok(total) + } + + async fn search_issue_to_event( + &self, + item: &Value, + vis_cache: &mut HashMap, + ) -> Option { + let number = item.get("number").and_then(Value::as_i64)?; + let html_url = item.get("html_url").and_then(Value::as_str)?; + let created_at_str = item.get("created_at").and_then(Value::as_str)?; + let occurred_at = DateTime::parse_from_rfc3339(created_at_str) + .ok()? + .with_timezone(&Utc); + let repo = repo_from_html_url(html_url)?; + + let private = match vis_cache.get(&repo).copied() { + Some(p) => p, + None => match self.fetch_repo_private(&repo).await { + Ok(p) => { + vis_cache.insert(repo.clone(), p); + p + } + Err(e) => { + warn!(repo = %repo, error = %e, "repo visibility lookup failed; treating as private"); + vis_cache.insert(repo.clone(), true); + true + } + }, + }; + + let action = if item.get("pull_request").is_some() { + "PullRequest" + } else { + "Issue" + }; + + Some(Event { + id: format!("github-issue:{repo}#{number}"), + source: Source::Github, + action: action.into(), + occurred_at, + public: !private, + payload: item.clone(), + }) + } +} + +#[async_trait] +impl EventSource for GithubSearchSource { + fn name(&self) -> &'static str { + SOURCE_NAME + } + + async fn poll(&self) -> Result { + let mut vis_cache: HashMap = HashMap::new(); + let total = self.search_issues(&mut vis_cache).await?; + self.state.touch(SOURCE_NAME).await?; + debug!( + ingested = total, + unique_repos = vis_cache.len(), + "github-search poll complete" + ); + Ok(total) + } +} + +/// Extract `owner/repo` from a github.com URL like +/// `https://github.com/owner/repo/{issues,pull}/42`. +fn repo_from_html_url(url: &str) -> Option { + let stripped = url.strip_prefix("https://github.com/")?; + let mut parts = stripped.splitn(3, '/'); + let owner = parts.next()?; + let repo = parts.next()?; + if owner.is_empty() || repo.is_empty() { + return None; + } + Some(format!("{owner}/{repo}")) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn extracts_repo_from_html_url() { + assert_eq!( + repo_from_html_url("https://github.com/Nehliin/vortex/issues/125").as_deref(), + Some("Nehliin/vortex") + ); + assert_eq!( + repo_from_html_url("https://github.com/grenade/moments/pull/3").as_deref(), + Some("grenade/moments") + ); + } + + #[test] + fn rejects_non_github_host() { + assert!(repo_from_html_url("https://gitlab.com/x/y/-/issues/1").is_none()); + } +} diff --git a/crates/moments-data/src/lib.rs b/crates/moments-data/src/lib.rs index bd274ea..789d5eb 100644 --- a/crates/moments-data/src/lib.rs +++ b/crates/moments-data/src/lib.rs @@ -1,4 +1,5 @@ pub mod github; +pub mod github_search; use async_trait::async_trait; use chrono::{DateTime, Utc}; diff --git a/crates/moments-worker/src/main.rs b/crates/moments-worker/src/main.rs index 00a9d3f..4477087 100644 --- a/crates/moments-worker/src/main.rs +++ b/crates/moments-worker/src/main.rs @@ -5,6 +5,7 @@ use moments_core::{EventSource, run_poller}; use moments_data::{ PgStore, github::{GithubConfig, GithubSource}, + github_search::{GithubSearchConfig, GithubSearchSource}, }; use reqwest::Client; use tracing::info; @@ -22,9 +23,14 @@ struct Args { #[arg(long, env = "GITHUB_TOKEN")] github_token: Option, - /// Seconds between poll attempts per source. + /// Seconds between Events-API polls (live feed, last 90 days). #[arg(long, env = "POLL_INTERVAL_SECS", default_value = "600")] interval_secs: u64, + + /// Seconds between Search-API polls (historical issue/PR backfill). + /// Defaults to 24h — this is a backfill, not a live feed. + #[arg(long, env = "SEARCH_POLL_INTERVAL_SECS", default_value = "86400")] + search_interval_secs: u64, } #[tokio::main] @@ -50,18 +56,35 @@ async fn main() -> anyhow::Result<()> { }, )) as Arc; + let github_search = Arc::new(GithubSearchSource::new( + http.clone(), + store.clone(), + store.clone(), + GithubSearchConfig { + user: args.github_user.clone(), + token: args.github_token.clone(), + ..Default::default() + }, + )) as Arc; + info!( github_user = args.github_user, interval_secs = args.interval_secs, + search_interval_secs = args.search_interval_secs, "worker started" ); let interval = Duration::from_secs(args.interval_secs); + let search_interval = Duration::from_secs(args.search_interval_secs); + let github_task = tokio::spawn(async move { run_poller(github, interval).await }); + let github_search_task = + tokio::spawn(async move { run_poller(github_search, search_interval).await }); tokio::signal::ctrl_c().await?; info!("shutdown signal received"); github_task.abort(); + github_search_task.abort(); Ok(()) }