feat: ingest private events; surface public-only
The DB now stores everything GitHub will give us; the API only ever
returns public events (for now).
Endpoint switch in the github poller: when GITHUB_TOKEN is set we
hit /users/{u}/events (public + private), otherwise fall back to
/users/{u}/events/public. Either way each event's top-level `public`
boolean is captured into a new column.
Schema:
migration 0003_event_public.sql adds events.public BOOLEAN NOT NULL
DEFAULT true, plus an index on (public, occurred_at DESC).
Wire:
Event gains a `public: bool` field.
EventQuery gains `include_private: bool` (default false).
list_events and source_summaries gate on it.
moments-api pins include_private = false at every call site —
threading it through as a query param belongs to future auth work, not this change.
The default-true on the column keeps existing rows correct: the 11
events already in the DB came from /events/public and are genuinely
public.
After this change, clear poller_state so the next worker run does a
fresh backfill via /events:
DELETE FROM poller_state WHERE source = 'github';
Tests: +2 in github poller (private flag captured, default-public
on missing field) — 10 total green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -96,6 +96,9 @@ async fn list_events(
|
|||||||
from: params.from,
|
from: params.from,
|
||||||
to: params.to,
|
to: params.to,
|
||||||
sources,
|
sources,
|
||||||
|
// Public timeline only — private events stay in the DB but are never
|
||||||
|
// surfaced. A future authenticated path can flip this.
|
||||||
|
include_private: false,
|
||||||
limit,
|
limit,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -107,7 +110,11 @@ async fn list_events(
|
|||||||
async fn list_sources(
|
async fn list_sources(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
) -> Result<Json<Vec<SourceSummary>>, ApiError> {
|
) -> Result<Json<Vec<SourceSummary>>, ApiError> {
|
||||||
let summaries = state.store.source_summaries().await.map_err(internal)?;
|
let summaries = state
|
||||||
|
.store
|
||||||
|
.source_summaries(/* include_private */ false)
|
||||||
|
.await
|
||||||
|
.map_err(internal)?;
|
||||||
Ok(Json(summaries))
|
Ok(Json(summaries))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ pub enum StoreError {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait EventReader: Send + Sync {
|
pub trait EventReader: Send + Sync {
|
||||||
async fn list_events(&self, query: &EventQuery) -> Result<Vec<Event>, StoreError>;
|
async fn list_events(&self, query: &EventQuery) -> Result<Vec<Event>, StoreError>;
|
||||||
async fn source_summaries(&self) -> Result<Vec<SourceSummary>, StoreError>;
|
async fn source_summaries(&self, include_private: bool) -> Result<Vec<SourceSummary>, StoreError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write-side port consumed by `moments-worker`. Idempotent upserts on `id`.
|
/// Write-side port consumed by `moments-worker`. Idempotent upserts on `id`.
|
||||||
|
|||||||
@@ -383,6 +383,7 @@ mod tests {
|
|||||||
source: Source::Github,
|
source: Source::Github,
|
||||||
action: action.into(),
|
action: action.into(),
|
||||||
occurred_at: Utc.with_ymd_and_hms(2026, 4, 14, 10, 0, 0).unwrap(),
|
occurred_at: Utc.with_ymd_and_hms(2026, 4, 14, 10, 0, 0).unwrap(),
|
||||||
|
public: true,
|
||||||
payload,
|
payload,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
2
crates/moments-data/migrations/0003_event_public.sql
Normal file
2
crates/moments-data/migrations/0003_event_public.sql
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
ALTER TABLE events ADD COLUMN public BOOLEAN NOT NULL DEFAULT true;
|
||||||
|
CREATE INDEX events_public_occurred_at_desc ON events (public, occurred_at DESC);
|
||||||
@@ -58,11 +58,17 @@ impl GithubSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn first_page_url(&self) -> String {
|
fn first_page_url(&self) -> String {
|
||||||
// Public events endpoint: works without auth (60/hr unauth, 5000/hr authed).
|
// With a token: hit `/events`, which returns public + private events the
|
||||||
// The non-public `/users/{u}/events` endpoint now requires auth and returns
|
// authenticated user can see. We store everything; the API gates what
|
||||||
// private-repo activity, which we don't want on a public timeline anyway.
|
// gets surfaced to the public timeline via the `public` column.
|
||||||
|
// Without a token: fall back to `/events/public` (anonymous-readable).
|
||||||
|
let endpoint = if self.config.token.is_some() {
|
||||||
|
"events"
|
||||||
|
} else {
|
||||||
|
"events/public"
|
||||||
|
};
|
||||||
format!(
|
format!(
|
||||||
"https://api.github.com/users/{}/events/public?per_page={}",
|
"https://api.github.com/users/{}/{endpoint}?per_page={}",
|
||||||
self.config.user, self.config.per_page
|
self.config.user, self.config.per_page
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -172,11 +178,17 @@ fn parse_github_event(raw: serde_json::Value) -> Option<Event> {
|
|||||||
let occurred_at = DateTime::parse_from_rfc3339(created_at_str)
|
let occurred_at = DateTime::parse_from_rfc3339(created_at_str)
|
||||||
.ok()?
|
.ok()?
|
||||||
.with_timezone(&Utc);
|
.with_timezone(&Utc);
|
||||||
|
// GitHub marks each event with a top-level `public` boolean. Events from
|
||||||
|
// `/events/public` are always true; `/events` may include false. Default
|
||||||
|
// to true if missing — that mirrors the `/events/public` endpoint behaviour.
|
||||||
|
// NOTE: this errs toward exposure: an event lacking the flag is surfaced.
|
||||||
|
let public = raw.get("public").and_then(serde_json::Value::as_bool).unwrap_or(true);
|
||||||
Some(Event {
|
Some(Event {
|
||||||
id: format!("github:{id}"),
|
id: format!("github:{id}"),
|
||||||
source: Source::Github,
|
source: Source::Github,
|
||||||
action: event_type,
|
action: event_type,
|
||||||
occurred_at,
|
occurred_at,
|
||||||
|
public,
|
||||||
payload: raw,
|
payload: raw,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -208,6 +220,7 @@ mod tests {
|
|||||||
"id": "12345",
|
"id": "12345",
|
||||||
"type": "PushEvent",
|
"type": "PushEvent",
|
||||||
"created_at": "2026-04-15T10:30:00Z",
|
"created_at": "2026-04-15T10:30:00Z",
|
||||||
|
"public": true,
|
||||||
"actor": { "login": "grenade" },
|
"actor": { "login": "grenade" },
|
||||||
"repo": { "name": "grenade/moments" },
|
"repo": { "name": "grenade/moments" },
|
||||||
"payload": { "ref": "refs/heads/main" }
|
"payload": { "ref": "refs/heads/main" }
|
||||||
@@ -216,9 +229,39 @@ mod tests {
|
|||||||
assert_eq!(ev.id, "github:12345");
|
assert_eq!(ev.id, "github:12345");
|
||||||
assert_eq!(ev.source, Source::Github);
|
assert_eq!(ev.source, Source::Github);
|
||||||
assert_eq!(ev.action, "PushEvent");
|
assert_eq!(ev.action, "PushEvent");
|
||||||
|
assert!(ev.public);
|
||||||
assert_eq!(ev.payload, raw);
|
assert_eq!(ev.payload, raw);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn private_event_marked_private() {
|
||||||
|
let raw = serde_json::json!({
|
||||||
|
"id": "67890",
|
||||||
|
"type": "PushEvent",
|
||||||
|
"created_at": "2026-04-15T10:30:00Z",
|
||||||
|
"public": false,
|
||||||
|
"actor": { "login": "grenade" },
|
||||||
|
"repo": { "name": "grenade/private-thing" },
|
||||||
|
"payload": {}
|
||||||
|
});
|
||||||
|
let ev = parse_github_event(raw).expect("parses");
|
||||||
|
assert!(!ev.public);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn missing_public_field_defaults_to_public() {
|
||||||
|
let raw = serde_json::json!({
|
||||||
|
"id": "11111",
|
||||||
|
"type": "PushEvent",
|
||||||
|
"created_at": "2026-04-15T10:30:00Z",
|
||||||
|
"actor": { "login": "grenade" },
|
||||||
|
"repo": { "name": "grenade/x" },
|
||||||
|
"payload": {}
|
||||||
|
});
|
||||||
|
let ev = parse_github_event(raw).expect("parses");
|
||||||
|
assert!(ev.public);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn rejects_event_missing_id() {
|
fn rejects_event_missing_id() {
|
||||||
let raw = serde_json::json!({ "type": "PushEvent", "created_at": "2026-01-01T00:00:00Z" });
|
let raw = serde_json::json!({ "type": "PushEvent", "created_at": "2026-01-01T00:00:00Z" });
|
||||||
|
|||||||
@@ -43,18 +43,20 @@ impl EventReader for PgStore {
|
|||||||
|
|
||||||
let rows = sqlx::query(
|
let rows = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
SELECT id, source, action, occurred_at, payload
|
SELECT id, source, action, occurred_at, public, payload
|
||||||
FROM events
|
FROM events
|
||||||
WHERE ($1::timestamptz IS NULL OR occurred_at >= $1)
|
WHERE ($1::timestamptz IS NULL OR occurred_at >= $1)
|
||||||
AND ($2::timestamptz IS NULL OR occurred_at < $2)
|
AND ($2::timestamptz IS NULL OR occurred_at < $2)
|
||||||
AND ($3::text[] IS NULL OR source = ANY($3))
|
AND ($3::text[] IS NULL OR source = ANY($3))
|
||||||
|
AND ($4::bool OR public = true)
|
||||||
ORDER BY occurred_at DESC
|
ORDER BY occurred_at DESC
|
||||||
LIMIT $4
|
LIMIT $5
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
.bind(query.from)
|
.bind(query.from)
|
||||||
.bind(query.to)
|
.bind(query.to)
|
||||||
.bind(sources.as_deref())
|
.bind(sources.as_deref())
|
||||||
|
.bind(query.include_private)
|
||||||
.bind(query.limit as i64)
|
.bind(query.limit as i64)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
@@ -68,13 +70,14 @@ impl EventReader for PgStore {
|
|||||||
source: Source::from_str(&source_str).map_err(map_err)?,
|
source: Source::from_str(&source_str).map_err(map_err)?,
|
||||||
action: r.try_get("action").map_err(map_err)?,
|
action: r.try_get("action").map_err(map_err)?,
|
||||||
occurred_at: r.try_get("occurred_at").map_err(map_err)?,
|
occurred_at: r.try_get("occurred_at").map_err(map_err)?,
|
||||||
|
public: r.try_get("public").map_err(map_err)?,
|
||||||
payload: r.try_get("payload").map_err(map_err)?,
|
payload: r.try_get("payload").map_err(map_err)?,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn source_summaries(&self) -> Result<Vec<SourceSummary>, StoreError> {
|
async fn source_summaries(&self, include_private: bool) -> Result<Vec<SourceSummary>, StoreError> {
|
||||||
let rows = sqlx::query(
|
let rows = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
SELECT source,
|
SELECT source,
|
||||||
@@ -82,10 +85,12 @@ impl EventReader for PgStore {
|
|||||||
MIN(occurred_at) AS earliest,
|
MIN(occurred_at) AS earliest,
|
||||||
MAX(occurred_at) AS latest
|
MAX(occurred_at) AS latest
|
||||||
FROM events
|
FROM events
|
||||||
|
WHERE $1::bool OR public = true
|
||||||
GROUP BY source
|
GROUP BY source
|
||||||
ORDER BY source
|
ORDER BY source
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
|
.bind(include_private)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(map_err)?;
|
.map_err(map_err)?;
|
||||||
@@ -187,12 +192,13 @@ impl EventWriter for PgStore {
|
|||||||
for ev in events {
|
for ev in events {
|
||||||
let n = sqlx::query(
|
let n = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO events (id, source, action, occurred_at, payload)
|
INSERT INTO events (id, source, action, occurred_at, public, payload)
|
||||||
VALUES ($1, $2, $3, $4, $5)
|
VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
ON CONFLICT (id) DO UPDATE
|
ON CONFLICT (id) DO UPDATE
|
||||||
SET source = EXCLUDED.source,
|
SET source = EXCLUDED.source,
|
||||||
action = EXCLUDED.action,
|
action = EXCLUDED.action,
|
||||||
occurred_at = EXCLUDED.occurred_at,
|
occurred_at = EXCLUDED.occurred_at,
|
||||||
|
public = EXCLUDED.public,
|
||||||
payload = EXCLUDED.payload
|
payload = EXCLUDED.payload
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
@@ -200,6 +206,7 @@ impl EventWriter for PgStore {
|
|||||||
.bind(ev.source.as_str())
|
.bind(ev.source.as_str())
|
||||||
.bind(&ev.action)
|
.bind(&ev.action)
|
||||||
.bind(ev.occurred_at)
|
.bind(ev.occurred_at)
|
||||||
|
.bind(ev.public)
|
||||||
.bind(&ev.payload)
|
.bind(&ev.payload)
|
||||||
.execute(&mut *tx)
|
.execute(&mut *tx)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -54,6 +54,10 @@ pub struct Event {
|
|||||||
pub source: Source,
|
pub source: Source,
|
||||||
pub action: String,
|
pub action: String,
|
||||||
pub occurred_at: DateTime<Utc>,
|
pub occurred_at: DateTime<Utc>,
|
||||||
|
/// True when the upstream marks this event as visible to anyone (e.g.
|
||||||
|
/// GitHub's top-level `public` flag). The DB stores everything; the API
|
||||||
|
/// uses this to gate what gets surfaced on the public timeline.
|
||||||
|
pub public: bool,
|
||||||
pub payload: serde_json::Value,
|
pub payload: serde_json::Value,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,6 +67,9 @@ pub struct EventQuery {
|
|||||||
pub from: Option<DateTime<Utc>>,
|
pub from: Option<DateTime<Utc>>,
|
||||||
pub to: Option<DateTime<Utc>>,
|
pub to: Option<DateTime<Utc>>,
|
||||||
pub sources: Option<Vec<Source>>,
|
pub sources: Option<Vec<Source>>,
|
||||||
|
/// When false (default), only `public = true` rows are returned. The API
|
||||||
|
/// pins this to false today; a future authenticated path can flip it.
|
||||||
|
pub include_private: bool,
|
||||||
pub limit: u32,
|
pub limit: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user