Files
moments/crates/moments-data/src/github.rs
rob thijssen 3c0253519f feat: ingest private events; surface public-only
The DB now stores everything GitHub will give us, the API only ever
returns public events (for now).

Endpoint switch in the github poller: when GITHUB_TOKEN is set we
hit /users/{u}/events (public + private), otherwise fall back to
/users/{u}/events/public. Either way each event's top-level `public`
boolean is captured into a new column.

Schema:
  migration 0003_event_public.sql adds events.public BOOLEAN NOT NULL
  DEFAULT true, plus an index on (public, occurred_at DESC).

Wire:
  Event gains a `public: bool` field.
  EventQuery gains `include_private: bool` (default false).
  list_events and source_summaries gate on it.
  moments-api pins include_private = false at every call site —
  threading it as a query param is a future-auth concern, not now.

The default-true on the column keeps existing rows correct: the 11
events already in the DB came from /events/public and are genuinely
public.

After this change, clear poller_state so the next worker run does a
fresh backfill via /events:

  DELETE FROM poller_state WHERE source = 'github';

Tests: +2 in github poller (private flag captured, default-public
on missing field) — 10 total green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 18:33:40 +03:00

299 lines
9.3 KiB
Rust

use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
use moments_entities::{Event, Source};
use reqwest::{Client, StatusCode, header};
use tracing::debug;
const SOURCE_NAME: &str = "github";
const USER_AGENT: &str = concat!(
"moments/",
env!("CARGO_PKG_VERSION"),
" (+https://rob.tn)"
);
/// Cap on initial backfill pagination. GitHub returns ~300 events max
/// across pages; this is a safety net, not an expected limit.
const MAX_BACKFILL_PAGES: usize = 10;
#[derive(Clone, Debug)]
pub struct GithubConfig {
pub user: String,
pub token: Option<String>,
pub per_page: u32,
}
impl Default for GithubConfig {
fn default() -> Self {
Self {
user: "grenade".into(),
token: None,
per_page: 100,
}
}
}
pub struct GithubSource {
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: GithubConfig,
}
impl GithubSource {
pub fn new(
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: GithubConfig,
) -> Self {
Self {
client,
writer,
state,
config,
}
}
fn first_page_url(&self) -> String {
// With a token: hit `/events`, which returns public + private events the
// authenticated user can see. We store everything; the API gates what
// gets surfaced to the public timeline via the `public` column.
// Without a token: fall back to `/events/public` (anonymous-readable).
let endpoint = if self.config.token.is_some() {
"events"
} else {
"events/public"
};
format!(
"https://api.github.com/users/{}/{endpoint}?per_page={}",
self.config.user, self.config.per_page
)
}
fn apply_common_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
req = req
.header(header::ACCEPT, "application/vnd.github+json")
.header("X-GitHub-Api-Version", "2022-11-28")
.header(header::USER_AGENT, USER_AGENT);
if let Some(token) = &self.config.token {
req = req.header(header::AUTHORIZATION, format!("Bearer {token}"));
}
req
}
}
#[async_trait]
impl EventSource for GithubSource {
fn name(&self) -> &'static str {
SOURCE_NAME
}
async fn poll(&self) -> Result<usize, SourceError> {
let prior = self.state.load(SOURCE_NAME).await?;
let prior_etag = prior.as_ref().and_then(|s| s.etag.clone());
let first_run = prior.is_none();
let mut url = self.first_page_url();
let mut total = 0usize;
let mut latest_etag: Option<String> = None;
let mut page_idx = 0usize;
loop {
let mut req = self.client.get(&url);
req = self.apply_common_headers(req);
// ETag conditional only on the first page; following Link "next"
// pages are historical and don't change.
if page_idx == 0 {
if let Some(etag) = &prior_etag {
req = req.header(header::IF_NONE_MATCH, etag);
}
}
let resp = req
.send()
.await
.map_err(|e| SourceError::Http(e.to_string()))?;
if resp.status() == StatusCode::NOT_MODIFIED {
// Only reachable on page 1, and only when we sent an ETag.
debug!(source = SOURCE_NAME, "304 not modified");
self.state.touch(SOURCE_NAME).await?;
return Ok(0);
}
if !resp.status().is_success() {
return Err(SourceError::Http(format!(
"{} {}",
resp.status(),
resp.url()
)));
}
if page_idx == 0 {
latest_etag = resp
.headers()
.get(header::ETAG)
.and_then(|v| v.to_str().ok())
.map(str::to_string);
}
let next_url = parse_link_next(resp.headers().get(header::LINK));
let raw_events: Vec<serde_json::Value> = resp
.json()
.await
.map_err(|e| SourceError::Parse(e.to_string()))?;
let events: Vec<Event> = raw_events
.into_iter()
.filter_map(parse_github_event)
.collect();
total += self.writer.upsert_events(&events).await?;
page_idx += 1;
// Subsequent runs only fetch page 1; the historical pages don't
// change and re-fetching them on every tick is waste.
if !first_run {
break;
}
if page_idx >= MAX_BACKFILL_PAGES {
break;
}
match next_url {
Some(u) => url = u,
None => break,
}
}
self.state.save(SOURCE_NAME, latest_etag.as_deref(), None).await?;
Ok(total)
}
}
fn parse_github_event(raw: serde_json::Value) -> Option<Event> {
let id = raw.get("id")?.as_str()?.to_string();
let event_type = raw.get("type")?.as_str()?.to_string();
let created_at_str = raw.get("created_at")?.as_str()?;
let occurred_at = DateTime::parse_from_rfc3339(created_at_str)
.ok()?
.with_timezone(&Utc);
// GitHub marks each event with a top-level `public` boolean. Events from
// `/events/public` are always true; `/events` may include false. Default
// to true if missing — that matches the safer-of-the-two-mistakes (under-
// expose) and the `/events/public` endpoint behaviour.
let public = raw.get("public").and_then(serde_json::Value::as_bool).unwrap_or(true);
Some(Event {
id: format!("github:{id}"),
source: Source::Github,
action: event_type,
occurred_at,
public,
payload: raw,
})
}
/// Parse the `next` URL out of a GitHub `Link` header.
/// Format: `<https://...?page=2>; rel="next", <https://...?page=10>; rel="last"`.
fn parse_link_next(header: Option<&header::HeaderValue>) -> Option<String> {
let raw = header?.to_str().ok()?;
for part in raw.split(',') {
let part = part.trim();
// Each part: `<url>; rel="next"`
let (url_part, rel_part) = part.split_once(';')?;
let url = url_part.trim().trim_start_matches('<').trim_end_matches('>');
let rel = rel_part.trim();
if rel.eq_ignore_ascii_case("rel=\"next\"") {
return Some(url.to_string());
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_minimal_event() {
let raw = serde_json::json!({
"id": "12345",
"type": "PushEvent",
"created_at": "2026-04-15T10:30:00Z",
"public": true,
"actor": { "login": "grenade" },
"repo": { "name": "grenade/moments" },
"payload": { "ref": "refs/heads/main" }
});
let ev = parse_github_event(raw.clone()).expect("parses");
assert_eq!(ev.id, "github:12345");
assert_eq!(ev.source, Source::Github);
assert_eq!(ev.action, "PushEvent");
assert!(ev.public);
assert_eq!(ev.payload, raw);
}
#[test]
fn private_event_marked_private() {
let raw = serde_json::json!({
"id": "67890",
"type": "PushEvent",
"created_at": "2026-04-15T10:30:00Z",
"public": false,
"actor": { "login": "grenade" },
"repo": { "name": "grenade/private-thing" },
"payload": {}
});
let ev = parse_github_event(raw).expect("parses");
assert!(!ev.public);
}
#[test]
fn missing_public_field_defaults_to_public() {
let raw = serde_json::json!({
"id": "11111",
"type": "PushEvent",
"created_at": "2026-04-15T10:30:00Z",
"actor": { "login": "grenade" },
"repo": { "name": "grenade/x" },
"payload": {}
});
let ev = parse_github_event(raw).expect("parses");
assert!(ev.public);
}
#[test]
fn rejects_event_missing_id() {
let raw = serde_json::json!({ "type": "PushEvent", "created_at": "2026-01-01T00:00:00Z" });
assert!(parse_github_event(raw).is_none());
}
#[test]
fn extracts_next_link() {
let mut h = header::HeaderMap::new();
h.insert(
header::LINK,
r#"<https://api.github.com/users/grenade/events?page=2>; rel="next", <https://api.github.com/users/grenade/events?page=10>; rel="last""#
.parse()
.unwrap(),
);
let next = parse_link_next(h.get(header::LINK));
assert_eq!(
next.as_deref(),
Some("https://api.github.com/users/grenade/events?page=2")
);
}
#[test]
fn no_next_link_when_only_prev() {
let mut h = header::HeaderMap::new();
h.insert(
header::LINK,
r#"<https://api.github.com/users/grenade/events?page=1>; rel="prev""#
.parse()
.unwrap(),
);
assert!(parse_link_next(h.get(header::LINK)).is_none());
}
}