feat(worker): add github events poller

Adds the first ingestion source. Page-1 polling is ETag-conditional
(304s don't count against rate limit); the very first run paginates
back through Link "next" pages up to a 10-page safety cap so the
table starts populated rather than waiting for new activity.

Hits /users/{user}/events/public — works without auth, returns the
right scope for a public timeline. Token (GITHUB_TOKEN) is optional;
when present it raises the rate limit from 60 to 5000/hr.

New plumbing:

  moments-core::sources
    - EventSource trait (poll() -> count)
    - PollerStateStore trait (etag persistence port)
    - run_poller driver: tokio interval + jittered exponential backoff

  moments-data::github
    - GithubSource impl, raw payload preserved as JSONB
    - parse_link_next for pagination
    - 4 unit tests covering parser + Link parsing

  migration 0002_poller_state.sql
    - one row per source: source, etag, last_modified, last_fetched

Worker binary spawns one tokio task per source (just github for now)
and aborts on SIGINT. Verified by smoke-curling the upstream endpoint:
ETag and Link headers are present; payload shape matches the parser.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-03 17:59:15 +03:00
parent e40d6b0e44
commit 45ceec2ec7
10 changed files with 489 additions and 9 deletions

4
Cargo.lock generated
View File

@@ -1148,6 +1148,8 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"thiserror", "thiserror",
"tokio",
"tracing",
] ]
[[package]] [[package]]
@@ -1158,6 +1160,8 @@ dependencies = [
"chrono", "chrono",
"moments-core", "moments-core",
"moments-entities", "moments-entities",
"reqwest",
"serde",
"serde_json", "serde_json",
"sqlx", "sqlx",
"thiserror", "thiserror",

View File

@@ -18,6 +18,7 @@ thiserror = "2"
# core / data # core / data
sqlx = { version = "0.8", default-features = false, features = ["postgres", "runtime-tokio-rustls", "macros", "migrate", "chrono", "json"] } sqlx = { version = "0.8", default-features = false, features = ["postgres", "runtime-tokio-rustls", "macros", "migrate", "chrono", "json"] }
async-trait = "0.1"
# binaries # binaries
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "time"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "time"] }

View File

@@ -12,4 +12,6 @@ serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
chrono.workspace = true chrono.workspace = true
thiserror.workspace = true thiserror.workspace = true
async-trait = "0.1" async-trait.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
tracing.workspace = true

View File

@@ -1,3 +1,7 @@
pub mod sources;
pub use sources::{EventSource, PollerState, PollerStateStore, SourceError, run_poller};
use async_trait::async_trait; use async_trait::async_trait;
use moments_entities::{Event, EventQuery, SourceSummary}; use moments_entities::{Event, EventQuery, SourceSummary};

View File

@@ -0,0 +1,96 @@
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use tracing::{debug, info, warn};
use crate::StoreError;
/// A pollable upstream activity feed (github, gitea, hg, bugzilla).
///
/// Implementations are responsible for: fetching from upstream, persisting
/// any incremental-fetch bookkeeping (etag, since-cursor), transforming
/// raw payloads into [`moments_entities::Event`] rows, and writing them.
/// `poll` returns the count of rows ingested on this tick (0 if nothing
/// changed upstream).
#[async_trait]
pub trait EventSource: Send + Sync {
fn name(&self) -> &'static str;
async fn poll(&self) -> Result<usize, SourceError>;
}
#[derive(Debug, thiserror::Error)]
pub enum SourceError {
#[error("http: {0}")]
Http(String),
#[error("parse: {0}")]
Parse(String),
#[error("storage: {0}")]
Storage(#[from] StoreError),
}
/// Persisted per-source bookkeeping for incremental fetch (etag, last-modified).
#[derive(Debug, Clone)]
pub struct PollerState {
pub source: String,
pub etag: Option<String>,
pub last_modified: Option<DateTime<Utc>>,
pub last_fetched: DateTime<Utc>,
}
#[async_trait]
pub trait PollerStateStore: Send + Sync {
async fn load(&self, source: &str) -> Result<Option<PollerState>, StoreError>;
async fn save(
&self,
source: &str,
etag: Option<&str>,
last_modified: Option<DateTime<Utc>>,
) -> Result<(), StoreError>;
async fn touch(&self, source: &str) -> Result<(), StoreError>;
}
/// Drive a single source on a fixed interval until cancelled. Backs off
/// (with jitter) on consecutive failures up to a 64-second ceiling.
pub async fn run_poller(source: Arc<dyn EventSource>, interval: Duration) {
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut consecutive_failures: u32 = 0;
loop {
ticker.tick().await;
match source.poll().await {
Ok(0) => {
consecutive_failures = 0;
debug!(source = source.name(), "no new events");
}
Ok(count) => {
consecutive_failures = 0;
info!(source = source.name(), count, "ingested");
}
Err(e) => {
consecutive_failures = consecutive_failures.saturating_add(1);
let backoff = backoff_with_jitter(consecutive_failures);
warn!(
source = source.name(),
error = %e,
attempt = consecutive_failures,
backoff_ms = backoff.as_millis() as u64,
"poll failed; backing off"
);
tokio::time::sleep(backoff).await;
}
}
}
}
fn backoff_with_jitter(attempt: u32) -> Duration {
// 1s, 2s, 4s, 8s, 16s, 32s, 64s ... capped
let base_ms: u64 = 1_000u64.saturating_mul(1u64 << attempt.min(6));
// pseudo-random jitter from system time nanos — fine for backoff smoothing.
let jitter_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| (d.subsec_nanos() % 1_000) as u64)
.unwrap_or(0);
Duration::from_millis(base_ms + jitter_ms)
}

