diff --git a/Cargo.lock b/Cargo.lock
index e3c3523..a4dcf49 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1148,6 +1148,8 @@ dependencies = [
  "serde",
  "serde_json",
  "thiserror",
+ "tokio",
+ "tracing",
 ]
 
 [[package]]
@@ -1158,6 +1160,8 @@ dependencies = [
  "chrono",
  "moments-core",
  "moments-entities",
+ "reqwest",
+ "serde",
  "serde_json",
  "sqlx",
  "thiserror",
diff --git a/Cargo.toml b/Cargo.toml
index 82baf0a..63535f8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ thiserror = "2"
 
 # core / data
 sqlx = { version = "0.8", default-features = false, features = ["postgres", "runtime-tokio-rustls", "macros", "migrate", "chrono", "json"] }
+async-trait = "0.1"
 
 # binaries
 tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "time"] }
diff --git a/crates/moments-core/Cargo.toml b/crates/moments-core/Cargo.toml
index 821d925..d20e483 100644
--- a/crates/moments-core/Cargo.toml
+++ b/crates/moments-core/Cargo.toml
@@ -12,4 +12,6 @@ serde.workspace = true
 serde_json.workspace = true
 chrono.workspace = true
 thiserror.workspace = true
-async-trait = "0.1"
+async-trait.workspace = true
+tokio = { workspace = true, features = ["rt", "time"] }
+tracing.workspace = true
diff --git a/crates/moments-core/src/lib.rs b/crates/moments-core/src/lib.rs
index ecce040..42be16d 100644
--- a/crates/moments-core/src/lib.rs
+++ b/crates/moments-core/src/lib.rs
@@ -1,3 +1,7 @@
+pub mod sources;
+
+pub use sources::{EventSource, PollerState, PollerStateStore, SourceError, run_poller};
+
 use async_trait::async_trait;
 use moments_entities::{Event, EventQuery, SourceSummary};
 
