Files
moments/crates/moments-data/src/lib.rs
rob thijssen e4052c4c9a feat(worker): add github search api source for historical backfill
The Events API is hard-capped at 90 days (15 events for grenade
right now). The Search API has its own 1000-result-per-query cap
but reaches the start of the user's GitHub history — for grenade,
430 issues/PRs going back to 2012-08-08.

  GET /search/issues?q=author:<user>&sort=created&order=desc

Polled on a 24h interval by default (not hourly), since this is
backfill, not a live feed. After the first run most upserts are
no-ops. Stored as Source::Github with action "Issue" or "PullRequest"
(distinguished by the .pull_request field on the search item),
keyed `github-issue:<owner>/<repo>#<n>`.

/search/commits is deliberately not used: GitHub matches the same
commit across every fork that contains it, so 275k of grenade's
"commits" are mostly duplicated fork hits in repos he never committed
to. If commit history becomes valuable we should enumerate his repos
and walk per-repo /commits?author= instead.

Visibility: search/issues items don't carry .private, so we look up
/repos/{full_name} once per unique repo encountered (cached for the
duration of the poll). Failure to resolve is treated as private —
better to under-expose than over-expose on the public timeline.

Reshape: presentation/github.rs gains an Issue/PullRequest path that
extracts from the search item shape (html_url, number, title, state,
.pull_request.merged_at) rather than the events-API wrapper. Merged
PRs use the GitMerge icon, mirroring the events-API path.

Worker now spawns two tokio tasks (events + search), aborts both
on SIGINT. New env: SEARCH_POLL_INTERVAL_SECS (default 86400).

Tests: +2 in moments-data (URL parsing), +2 in moments-core
(search Issue + merged-PR reshape) — 14 total green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 18:49:06 +03:00

222 lines
6.9 KiB
Rust

pub mod github;
pub mod github_search;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use moments_core::{EventReader, EventWriter, PollerState, PollerStateStore, StoreError};
use moments_entities::{Event, EventQuery, Source, SourceSummary};
use sqlx::Row;
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::str::FromStr;
/// Migration set built by `sqlx::migrate!` from the `./migrations` directory.
///
/// Applied to the database by [`PgStore::migrate`].
pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");
/// Postgres-backed store wrapping a `sqlx` connection pool.
///
/// This file implements `EventReader`, `PollerStateStore` and `EventWriter`
/// for it, so a single value serves both reads and writes.
#[derive(Clone)]
pub struct PgStore {
// Connection pool used by the queries and migrations in this file.
pool: PgPool,
}
impl PgStore {
pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(8)
.connect(database_url)
.await?;
Ok(Self { pool })
}
pub async fn migrate(&self) -> Result<(), sqlx::migrate::MigrateError> {
MIGRATOR.run(&self.pool).await
}
}
/// Converts any displayable error into [`StoreError::Database`].
///
/// Only the rendered message is kept; the original error value is dropped.
fn map_err<E>(e: E) -> StoreError
where
    E: std::fmt::Display,
{
    let message = e.to_string();
    StoreError::Database(message)
}
#[async_trait]
impl EventReader for PgStore {
    /// Lists events matching `query`, newest first.
    ///
    /// Each of the following filters is skipped when its value is absent:
    /// - `query.from` keeps events with `occurred_at >= from`;
    /// - `query.to` keeps events with `occurred_at < to`;
    /// - `query.sources` keeps events whose source is one of those listed.
    ///
    /// Non-public events are returned only when `query.include_private` is
    /// true. At most `query.limit` rows are returned.
    ///
    /// # Errors
    /// Returns [`StoreError`] when the query fails, a column cannot be
    /// decoded, or a stored source name does not parse as a [`Source`].
    async fn list_events(&self, query: &EventQuery) -> Result<Vec<Event>, StoreError> {
        const LIST_EVENTS_SQL: &str = r#"
SELECT id, source, action, occurred_at, public, payload
FROM events
WHERE ($1::timestamptz IS NULL OR occurred_at >= $1)
AND ($2::timestamptz IS NULL OR occurred_at < $2)
AND ($3::text[] IS NULL OR source = ANY($3))
AND ($4::bool OR public = true)
ORDER BY occurred_at DESC
LIMIT $5
"#;

        /// Builds an `Event` from one result row.
        fn decode(row: sqlx::postgres::PgRow) -> Result<Event, StoreError> {
            // The source column is read first, as text, and parsed below.
            let source_name = row.try_get::<String, _>("source").map_err(map_err)?;
            Ok(Event {
                id: row.try_get("id").map_err(map_err)?,
                source: Source::from_str(&source_name).map_err(map_err)?,
                action: row.try_get("action").map_err(map_err)?,
                occurred_at: row.try_get("occurred_at").map_err(map_err)?,
                public: row.try_get("public").map_err(map_err)?,
                payload: row.try_get("payload").map_err(map_err)?,
            })
        }

        // Sources are sent as a text array; `None` becomes NULL, which the
        // query treats as "no source filter".
        let source_filter: Option<Vec<String>> = query.sources.as_ref().map(|wanted| {
            wanted
                .iter()
                .map(|source| source.as_str().to_string())
                .collect()
        });

        // NOTE(review): `as` converts silently; if `limit` is wider than i64
        // a checked conversion may be preferable — confirm the field type.
        let row_limit = query.limit as i64;

        let fetched = sqlx::query(LIST_EVENTS_SQL)
            .bind(query.from)
            .bind(query.to)
            .bind(source_filter.as_deref())
            .bind(query.include_private)
            .bind(row_limit)
            .fetch_all(&self.pool)
            .await
            .map_err(map_err)?;

        // Collecting into `Result` stops at the first row that fails to decode.
        fetched.into_iter().map(decode).collect()
    }