View File

@@ -14,4 +14,6 @@ serde_json.workspace = true
chrono.workspace = true chrono.workspace = true
thiserror.workspace = true thiserror.workspace = true
tracing.workspace = true tracing.workspace = true
async-trait = "0.1" async-trait.workspace = true
reqwest.workspace = true
serde.workspace = true

View File

@@ -0,0 +1,6 @@
CREATE TABLE poller_state (
source TEXT PRIMARY KEY,
etag TEXT,
last_modified TIMESTAMPTZ,
last_fetched TIMESTAMPTZ NOT NULL DEFAULT now()
);

View File

@@ -0,0 +1,255 @@
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
use moments_entities::{Event, Source};
use reqwest::{Client, StatusCode, header};
use tracing::debug;
const SOURCE_NAME: &str = "github";
const USER_AGENT: &str = concat!(
"moments/",
env!("CARGO_PKG_VERSION"),
" (+https://rob.tn)"
);
/// Cap on initial backfill pagination. GitHub returns ~300 events max
/// across pages; this is a safety net, not an expected limit.
const MAX_BACKFILL_PAGES: usize = 10;
#[derive(Clone, Debug)]
pub struct GithubConfig {
pub user: String,
pub token: Option<String>,
pub per_page: u32,
}
impl Default for GithubConfig {
fn default() -> Self {
Self {
user: "grenade".into(),
token: None,
per_page: 100,
}
}
}
pub struct GithubSource {
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: GithubConfig,
}
impl GithubSource {
pub fn new(
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: GithubConfig,
) -> Self {
Self {
client,
writer,
state,
config,
}
}
fn first_page_url(&self) -> String {
// Public events endpoint: works without auth (60/hr unauth, 5000/hr authed).
// The non-public `/users/{u}/events` endpoint now requires auth and returns
// private-repo activity, which we don't want on a public timeline anyway.
format!(
"https://api.github.com/users/{}/events/public?per_page={}",
self.config.user, self.config.per_page
)
}
fn apply_common_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
req = req
.header(header::ACCEPT, "application/vnd.github+json")
.header("X-GitHub-Api-Version", "2022-11-28")
.header(header::USER_AGENT, USER_AGENT);
if let Some(token) = &self.config.token {
req = req.header(header::AUTHORIZATION, format!("Bearer {token}"));
}
req
}
}
#[async_trait]
impl EventSource for GithubSource {
fn name(&self) -> &'static str {
SOURCE_NAME
}
async fn poll(&self) -> Result<usize, SourceError> {
let prior = self.state.load(SOURCE_NAME).await?;
let prior_etag = prior.as_ref().and_then(|s| s.etag.clone());
let first_run = prior.is_none();
let mut url = self.first_page_url();
let mut total = 0usize;
let mut latest_etag: Option<String> = None;
let mut page_idx = 0usize;
loop {
let mut req = self.client.get(&url);
req = self.apply_common_headers(req);
// ETag conditional only on the first page; following Link "next"
// pages are historical and don't change.
if page_idx == 0 {
if let Some(etag) = &prior_etag {
req = req.header(header::IF_NONE_MATCH, etag);
}
}
let resp = req
.send()
.await
.map_err(|e| SourceError::Http(e.to_string()))?;
if resp.status() == StatusCode::NOT_MODIFIED {
// Only reachable on page 1, and only when we sent an ETag.
debug!(source = SOURCE_NAME, "304 not modified");
self.state.touch(SOURCE_NAME).await?;
return Ok(0);
}
if !resp.status().is_success() {
return Err(SourceError::Http(format!(
"{} {}",
resp.status(),
resp.url()
)));
}
if page_idx == 0 {
latest_etag = resp
.headers()
.get(header::ETAG)
.and_then(|v| v.to_str().ok())
.map(str::to_string);
}
let next_url = parse_link_next(resp.headers().get(header::LINK));
let raw_events: Vec<serde_json::Value> = resp
.json()
.await
.map_err(|e| SourceError::Parse(e.to_string()))?;
let events: Vec<Event> = raw_events
.into_iter()
.filter_map(parse_github_event)
.collect();
total += self.writer.upsert_events(&events).await?;
page_idx += 1;
// Subsequent runs only fetch page 1; the historical pages don't
// change and re-fetching them on every tick is waste.
if !first_run {
break;
}
if page_idx >= MAX_BACKFILL_PAGES {
break;
}
match next_url {
Some(u) => url = u,
None => break,
}
}
self.state.save(SOURCE_NAME, latest_etag.as_deref(), None).await?;
Ok(total)
}
}
fn parse_github_event(raw: serde_json::Value) -> Option<Event> {
let id = raw.get("id")?.as_str()?.to_string();
let event_type = raw.get("type")?.as_str()?.to_string();
let created_at_str = raw.get("created_at")?.as_str()?;
let occurred_at = DateTime::parse_from_rfc3339(created_at_str)
.ok()?
.with_timezone(&Utc);
Some(Event {
id: format!("github:{id}"),
source: Source::Github,
action: event_type,
occurred_at,
payload: raw,
})
}
/// Parse the `next` URL out of a GitHub `Link` header.
/// Format: `<https://...?page=2>; rel="next", <https://...?page=10>; rel="last"`.
fn parse_link_next(header: Option<&header::HeaderValue>) -> Option<String> {
let raw = header?.to_str().ok()?;
for part in raw.split(',') {
let part = part.trim();
// Each part: `<url>; rel="next"`
let (url_part, rel_part) = part.split_once(';')?;
let url = url_part.trim().trim_start_matches('<').trim_end_matches('>');
let rel = rel_part.trim();
if rel.eq_ignore_ascii_case("rel=\"next\"") {
return Some(url.to_string());
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_minimal_event() {
let raw = serde_json::json!({
"id": "12345",
"type": "PushEvent",
"created_at": "2026-04-15T10:30:00Z",
"actor": { "login": "grenade" },
"repo": { "name": "grenade/moments" },
"payload": { "ref": "refs/heads/main" }
});
let ev = parse_github_event(raw.clone()).expect("parses");
assert_eq!(ev.id, "github:12345");
assert_eq!(ev.source, Source::Github);
assert_eq!(ev.action, "PushEvent");
assert_eq!(ev.payload, raw);
}
#[test]
fn rejects_event_missing_id() {
let raw = serde_json::json!({ "type": "PushEvent", "created_at": "2026-01-01T00:00:00Z" });
assert!(parse_github_event(raw).is_none());
}
#[test]
fn extracts_next_link() {
let mut h = header::HeaderMap::new();
h.insert(
header::LINK,
r#"<https://api.github.com/users/grenade/events?page=2>; rel="next", <https://api.github.com/users/grenade/events?page=10>; rel="last""#
.parse()
.unwrap(),
);
let next = parse_link_next(h.get(header::LINK));
assert_eq!(
next.as_deref(),
Some("https://api.github.com/users/grenade/events?page=2")
);
}
#[test]
fn no_next_link_when_only_prev() {
let mut h = header::HeaderMap::new();
h.insert(
header::LINK,
r#"<https://api.github.com/users/grenade/events?page=1>; rel="prev""#
.parse()
.unwrap(),
);
assert!(parse_link_next(h.get(header::LINK)).is_none());
}
}

View File

@@ -1,6 +1,8 @@
pub mod github;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use moments_core::{EventReader, EventWriter, StoreError}; use moments_core::{EventReader, EventWriter, PollerState, PollerStateStore, StoreError};
use moments_entities::{Event, EventQuery, Source, SourceSummary}; use moments_entities::{Event, EventQuery, Source, SourceSummary};
use sqlx::Row; use sqlx::Row;
use sqlx::postgres::{PgPool, PgPoolOptions}; use sqlx::postgres::{PgPool, PgPoolOptions};
@@ -105,6 +107,74 @@ impl EventReader for PgStore {
} }
} }
#[async_trait]
impl PollerStateStore for PgStore {
async fn load(&self, source: &str) -> Result<Option<PollerState>, StoreError> {
let row = sqlx::query(
r#"
SELECT source, etag, last_modified, last_fetched
FROM poller_state
WHERE source = $1
"#,
)
.bind(source)
.fetch_optional(&self.pool)
.await
.map_err(map_err)?;
Ok(match row {
None => None,
Some(r) => Some(PollerState {
source: r.try_get("source").map_err(map_err)?,
etag: r.try_get("etag").map_err(map_err)?,
last_modified: r.try_get("last_modified").map_err(map_err)?,
last_fetched: r.try_get("last_fetched").map_err(map_err)?,
}),
})
}
async fn save(
&self,
source: &str,
etag: Option<&str>,
last_modified: Option<DateTime<Utc>>,
) -> Result<(), StoreError> {
sqlx::query(
r#"
INSERT INTO poller_state (source, etag, last_modified, last_fetched)
VALUES ($1, $2, $3, now())
ON CONFLICT (source) DO UPDATE
SET etag = EXCLUDED.etag,
last_modified = EXCLUDED.last_modified,
last_fetched = EXCLUDED.last_fetched
"#,
)
.bind(source)
.bind(etag)
.bind(last_modified)
.execute(&self.pool)
.await
.map_err(map_err)?;
Ok(())
}
async fn touch(&self, source: &str) -> Result<(), StoreError> {
sqlx::query(
r#"
INSERT INTO poller_state (source, last_fetched)
VALUES ($1, now())
ON CONFLICT (source) DO UPDATE
SET last_fetched = EXCLUDED.last_fetched
"#,
)
.bind(source)
.execute(&self.pool)
.await
.map_err(map_err)?;
Ok(())
}
}
#[async_trait] #[async_trait]
impl EventWriter for PgStore { impl EventWriter for PgStore {
async fn upsert_events(&self, events: &[Event]) -> Result<usize, StoreError> { async fn upsert_events(&self, events: &[Event]) -> Result<usize, StoreError> {

View File

@@ -1,5 +1,12 @@
use std::{sync::Arc, time::Duration};
use clap::Parser; use clap::Parser;
use moments_data::PgStore; use moments_core::{EventSource, run_poller};
use moments_data::{
PgStore,
github::{GithubConfig, GithubSource},
};
use reqwest::Client;
use tracing::info; use tracing::info;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@@ -7,6 +14,17 @@ use tracing::info;
struct Args { struct Args {
#[arg(long, env = "DATABASE_URL")] #[arg(long, env = "DATABASE_URL")]
database_url: String, database_url: String,
#[arg(long, env = "GITHUB_USER", default_value = "grenade")]
github_user: String,
/// Optional GitHub token. Higher rate limit and access to private events.
#[arg(long, env = "GITHUB_TOKEN")]
github_token: Option<String>,
/// Seconds between poll attempts per source.
#[arg(long, env = "POLL_INTERVAL_SECS", default_value = "600")]
interval_secs: u64,
} }
#[tokio::main] #[tokio::main]
@@ -14,14 +32,36 @@ async fn main() -> anyhow::Result<()> {
init_tracing(); init_tracing();
let args = Args::parse(); let args = Args::parse();
let store = PgStore::connect(&args.database_url).await?; let store = Arc::new(PgStore::connect(&args.database_url).await?);
store.migrate().await?; store.migrate().await?;
info!("worker started — pollers will land in step 2"); let http = Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
// Pollers (github, gitea, hg, bugzilla) land in subsequent steps. let github = Arc::new(GithubSource::new(
// For now this binary only verifies it can reach the database. http.clone(),
let _ = store; store.clone(),
store.clone(),
GithubConfig {
user: args.github_user.clone(),
token: args.github_token.clone(),
per_page: 100,
},
)) as Arc<dyn EventSource>;
info!(
github_user = args.github_user,
interval_secs = args.interval_secs,
"worker started"
);
let interval = Duration::from_secs(args.interval_secs);
let github_task = tokio::spawn(async move { run_poller(github, interval).await });
tokio::signal::ctrl_c().await?;
info!("shutdown signal received");
github_task.abort();
Ok(()) Ok(())
} }