chore: scaffold moments workspace
Cargo workspace with five crates per architecture conventions:
- moments-entities: Source enum, Event, EventQuery, SourceSummary
- moments-core: EventReader / EventWriter ports
- moments-data: PgStore (sqlx postgres adapter) + 0001_init.sql
- moments-api: axum binary; /v1/{healthz,events,sources}
- moments-worker: skeleton; pollers land in step 2
Sources committed-to for ingestion: github, gitea, hg, bugzilla.
Workstation events explicitly retired (not deferred).
Build + clippy clean. sqlx queries use the runtime API for now;
will switch to compile-time-checked macros + .sqlx offline cache
once magrathea has the moments_{ro,rw} roles and database created.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
17
crates/moments-data/Cargo.toml
Normal file
17
crates/moments-data/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "moments-data"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
license.workspace = true
|
||||
authors.workspace = true
|
||||
|
||||
[dependencies]
|
||||
moments-entities.workspace = true
|
||||
moments-core.workspace = true
|
||||
sqlx.workspace = true
|
||||
serde_json.workspace = true
|
||||
chrono.workspace = true
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
async-trait = "0.1"
|
||||
11
crates/moments-data/migrations/0001_init.sql
Normal file
11
crates/moments-data/migrations/0001_init.sql
Normal file
@@ -0,0 +1,11 @@
|
||||
CREATE TABLE events (
|
||||
id TEXT PRIMARY KEY,
|
||||
source TEXT NOT NULL,
|
||||
action TEXT NOT NULL,
|
||||
occurred_at TIMESTAMPTZ NOT NULL,
|
||||
payload JSONB NOT NULL,
|
||||
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX events_occurred_at_desc ON events (occurred_at DESC);
|
||||
CREATE INDEX events_source_occurred_at_desc ON events (source, occurred_at DESC);
|
||||
143
crates/moments-data/src/lib.rs
Normal file
143
crates/moments-data/src/lib.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use moments_core::{EventReader, EventWriter, StoreError};
|
||||
use moments_entities::{Event, EventQuery, Source, SourceSummary};
|
||||
use sqlx::Row;
|
||||
use sqlx::postgres::{PgPool, PgPoolOptions};
|
||||
use std::str::FromStr;
|
||||
|
||||
pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PgStore {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PgStore {
|
||||
pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(8)
|
||||
.connect(database_url)
|
||||
.await?;
|
||||
Ok(Self { pool })
|
||||
}
|
||||
|
||||
pub async fn migrate(&self) -> Result<(), sqlx::migrate::MigrateError> {
|
||||
MIGRATOR.run(&self.pool).await
|
||||
}
|
||||
}
|
||||
|
||||
fn map_err<E: std::fmt::Display>(e: E) -> StoreError {
|
||||
StoreError::Database(e.to_string())
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventReader for PgStore {
|
||||
async fn list_events(&self, query: &EventQuery) -> Result<Vec<Event>, StoreError> {
|
||||
let sources: Option<Vec<String>> = query
|
||||
.sources
|
||||
.as_ref()
|
||||
.map(|s| s.iter().map(|x| x.as_str().to_string()).collect());
|
||||
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
SELECT id, source, action, occurred_at, payload
|
||||
FROM events
|
||||
WHERE ($1::timestamptz IS NULL OR occurred_at >= $1)
|
||||
AND ($2::timestamptz IS NULL OR occurred_at < $2)
|
||||
AND ($3::text[] IS NULL OR source = ANY($3))
|
||||
ORDER BY occurred_at DESC
|
||||
LIMIT $4
|
||||
"#,
|
||||
)
|
||||
.bind(query.from)
|
||||
.bind(query.to)
|
||||
.bind(sources.as_deref())
|
||||
.bind(query.limit as i64)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
rows.into_iter()
|
||||
.map(|r| {
|
||||
let source_str: String = r.try_get("source").map_err(map_err)?;
|
||||
Ok(Event {
|
||||
id: r.try_get("id").map_err(map_err)?,
|
||||
source: Source::from_str(&source_str).map_err(map_err)?,
|
||||
action: r.try_get("action").map_err(map_err)?,
|
||||
occurred_at: r.try_get("occurred_at").map_err(map_err)?,
|
||||
payload: r.try_get("payload").map_err(map_err)?,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn source_summaries(&self) -> Result<Vec<SourceSummary>, StoreError> {
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
SELECT source,
|
||||
COUNT(*) AS count,
|
||||
MIN(occurred_at) AS earliest,
|
||||
MAX(occurred_at) AS latest
|
||||
FROM events
|
||||
GROUP BY source
|
||||
ORDER BY source
|
||||
"#,
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
rows.into_iter()
|
||||
.map(|r| {
|
||||
let source_str: String = r.try_get("source").map_err(map_err)?;
|
||||
let earliest: Option<DateTime<Utc>> = r.try_get("earliest").map_err(map_err)?;
|
||||
let latest: Option<DateTime<Utc>> = r.try_get("latest").map_err(map_err)?;
|
||||
let count: i64 = r.try_get("count").map_err(map_err)?;
|
||||
Ok(SourceSummary {
|
||||
source: Source::from_str(&source_str).map_err(map_err)?,
|
||||
count,
|
||||
earliest,
|
||||
latest,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventWriter for PgStore {
|
||||
async fn upsert_events(&self, events: &[Event]) -> Result<usize, StoreError> {
|
||||
if events.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mut tx = self.pool.begin().await.map_err(map_err)?;
|
||||
let mut inserted = 0;
|
||||
for ev in events {
|
||||
let n = sqlx::query(
|
||||
r#"
|
||||
INSERT INTO events (id, source, action, occurred_at, payload)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET source = EXCLUDED.source,
|
||||
action = EXCLUDED.action,
|
||||
occurred_at = EXCLUDED.occurred_at,
|
||||
payload = EXCLUDED.payload
|
||||
"#,
|
||||
)
|
||||
.bind(&ev.id)
|
||||
.bind(ev.source.as_str())
|
||||
.bind(&ev.action)
|
||||
.bind(ev.occurred_at)
|
||||
.bind(&ev.payload)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(map_err)?
|
||||
.rows_affected();
|
||||
inserted += n as usize;
|
||||
}
|
||||
tx.commit().await.map_err(map_err)?;
|
||||
Ok(inserted)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user