    /// Returns one summary per source: event count plus the earliest and
    /// latest `occurred_at`, ordered by source name.
    ///
    /// Non-public events are counted only when `include_private` is true.
    ///
    /// # Errors
    /// Returns [`StoreError`] when the query fails, a column cannot be
    /// decoded, or a stored source name does not parse as a [`Source`].
    async fn source_summaries(
        &self,
        include_private: bool,
    ) -> Result<Vec<SourceSummary>, StoreError> {
        const SOURCE_SUMMARIES_SQL: &str = r#"
SELECT source,
COUNT(*) AS count,
MIN(occurred_at) AS earliest,
MAX(occurred_at) AS latest
FROM events
WHERE $1::bool OR public = true
GROUP BY source
ORDER BY source
"#;

        /// Builds a `SourceSummary` from one aggregated row.
        fn decode(row: sqlx::postgres::PgRow) -> Result<SourceSummary, StoreError> {
            let source_name = row.try_get::<String, _>("source").map_err(map_err)?;
            let earliest = row
                .try_get::<Option<DateTime<Utc>>, _>("earliest")
                .map_err(map_err)?;
            let latest = row
                .try_get::<Option<DateTime<Utc>>, _>("latest")
                .map_err(map_err)?;
            let count = row.try_get::<i64, _>("count").map_err(map_err)?;
            let source = Source::from_str(&source_name).map_err(map_err)?;
            Ok(SourceSummary {
                source,
                count,
                earliest,
                latest,
            })
        }

        let fetched = sqlx::query(SOURCE_SUMMARIES_SQL)
            .bind(include_private)
            .fetch_all(&self.pool)
            .await
            .map_err(map_err)?;

        fetched.into_iter().map(decode).collect()
    }
}
#[async_trait]
impl PollerStateStore for PgStore {
    /// Loads the saved poller state for `source`.
    ///
    /// Returns `Ok(None)` when no row exists for that source.
    ///
    /// # Errors
    /// Returns [`StoreError`] when the query fails or a column cannot be
    /// decoded.
    async fn load(&self, source: &str) -> Result<Option<PollerState>, StoreError> {
        const LOAD_SQL: &str = r#"
SELECT source, etag, last_modified, last_fetched
FROM poller_state
WHERE source = $1
"#;

        /// Builds a `PollerState` from the matching row.
        fn decode(row: sqlx::postgres::PgRow) -> Result<PollerState, StoreError> {
            Ok(PollerState {
                source: row.try_get("source").map_err(map_err)?,
                etag: row.try_get("etag").map_err(map_err)?,
                last_modified: row.try_get("last_modified").map_err(map_err)?,
                last_fetched: row.try_get("last_fetched").map_err(map_err)?,
            })
        }

        let found = sqlx::query(LOAD_SQL)
            .bind(source)
            .fetch_optional(&self.pool)
            .await
            .map_err(map_err)?;

        // Option<Result<_>> -> Result<Option<_>>: a missing row is not an error.
        found.map(decode).transpose()
    }