diff --git a/crates/moments-core/src/sources.rs b/crates/moments-core/src/sources.rs
new file mode 100644
index 0000000..a07a1b3
--- /dev/null
+++ b/crates/moments-core/src/sources.rs
@@ -0,0 +1,96 @@
+use std::{sync::Arc, time::Duration};
+
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use tracing::{debug, info, warn};
+
+use crate::StoreError;
+
+/// A pollable upstream activity feed (github, gitea, hg, bugzilla).
+///
+/// Implementations are responsible for: fetching from upstream, persisting
+/// any incremental-fetch bookkeeping (etag, since-cursor), transforming
+/// raw payloads into [`moments_entities::Event`] rows, and writing them.
+/// `poll` returns the count of rows ingested on this tick (0 if nothing
+/// changed upstream).
+#[async_trait]
+pub trait EventSource: Send + Sync {
+    fn name(&self) -> &'static str;
+    async fn poll(&self) -> Result<usize, SourceError>;
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum SourceError {
+    #[error("http: {0}")]
+    Http(String),
+    #[error("parse: {0}")]
+    Parse(String),
+    #[error("storage: {0}")]
+    Storage(#[from] StoreError),
+}
+
+/// Persisted per-source bookkeeping for incremental fetch (etag, last-modified).
+#[derive(Debug, Clone)]
+pub struct PollerState {
+    pub source: String,
+    pub etag: Option<String>,
+    pub last_modified: Option<DateTime<Utc>>,
+    pub last_fetched: DateTime<Utc>,
+}
+
+#[async_trait]
+pub trait PollerStateStore: Send + Sync {
+    async fn load(&self, source: &str) -> Result<Option<PollerState>, StoreError>;
+    async fn save(
+        &self,
+        source: &str,
+        etag: Option<&str>,
+        last_modified: Option<DateTime<Utc>>,
+    ) -> Result<(), StoreError>;
+    async fn touch(&self, source: &str) -> Result<(), StoreError>;
+}
+
+/// Drive a single source on a fixed interval until cancelled. Backs off
+/// (with jitter) on consecutive failures up to a 64-second ceiling.
+pub async fn run_poller(source: Arc<dyn EventSource>, interval: Duration) {
+    let mut ticker = tokio::time::interval(interval);
+    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+    let mut consecutive_failures: u32 = 0;
+    loop {
+        ticker.tick().await;
+        match source.poll().await {
+            Ok(0) => {
+                consecutive_failures = 0;
+                debug!(source = source.name(), "no new events");
+            }
+            Ok(count) => {
+                consecutive_failures = 0;
+                info!(source = source.name(), count, "ingested");
+            }
+            Err(e) => {
+                consecutive_failures = consecutive_failures.saturating_add(1);
+                let backoff = backoff_with_jitter(consecutive_failures);
+                warn!(
+                    source = source.name(),
+                    error = %e,
+                    attempt = consecutive_failures,
+                    backoff_ms = backoff.as_millis() as u64,
+                    "poll failed; backing off"
+                );
+                tokio::time::sleep(backoff).await;
+            }
+        }
+    }
+}
+
+fn backoff_with_jitter(attempt: u32) -> Duration {
+    // attempt is 1-based (first failure = 1): 1s, 2s, 4s, 8s, 16s, 32s, 64s ... capped
+    let base_ms: u64 = 1_000u64.saturating_mul(1u64 << attempt.saturating_sub(1).min(6));
+    // pseudo-random 0-999 ms jitter from the wall clock's millisecond part — fine for backoff smoothing.
+    let jitter_ms = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .map(|d| u64::from(d.subsec_millis()))
+        .unwrap_or(0);
+    Duration::from_millis(base_ms + jitter_ms)
+}
diff --git a/crates/moments-data/Cargo.toml b/crates/moments-data/Cargo.toml
index 79e7ca8..7505022 100644
--- a/crates/moments-data/Cargo.toml
+++ b/crates/moments-data/Cargo.toml
@@ -14,4 +14,6 @@ serde_json.workspace = true
 chrono.workspace = true
 thiserror.workspace = true
 tracing.workspace = true
-async-trait = "0.1"
+async-trait.workspace = true
+reqwest.workspace = true
+serde.workspace = true
diff --git a/crates/moments-data/migrations/0002_poller_state.sql b/crates/moments-data/migrations/0002_poller_state.sql
new file mode 100644
index 0000000..d44d110
--- /dev/null
+++ b/crates/moments-data/migrations/0002_poller_state.sql
@@ -0,0 +1,6 @@
+CREATE TABLE poller_state (
+    source TEXT PRIMARY KEY,
+    etag TEXT,
+    last_modified TIMESTAMPTZ,
+    last_fetched TIMESTAMPTZ NOT NULL DEFAULT now()
+);
diff --git a/crates/moments-data/src/github.rs b/crates/moments-data/src/github.rs
new file mode 100644
index 0000000..6923853
--- /dev/null
+++ b/crates/moments-data/src/github.rs
@@ -0,0 +1,255 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
+use moments_entities::{Event, Source};
+use reqwest::{Client, StatusCode, header};
+use tracing::debug;
+
+const SOURCE_NAME: &str = "github";
+const USER_AGENT: &str = concat!(
+    "moments/",
+    env!("CARGO_PKG_VERSION"),
+    " (+https://rob.tn)"
+);
+
+/// Cap on initial backfill pagination. GitHub returns ~300 events max
+/// across pages; this is a safety net, not an expected limit.
+const MAX_BACKFILL_PAGES: usize = 10;
+
+#[derive(Clone, Debug)]
+pub struct GithubConfig {
+    pub user: String,
+    pub token: Option<String>,
+    pub per_page: u32,
+}
+
+impl Default for GithubConfig {
+    fn default() -> Self {
+        Self {
+            user: "grenade".into(),
+            token: None,
+            per_page: 100,
+        }
+    }
+}
+
+pub struct GithubSource {
+    client: Client,
+    writer: Arc<dyn EventWriter>,
+    state: Arc<dyn PollerStateStore>,
+    config: GithubConfig,
+}
+
+impl GithubSource {
+    pub fn new(
+        client: Client,
+        writer: Arc<dyn EventWriter>,
+        state: Arc<dyn PollerStateStore>,
+        config: GithubConfig,
+    ) -> Self {
+        Self {
+            client,
+            writer,
+            state,
+            config,
+        }
+    }
+
+    fn first_page_url(&self) -> String {
+        // Public events endpoint: works without auth (60/hr unauth, 5000/hr authed).
+        // The non-public `/users/{u}/events` endpoint now requires auth and returns
+        // private-repo activity, which we don't want on a public timeline anyway.
+        format!(
+            "https://api.github.com/users/{}/events/public?per_page={}",
+            self.config.user, self.config.per_page
+        )
+    }
+
+    fn apply_common_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
+        req = req
+            .header(header::ACCEPT, "application/vnd.github+json")
+            .header("X-GitHub-Api-Version", "2022-11-28")
+            .header(header::USER_AGENT, USER_AGENT);
+        if let Some(token) = &self.config.token {
+            req = req.header(header::AUTHORIZATION, format!("Bearer {token}"));
+        }
+        req
+    }
+}
+
+#[async_trait]
+impl EventSource for GithubSource {
+    fn name(&self) -> &'static str {
+        SOURCE_NAME
+    }
+
+    async fn poll(&self) -> Result<usize, SourceError> {
+        let prior = self.state.load(SOURCE_NAME).await?;
+        let prior_etag = prior.as_ref().and_then(|s| s.etag.clone());
+        let first_run = prior.is_none();
+
+        let mut url = self.first_page_url();
+        let mut total = 0usize;
+        let mut latest_etag: Option<String> = None;
+        let mut page_idx = 0usize;
+
+        loop {
+            let mut req = self.client.get(&url);
+            req = self.apply_common_headers(req);
+            // ETag conditional only on the first page; following Link "next"
+            // pages are historical and don't change.
+            if page_idx == 0 {
+                if let Some(etag) = &prior_etag {
+                    req = req.header(header::IF_NONE_MATCH, etag);
+                }
+            }
+
+            let resp = req
+                .send()
+                .await
+                .map_err(|e| SourceError::Http(e.to_string()))?;
+
+            if resp.status() == StatusCode::NOT_MODIFIED {
+                // Only reachable on page 1, and only when we sent an ETag.
+                debug!(source = SOURCE_NAME, "304 not modified");
+                self.state.touch(SOURCE_NAME).await?;
+                return Ok(0);
+            }
+            if !resp.status().is_success() {
+                return Err(SourceError::Http(format!(
+                    "{} {}",
+                    resp.status(),
+                    resp.url()
+                )));
+            }
+
+            if page_idx == 0 {
+                latest_etag = resp
+                    .headers()
+                    .get(header::ETAG)
+                    .and_then(|v| v.to_str().ok())
+                    .map(str::to_string);
+            }
+
+            let next_url = parse_link_next(resp.headers().get(header::LINK));
+            let raw_events: Vec<serde_json::Value> = resp
+                .json()
+                .await
+                .map_err(|e| SourceError::Parse(e.to_string()))?;
+            let events: Vec<Event> = raw_events
+                .into_iter()
+                .filter_map(parse_github_event)
+                .collect();
+            total += self.writer.upsert_events(&events).await?;
+
+            page_idx += 1;
+
+            // Subsequent runs only fetch page 1; the historical pages don't
+            // change and re-fetching them on every tick is waste.
+            if !first_run {
+                break;
+            }
+            if page_idx >= MAX_BACKFILL_PAGES {
+                break;
+            }
+            match next_url {
+                Some(u) => url = u,
+                None => break,
+            }
+        }
+
+        self.state.save(SOURCE_NAME, latest_etag.as_deref(), None).await?;
+        Ok(total)
+    }
+}
+
+fn parse_github_event(raw: serde_json::Value) -> Option<Event> {
+    let id = raw.get("id")?.as_str()?.to_string();
+    let event_type = raw.get("type")?.as_str()?.to_string();
+    let created_at_str = raw.get("created_at")?.as_str()?;
+    let occurred_at = DateTime::parse_from_rfc3339(created_at_str)
+        .ok()?
+        .with_timezone(&Utc);
+    Some(Event {
+        id: format!("github:{id}"),
+        source: Source::Github,
+        action: event_type,
+        occurred_at,
+        payload: raw,
+    })
+}
+
+/// Parse the `next` URL out of a GitHub `Link` header.
+/// Format: `<url>; rel="next", <url>; rel="last"`.
+fn parse_link_next(header: Option<&header::HeaderValue>) -> Option<String> {
+    let raw = header?.to_str().ok()?;
+    for part in raw.split(',') {
+        let part = part.trim();
+        // Each part: `<url>; rel="next"`. Skip (don't abort on) a part with no `;`.
+        let Some((url_part, rel_part)) = part.split_once(';') else { continue };
+        let url = url_part.trim().trim_start_matches('<').trim_end_matches('>');
+        let rel = rel_part.trim();
+        if rel.eq_ignore_ascii_case("rel=\"next\"") {
+            return Some(url.to_string());
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn parses_minimal_event() {
+        let raw = serde_json::json!({
+            "id": "12345",
+            "type": "PushEvent",
+            "created_at": "2026-04-15T10:30:00Z",
+            "actor": { "login": "grenade" },
+            "repo": { "name": "grenade/moments" },
+            "payload": { "ref": "refs/heads/main" }
+        });
+        let ev = parse_github_event(raw.clone()).expect("parses");
+        assert_eq!(ev.id, "github:12345");
+        assert_eq!(ev.source, Source::Github);
+        assert_eq!(ev.action, "PushEvent");
+        assert_eq!(ev.payload, raw);
+    }
+
+    #[test]
+    fn rejects_event_missing_id() {
+        let raw = serde_json::json!({ "type": "PushEvent", "created_at": "2026-01-01T00:00:00Z" });
+        assert!(parse_github_event(raw).is_none());
+    }
+
+    #[test]
+    fn extracts_next_link() {
+        let mut h = header::HeaderMap::new();
+        h.insert(
+            header::LINK,
+            r#"<https://api.github.com/users/grenade/events?page=2>; rel="next", <https://api.github.com/users/grenade/events?page=3>; rel="last""#
+                .parse()
+                .unwrap(),
+        );
+        let next = parse_link_next(h.get(header::LINK));
+        assert_eq!(
+            next.as_deref(),
+            Some("https://api.github.com/users/grenade/events?page=2")
+        );
+    }
+
+    #[test]
+    fn no_next_link_when_only_prev() {
+        let mut h = header::HeaderMap::new();
+        h.insert(
+            header::LINK,
+            r#"<https://api.github.com/users/grenade/events?page=1>; rel="prev""#
+                .parse()
+                .unwrap(),
+        );
+        assert!(parse_link_next(h.get(header::LINK)).is_none());
+    }
+}
diff --git a/crates/moments-data/src/lib.rs b/crates/moments-data/src/lib.rs
index a85bf5a..5f3c11f 100644
--- a/crates/moments-data/src/lib.rs
+++ b/crates/moments-data/src/lib.rs
@@ -1,6 +1,8 @@
+pub mod github;
+
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use moments_core::{EventReader, EventWriter, StoreError};
+use moments_core::{EventReader, EventWriter, PollerState, PollerStateStore, StoreError};
 use moments_entities::{Event, EventQuery, Source, SourceSummary};
 use sqlx::Row;
 use sqlx::postgres::{PgPool, PgPoolOptions};
@@ -105,6 +107,74 @@ impl EventReader for PgStore {
     }
 }
 
+#[async_trait]
+impl PollerStateStore for PgStore {
+    async fn load(&self, source: &str) -> Result<Option<PollerState>, StoreError> {
+        let row = sqlx::query(
+            r#"
+            SELECT source, etag, last_modified, last_fetched
+            FROM poller_state
+            WHERE source = $1
+            "#,
+        )
+        .bind(source)
+        .fetch_optional(&self.pool)
+        .await
+        .map_err(map_err)?;
+
+        Ok(match row {
+            None => None,
+            Some(r) => Some(PollerState {
+                source: r.try_get("source").map_err(map_err)?,
+                etag: r.try_get("etag").map_err(map_err)?,
+                last_modified: r.try_get("last_modified").map_err(map_err)?,
+                last_fetched: r.try_get("last_fetched").map_err(map_err)?,
+            }),
+        })
+    }
+
+    async fn save(
+        &self,
+        source: &str,
+        etag: Option<&str>,
+        last_modified: Option<DateTime<Utc>>,
+    ) -> Result<(), StoreError> {
+        sqlx::query(
+            r#"
+            INSERT INTO poller_state (source, etag, last_modified, last_fetched)
+            VALUES ($1, $2, $3, now())
+            ON CONFLICT (source) DO UPDATE
+            SET etag = EXCLUDED.etag,
+                last_modified = EXCLUDED.last_modified,
+                last_fetched = EXCLUDED.last_fetched
+            "#,
+        )
+        .bind(source)
+        .bind(etag)
+        .bind(last_modified)
+        .execute(&self.pool)
+        .await
+        .map_err(map_err)?;
+        Ok(())
+    }
+
+    async fn touch(&self, source: &str) -> Result<(), StoreError> {
+        sqlx::query(
+            r#"
+            INSERT INTO poller_state (source, last_fetched)
+            VALUES ($1, now())
+            ON CONFLICT (source) DO UPDATE
+            SET last_fetched = EXCLUDED.last_fetched
+            "#,
+        )
+        .bind(source)
+        .execute(&self.pool)
+        .await
+        .map_err(map_err)?;
+        Ok(())
+    }
+}
+
 #[async_trait]
 impl EventWriter for PgStore {
     async fn upsert_events(&self, events: &[Event]) -> Result<usize, StoreError> {
diff --git a/crates/moments-worker/src/main.rs b/crates/moments-worker/src/main.rs
index 0218a74..00a9d3f 100644
--- a/crates/moments-worker/src/main.rs
+++ b/crates/moments-worker/src/main.rs
@@ -1,5 +1,12 @@
+use std::{sync::Arc, time::Duration};
+
 use clap::Parser;
-use moments_data::PgStore;
+use moments_core::{EventSource, run_poller};
+use moments_data::{
+    PgStore,
+    github::{GithubConfig, GithubSource},
+};
+use reqwest::Client;
 use tracing::info;
 
 #[derive(Parser, Debug)]
@@ -7,6 +14,17 @@ use tracing::info;
 struct Args {
     #[arg(long, env = "DATABASE_URL")]
     database_url: String,
+
+    #[arg(long, env = "GITHUB_USER", default_value = "grenade")]
+    github_user: String,
+
+    /// Optional GitHub token. Raises the API rate limit; only public events are polled either way.
+    #[arg(long, env = "GITHUB_TOKEN")]
+    github_token: Option<String>,
+
+    /// Seconds between poll attempts per source (values below 1 are treated as 1).
+    #[arg(long, env = "POLL_INTERVAL_SECS", default_value = "600")]
+    interval_secs: u64,
 }
 
 #[tokio::main]
@@ -14,14 +32,36 @@ async fn main() -> anyhow::Result<()> {
     init_tracing();
     let args = Args::parse();
 
-    let store = PgStore::connect(&args.database_url).await?;
+    let store = Arc::new(PgStore::connect(&args.database_url).await?);
     store.migrate().await?;
 
-    info!("worker started — pollers will land in step 2");
+    let http = Client::builder()
+        .timeout(Duration::from_secs(30))
+        .build()?;
 
-    // Pollers (github, gitea, hg, bugzilla) land in subsequent steps.
-    // For now this binary only verifies it can reach the database.
-    let _ = store;
+    let github = Arc::new(GithubSource::new(
+        http.clone(),
+        store.clone(),
+        store.clone(),
+        GithubConfig {
+            user: args.github_user.clone(),
+            token: args.github_token.clone(),
+            per_page: 100,
+        },
+    )) as Arc<dyn EventSource>;
+
+    info!(
+        github_user = args.github_user,
+        interval_secs = args.interval_secs,
+        "worker started"
+    );
+
+    let interval = Duration::from_secs(args.interval_secs.max(1)); // tokio::time::interval panics on a zero period
+    let github_task = tokio::spawn(async move { run_poller(github, interval).await });
+
+    tokio::signal::ctrl_c().await?;
+    info!("shutdown signal received");
+    github_task.abort();
 
     Ok(())
 }