pub mod bugzilla; pub mod gitea; pub mod github; pub mod github_repo; pub mod github_search; pub mod hg; use async_trait::async_trait; use chrono::{DateTime, Utc}; use moments_core::{EventReader, EventWriter, PollerState, PollerStateStore, StoreError}; use moments_entities::{Event, EventQuery, Source, SourceSummary}; use sqlx::Row; use sqlx::postgres::{PgPool, PgPoolOptions}; use std::str::FromStr; pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations"); #[derive(Clone)] pub struct PgStore { pool: PgPool, } impl PgStore { pub async fn connect(database_url: &str) -> Result { let pool = PgPoolOptions::new() .max_connections(8) .connect(database_url) .await?; Ok(Self { pool }) } pub async fn migrate(&self) -> Result<(), sqlx::migrate::MigrateError> { MIGRATOR.run(&self.pool).await } } fn map_err(e: E) -> StoreError { StoreError::Database(e.to_string()) } #[async_trait] impl EventReader for PgStore { async fn list_events(&self, query: &EventQuery) -> Result, StoreError> { let sources: Option> = query .sources .as_ref() .map(|s| s.iter().map(|x| x.as_str().to_string()).collect()); let rows = sqlx::query( r#" SELECT id, source, action, occurred_at, public, payload FROM events WHERE ($1::timestamptz IS NULL OR occurred_at >= $1) AND ($2::timestamptz IS NULL OR occurred_at < $2) AND ($3::text[] IS NULL OR source = ANY($3)) AND ($4::bool OR public = true) ORDER BY occurred_at DESC LIMIT $5 "#, ) .bind(query.from) .bind(query.to) .bind(sources.as_deref()) .bind(query.include_private) .bind(query.limit as i64) .fetch_all(&self.pool) .await .map_err(map_err)?; rows.into_iter() .map(|r| { let source_str: String = r.try_get("source").map_err(map_err)?; Ok(Event { id: r.try_get("id").map_err(map_err)?, source: Source::from_str(&source_str).map_err(map_err)?, action: r.try_get("action").map_err(map_err)?, occurred_at: r.try_get("occurred_at").map_err(map_err)?, public: r.try_get("public").map_err(map_err)?, payload: r.try_get("payload").map_err(map_err)?, }) }) .collect() } async fn source_summaries(&self, include_private: bool) -> Result, StoreError> { let rows = sqlx::query( r#" SELECT source, COUNT(*) AS count, MIN(occurred_at) AS earliest, MAX(occurred_at) AS latest FROM events WHERE $1::bool OR public = true GROUP BY source ORDER BY source "#, ) .bind(include_private) .fetch_all(&self.pool) .await .map_err(map_err)?; rows.into_iter() .map(|r| { let source_str: String = r.try_get("source").map_err(map_err)?; let earliest: Option> = r.try_get("earliest").map_err(map_err)?; let latest: Option> = r.try_get("latest").map_err(map_err)?; let count: i64 = r.try_get("count").map_err(map_err)?; Ok(SourceSummary { source: Source::from_str(&source_str).map_err(map_err)?, count, earliest, latest, }) }) .collect() } } #[async_trait] impl PollerStateStore for PgStore { async fn load(&self, source: &str) -> Result, StoreError> { let row = sqlx::query( r#" SELECT source, etag, last_modified, last_fetched FROM poller_state WHERE source = $1 "#, ) .bind(source) .fetch_optional(&self.pool) .await .map_err(map_err)?; Ok(match row { None => None, Some(r) => Some(PollerState { source: r.try_get("source").map_err(map_err)?, etag: r.try_get("etag").map_err(map_err)?, last_modified: r.try_get("last_modified").map_err(map_err)?, last_fetched: r.try_get("last_fetched").map_err(map_err)?, }), }) } async fn save( &self, source: &str, etag: Option<&str>, last_modified: Option>, ) -> Result<(), StoreError> { sqlx::query( r#" INSERT INTO poller_state (source, etag, last_modified, last_fetched) VALUES ($1, $2, $3, now()) ON CONFLICT (source) DO UPDATE SET etag = EXCLUDED.etag, last_modified = EXCLUDED.last_modified, last_fetched = EXCLUDED.last_fetched "#, ) .bind(source) .bind(etag) .bind(last_modified) .execute(&self.pool) .await .map_err(map_err)?; Ok(()) } async fn touch(&self, source: &str) -> Result<(), StoreError> { sqlx::query( r#" INSERT INTO poller_state (source, last_fetched) VALUES ($1, now()) ON CONFLICT (source) DO UPDATE SET last_fetched = EXCLUDED.last_fetched "#, ) .bind(source) .execute(&self.pool) .await .map_err(map_err)?; Ok(()) } } #[async_trait] impl EventWriter for PgStore { async fn upsert_events(&self, events: &[Event]) -> Result { if events.is_empty() { return Ok(0); } let mut tx = self.pool.begin().await.map_err(map_err)?; let mut inserted = 0; for ev in events { let n = sqlx::query( r#" INSERT INTO events (id, source, action, occurred_at, public, payload) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (id) DO UPDATE SET source = EXCLUDED.source, action = EXCLUDED.action, occurred_at = EXCLUDED.occurred_at, public = EXCLUDED.public, payload = EXCLUDED.payload "#, ) .bind(&ev.id) .bind(ev.source.as_str()) .bind(&ev.action) .bind(ev.occurred_at) .bind(ev.public) .bind(&ev.payload) .execute(&mut *tx) .await .map_err(map_err)? .rows_affected(); inserted += n as usize; } tx.commit().await.map_err(map_err)?; Ok(inserted) } }