    /// Inserts or replaces the poller state for `source`.
    ///
    /// `etag` and `last_modified` overwrite the stored values, including
    /// when they are `None`. `last_fetched` is set to the database's `now()`.
    ///
    /// # Errors
    /// Returns [`StoreError`] when the statement fails.
    async fn save(
        &self,
        source: &str,
        etag: Option<&str>,
        last_modified: Option<DateTime<Utc>>,
    ) -> Result<(), StoreError> {
        const SAVE_SQL: &str = r#"
INSERT INTO poller_state (source, etag, last_modified, last_fetched)
VALUES ($1, $2, $3, now())
ON CONFLICT (source) DO UPDATE
SET etag = EXCLUDED.etag,
last_modified = EXCLUDED.last_modified,
last_fetched = EXCLUDED.last_fetched
"#;

        let outcome = sqlx::query(SAVE_SQL)
            .bind(source)
            .bind(etag)
            .bind(last_modified)
            .execute(&self.pool)
            .await;

        outcome.map(|_| ()).map_err(map_err)
    }

    /// Records that `source` was fetched just now.
    ///
    /// For an existing row only `last_fetched` is updated; `etag` and
    /// `last_modified` are left as stored. When no row exists, one is
    /// inserted with only `source` and `last_fetched` supplied.
    ///
    /// # Errors
    /// Returns [`StoreError`] when the statement fails.
    async fn touch(&self, source: &str) -> Result<(), StoreError> {
        const TOUCH_SQL: &str = r#"
INSERT INTO poller_state (source, last_fetched)
VALUES ($1, now())
ON CONFLICT (source) DO UPDATE
SET last_fetched = EXCLUDED.last_fetched
"#;

        let outcome = sqlx::query(TOUCH_SQL)
            .bind(source)
            .execute(&self.pool)
            .await;

        outcome.map(|_| ()).map_err(map_err)
    }
}
#[async_trait]
impl EventWriter for PgStore {
    /// Upserts `events` by `id` inside a single transaction.
    ///
    /// An event whose `id` already exists has every other column
    /// overwritten. The transaction is committed only after all statements
    /// succeed; on the first failure the function returns before `commit`.
    ///
    /// Returns the sum of `rows_affected` over all statements.
    /// NOTE(review): with `ON CONFLICT ... DO UPDATE` an updated row is
    /// reported as affected too, so this is likely "inserted or updated"
    /// rather than "newly inserted" — confirm what callers expect.
    ///
    /// # Errors
    /// Returns [`StoreError`] when beginning the transaction, executing a
    /// statement, or committing fails.
    async fn upsert_events(&self, events: &[Event]) -> Result<usize, StoreError> {
        const UPSERT_SQL: &str = r#"
INSERT INTO events (id, source, action, occurred_at, public, payload)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (id) DO UPDATE
SET source = EXCLUDED.source,
action = EXCLUDED.action,
occurred_at = EXCLUDED.occurred_at,
public = EXCLUDED.public,
payload = EXCLUDED.payload
"#;

        // Nothing to write: skip opening a transaction.
        if events.is_empty() {
            return Ok(0);
        }

        let mut tx = self.pool.begin().await.map_err(map_err)?;
        let mut affected: usize = 0;

        for event in events.iter() {
            let outcome = sqlx::query(UPSERT_SQL)
                .bind(&event.id)
                .bind(event.source.as_str())
                .bind(&event.action)
                .bind(event.occurred_at)
                .bind(event.public)
                .bind(&event.payload)
                .execute(&mut *tx)
                .await
                .map_err(map_err)?;
            affected += outcome.rows_affected() as usize;
        }

        tx.commit().await.map_err(map_err)?;
        Ok(affected)
    }
}