diff --git a/crates/moments-core/src/presentation/github.rs b/crates/moments-core/src/presentation/github.rs index f8324d2..ec27e44 100644 --- a/crates/moments-core/src/presentation/github.rs +++ b/crates/moments-core/src/presentation/github.rs @@ -4,6 +4,12 @@ use moments_entities::{ use serde_json::Value; pub(crate) fn reshape(event: &Event) -> TimelineItem { + // Search-API items have a different payload shape (the search item itself + // rather than a wrapped event), so dispatch them through a separate path. + if matches!(event.action.as_str(), "Issue" | "PullRequest") { + return search_reshape(event); + } + let p = &event.payload; let repo_name = p.get("repo").and_then(|r| r.get("name")).and_then(Value::as_str); let actor_login = p @@ -360,6 +366,74 @@ fn public(repo: Option<&str>) -> Reshaped { (TimelineIcon::Generic, title, None, None) } +fn search_reshape(event: &Event) -> TimelineItem { + let p = &event.payload; + let html_url = p.get("html_url").and_then(Value::as_str).unwrap_or(""); + let number = p.get("number").and_then(Value::as_i64).unwrap_or(0); + let issue_title = p.get("title").and_then(Value::as_str).unwrap_or(""); + let state = p.get("state").and_then(Value::as_str).unwrap_or(""); + let pr_obj = p.get("pull_request"); + let is_pr = pr_obj.is_some(); + let merged = pr_obj + .and_then(|pr| pr.get("merged_at")) + .map(|v| !v.is_null()) + .unwrap_or(false); + let user_login = p + .get("user") + .and_then(|u| u.get("login")) + .and_then(Value::as_str); + + let repo = repo_from_url(html_url).unwrap_or_else(|| "(unknown repo)".into()); + + let verb = match (is_pr, state, merged) { + (true, "closed", true) => "merged", + (true, "closed", false) => "closed", + (true, _, _) => "opened", + (false, "closed", _) => "closed", + (false, _, _) => "opened", + }; + let kind = if is_pr { "pull request" } else { "issue" }; + let icon = match (is_pr, verb) { + (true, "merged") => TimelineIcon::GitMerge, + (true, _) => TimelineIcon::PullRequest, + (false, _) => TimelineIcon::Issue, + }; + + let mut title = Vec::new(); + if let Some(actor) = user_login { + title.push(TitleSegment::link( + actor.to_string(), + format!("https://github.com/{actor}"), + )); + title.push(TitleSegment::text(" ")); + } + title.push(TitleSegment::text(format!("{verb} {kind} "))); + title.push(TitleSegment::link(format!("#{number}"), html_url.to_string())); + title.push(TitleSegment::text(" in ")); + title.push(repo_link(&repo)); + + let subtitle = (!issue_title.is_empty()).then(|| vec![TitleSegment::text(issue_title.to_string())]); + + TimelineItem { + id: event.id.clone(), + source: Source::Github, + action: event.action.clone(), + occurred_at: event.occurred_at, + icon, + title, + subtitle, + body: None, + } +} + +fn repo_from_url(url: &str) -> Option { + let stripped = url.strip_prefix("https://github.com/")?; + let mut parts = stripped.splitn(3, '/'); + let owner = parts.next()?; + let repo = parts.next()?; + (!owner.is_empty() && !repo.is_empty()).then(|| format!("{owner}/{repo}")) +} + fn fallback(repo: Option<&str>, action: &str) -> Reshaped { let title = match repo { Some(r) => vec![ @@ -475,6 +549,57 @@ mod tests { } } + #[test] + fn search_issue_reshape_open() { + let raw = json!({ + "number": 125, + "title": "Feature: peer blocklist", + "state": "open", + "html_url": "https://github.com/Nehliin/vortex/issues/125", + "user": { "login": "grenade" } + }); + let item = reshape(&ev("Issue", raw)); + assert_eq!(item.icon, TimelineIcon::Issue); + let rendered: String = item + .title + .iter() + .map(|s| match s { + TitleSegment::Text { text } => text.clone(), + TitleSegment::Link { text, .. } => text.clone(), + }) + .collect(); + assert!( + rendered.contains("opened issue #125 in Nehliin/vortex"), + "got: {rendered}" + ); + } + + #[test] + fn search_pr_reshape_merged_uses_merge_icon() { + let raw = json!({ + "number": 42, + "title": "wire it up", + "state": "closed", + "html_url": "https://github.com/grenade/moments/pull/42", + "user": { "login": "grenade" }, + "pull_request": { "merged_at": "2026-04-15T10:00:00Z" } + }); + let item = reshape(&ev("PullRequest", raw)); + assert_eq!(item.icon, TimelineIcon::GitMerge); + let rendered: String = item + .title + .iter() + .map(|s| match s { + TitleSegment::Text { text } => text.clone(), + TitleSegment::Link { text, .. } => text.clone(), + }) + .collect(); + assert!( + rendered.contains("merged pull request #42 in grenade/moments"), + "got: {rendered}" + ); + } + #[test] fn unknown_event_falls_back() { let raw = json!({ diff --git a/crates/moments-data/src/github_search.rs b/crates/moments-data/src/github_search.rs new file mode 100644 index 0000000..4abb93c --- /dev/null +++ b/crates/moments-data/src/github_search.rs @@ -0,0 +1,250 @@ +//! GitHub Search API ingestion for historical backfill. +//! +//! The Events API caps at 90 days; this source uses `/search/issues` with +//! `author:` to recover issues and PRs going back as far as GitHub +//! retains them (1000-result ceiling per the Search API's hard cap). +//! +//! `/search/commits` is deliberately not used: GitHub matches the same commit +//! across every fork that contains it, inflating result counts and surfacing +//! commits in repos the user never authored to. If commit history becomes +//! desirable we should enumerate the user's repos and walk per-repo +//! `/repos/{o}/{r}/commits?author=...` instead. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError}; +use moments_entities::{Event, Source}; +use reqwest::{Client, header}; +use serde_json::Value; +use tracing::{debug, warn}; + +const SOURCE_NAME: &str = "github-search"; +const USER_AGENT: &str = concat!( + "moments/", + env!("CARGO_PKG_VERSION"), + " (+https://rob.tn)" +); + +#[derive(Clone, Debug)] +pub struct GithubSearchConfig { + pub user: String, + pub token: Option, + pub per_page: u32, + /// Hard cap on pages walked per query. The Search API itself only returns + /// the first 1000 results across pages, so 10 × 100 covers everything. + pub max_pages: u32, +} + +impl Default for GithubSearchConfig { + fn default() -> Self { + Self { + user: "grenade".into(), + token: None, + per_page: 100, + max_pages: 10, + } + } +} + +pub struct GithubSearchSource { + client: Client, + writer: Arc, + state: Arc, + config: GithubSearchConfig, +} + +impl GithubSearchSource { + pub fn new( + client: Client, + writer: Arc, + state: Arc, + config: GithubSearchConfig, + ) -> Self { + Self { + client, + writer, + state, + config, + } + } + + fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { + req = req + .header(header::ACCEPT, "application/vnd.github+json") + .header("X-GitHub-Api-Version", "2022-11-28") + .header(header::USER_AGENT, USER_AGENT); + if let Some(token) = &self.config.token { + req = req.header(header::AUTHORIZATION, format!("Bearer {token}")); + } + req + } + + /// Read repo visibility from `/repos/{full_name}`. Used for results from + /// /search/issues, which don't include the visibility flag inline. + async fn fetch_repo_private(&self, full_name: &str) -> Result { + let url = format!("https://api.github.com/repos/{full_name}"); + let req = self.apply_headers(self.client.get(&url)); + let resp = req + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + // Repo may be deleted / inaccessible. Treat as private (safer: + // we'd rather under-expose than over-expose). + return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + } + let v: Value = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + Ok(v.get("private").and_then(Value::as_bool).unwrap_or(false)) + } + + async fn search_issues( + &self, + vis_cache: &mut HashMap, + ) -> Result { + let mut total = 0usize; + for page in 1..=self.config.max_pages { + let url = format!( + "https://api.github.com/search/issues?q=author:{}&sort=created&order=desc&per_page={}&page={}", + self.config.user, self.config.per_page, page + ); + let req = self.apply_headers(self.client.get(&url)); + let resp = req + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + } + let body: Value = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + let items = body + .get("items") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + if items.is_empty() { + break; + } + + let mut events = Vec::with_capacity(items.len()); + for item in &items { + if let Some(ev) = self.search_issue_to_event(item, vis_cache).await { + events.push(ev); + } + } + total += self.writer.upsert_events(&events).await?; + + // Last page if we got fewer than per_page items. + if items.len() < self.config.per_page as usize { + break; + } + } + Ok(total) + } + + async fn search_issue_to_event( + &self, + item: &Value, + vis_cache: &mut HashMap, + ) -> Option { + let number = item.get("number").and_then(Value::as_i64)?; + let html_url = item.get("html_url").and_then(Value::as_str)?; + let created_at_str = item.get("created_at").and_then(Value::as_str)?; + let occurred_at = DateTime::parse_from_rfc3339(created_at_str) + .ok()? + .with_timezone(&Utc); + let repo = repo_from_html_url(html_url)?; + + let private = match vis_cache.get(&repo).copied() { + Some(p) => p, + None => match self.fetch_repo_private(&repo).await { + Ok(p) => { + vis_cache.insert(repo.clone(), p); + p + } + Err(e) => { + warn!(repo = %repo, error = %e, "repo visibility lookup failed; treating as private"); + vis_cache.insert(repo.clone(), true); + true + } + }, + }; + + let action = if item.get("pull_request").is_some() { + "PullRequest" + } else { + "Issue" + }; + + Some(Event { + id: format!("github-issue:{repo}#{number}"), + source: Source::Github, + action: action.into(), + occurred_at, + public: !private, + payload: item.clone(), + }) + } +} + +#[async_trait] +impl EventSource for GithubSearchSource { + fn name(&self) -> &'static str { + SOURCE_NAME + } + + async fn poll(&self) -> Result { + let mut vis_cache: HashMap = HashMap::new(); + let total = self.search_issues(&mut vis_cache).await?; + self.state.touch(SOURCE_NAME).await?; + debug!( + ingested = total, + unique_repos = vis_cache.len(), + "github-search poll complete" + ); + Ok(total) + } +} + +/// Extract `owner/repo` from a github.com URL like +/// `https://github.com/owner/repo/{issues,pull}/42`. +fn repo_from_html_url(url: &str) -> Option { + let stripped = url.strip_prefix("https://github.com/")?; + let mut parts = stripped.splitn(3, '/'); + let owner = parts.next()?; + let repo = parts.next()?; + if owner.is_empty() || repo.is_empty() { + return None; + } + Some(format!("{owner}/{repo}")) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn extracts_repo_from_html_url() { + assert_eq!( + repo_from_html_url("https://github.com/Nehliin/vortex/issues/125").as_deref(), + Some("Nehliin/vortex") + ); + assert_eq!( + repo_from_html_url("https://github.com/grenade/moments/pull/3").as_deref(), + Some("grenade/moments") + ); + } + + #[test] + fn rejects_non_github_host() { + assert!(repo_from_html_url("https://gitlab.com/x/y/-/issues/1").is_none()); + } +} diff --git a/crates/moments-data/src/lib.rs b/crates/moments-data/src/lib.rs index bd274ea..789d5eb 100644 --- a/crates/moments-data/src/lib.rs +++ b/crates/moments-data/src/lib.rs @@ -1,4 +1,5 @@ pub mod github; +pub mod github_search; use async_trait::async_trait; use chrono::{DateTime, Utc}; diff --git a/crates/moments-worker/src/main.rs b/crates/moments-worker/src/main.rs index 00a9d3f..4477087 100644 --- a/crates/moments-worker/src/main.rs +++ b/crates/moments-worker/src/main.rs @@ -5,6 +5,7 @@ use moments_core::{EventSource, run_poller}; use moments_data::{ PgStore, github::{GithubConfig, GithubSource}, + github_search::{GithubSearchConfig, GithubSearchSource}, }; use reqwest::Client; use tracing::info; @@ -22,9 +23,14 @@ struct Args { #[arg(long, env = "GITHUB_TOKEN")] github_token: Option, - /// Seconds between poll attempts per source. + /// Seconds between Events-API polls (live feed, last 90 days). #[arg(long, env = "POLL_INTERVAL_SECS", default_value = "600")] interval_secs: u64, + + /// Seconds between Search-API polls (historical issue/PR backfill). + /// Defaults to 24h — this is a backfill, not a live feed. + #[arg(long, env = "SEARCH_POLL_INTERVAL_SECS", default_value = "86400")] + search_interval_secs: u64, } #[tokio::main] @@ -50,18 +56,35 @@ async fn main() -> anyhow::Result<()> { }, )) as Arc; + let github_search = Arc::new(GithubSearchSource::new( + http.clone(), + store.clone(), + store.clone(), + GithubSearchConfig { + user: args.github_user.clone(), + token: args.github_token.clone(), + ..Default::default() + }, + )) as Arc; + info!( github_user = args.github_user, interval_secs = args.interval_secs, + search_interval_secs = args.search_interval_secs, "worker started" ); let interval = Duration::from_secs(args.interval_secs); + let search_interval = Duration::from_secs(args.search_interval_secs); + let github_task = tokio::spawn(async move { run_poller(github, interval).await }); + let github_search_task = + tokio::spawn(async move { run_poller(github_search, search_interval).await }); tokio::signal::ctrl_c().await?; info!("shutdown signal received"); github_task.abort(); + github_search_task.abort(); Ok(()) }