pub mod bugzilla; pub mod gitea; pub mod github; pub mod github_repo; pub mod github_search; pub mod hg; use async_trait::async_trait; use chrono::{DateTime, Utc}; use moments_core::{EventReader, EventWriter, PollerState, PollerStateStore, StoreError}; use chrono::NaiveDate; use moments_entities::{DailyCount, Event, EventQuery, ProjectSummary, Source, SourceSummary}; use sqlx::Row; use sqlx::postgres::{PgPool, PgPoolOptions}; use std::str::FromStr; pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations"); #[derive(Clone)] pub struct PgStore { pool: PgPool, } impl PgStore { pub async fn connect(database_url: &str) -> Result { let pool = PgPoolOptions::new() .max_connections(8) .connect(database_url) .await?; Ok(Self { pool }) } pub async fn migrate(&self) -> Result<(), sqlx::migrate::MigrateError> { MIGRATOR.run(&self.pool).await } } fn map_err(e: E) -> StoreError { StoreError::Database(e.to_string()) } #[async_trait] impl EventReader for PgStore { async fn list_events(&self, query: &EventQuery) -> Result, StoreError> { let sources: Option> = query .sources .as_ref() .map(|s| s.iter().map(|x| x.as_str().to_string()).collect()); let rows = sqlx::query( r#" SELECT id, source, action, occurred_at, public, payload FROM events WHERE ($1::timestamptz IS NULL OR occurred_at >= $1) AND ($2::timestamptz IS NULL OR occurred_at < $2) AND ($3::text[] IS NULL OR source = ANY($3)) AND ($4::bool OR public = true) AND ($6::text IS NULL OR (CASE source WHEN 'github' THEN COALESCE( payload->'repo'->>'name', payload->'repository'->>'full_name', payload->>'_repo' ) WHEN 'gitea' THEN COALESCE( payload->'repo'->>'full_name', payload->'repo'->>'name' ) WHEN 'hg' THEN payload->>'_repo' WHEN 'bugzilla' THEN payload->>'product' ELSE NULL END) = $6) ORDER BY occurred_at DESC LIMIT $5 "#, ) .bind(query.from) .bind(query.to) .bind(sources.as_deref()) .bind(query.include_private) .bind(query.limit as i64) .bind(query.repo.as_deref()) .fetch_all(&self.pool) .await .map_err(map_err)?; rows.into_iter() .map(|r| { let source_str: String = r.try_get("source").map_err(map_err)?; Ok(Event { id: r.try_get("id").map_err(map_err)?, source: Source::from_str(&source_str).map_err(map_err)?, action: r.try_get("action").map_err(map_err)?, occurred_at: r.try_get("occurred_at").map_err(map_err)?, public: r.try_get("public").map_err(map_err)?, payload: r.try_get("payload").map_err(map_err)?, }) }) .collect() } async fn source_summaries(&self, include_private: bool) -> Result, StoreError> { let rows = sqlx::query( r#" SELECT source, COUNT(*) AS count, MIN(occurred_at) AS earliest, MAX(occurred_at) AS latest FROM events WHERE $1::bool OR public = true GROUP BY source ORDER BY source "#, ) .bind(include_private) .fetch_all(&self.pool) .await .map_err(map_err)?; rows.into_iter() .map(|r| { let source_str: String = r.try_get("source").map_err(map_err)?; let earliest: Option> = r.try_get("earliest").map_err(map_err)?; let latest: Option> = r.try_get("latest").map_err(map_err)?; let count: i64 = r.try_get("count").map_err(map_err)?; Ok(SourceSummary { source: Source::from_str(&source_str).map_err(map_err)?, count, earliest, latest, }) }) .collect() } async fn list_projects(&self) -> Result, StoreError> { let rows = sqlx::query( r#" SELECT source, repo, host, SUM(commits)::bigint AS commit_count, SUM(issues)::bigint AS issue_count, SUM(prs)::bigint AS pr_count, MIN(occurred_at) AS first_activity, MAX(occurred_at) AS last_activity FROM ( SELECT source, occurred_at, CASE source WHEN 'github' THEN COALESCE( payload->'repo'->>'name', payload->'repository'->>'full_name', payload->>'_repo' ) WHEN 'gitea' THEN COALESCE( payload->'repo'->>'full_name', payload->'repo'->>'name' ) WHEN 'hg' THEN payload->>'_repo' WHEN 'bugzilla' THEN payload->>'product' ELSE NULL END AS repo, CASE source WHEN 'github' THEN 'github.com' WHEN 'gitea' THEN COALESCE(payload->>'_host', 'git.lair.cafe') WHEN 'hg' THEN COALESCE(payload->>'_host', 'hg-edge.mozilla.org') WHEN 'bugzilla' THEN 'bugzilla.mozilla.org' ELSE 'unknown' END AS host, CASE WHEN action IN ('Commit', 'PushEvent', 'commit_repo') THEN 1 ELSE 0 END AS commits, CASE WHEN action IN ('Issue', 'IssuesEvent') THEN 1 ELSE 0 END AS issues, CASE WHEN action IN ('PullRequest', 'PullRequestEvent') THEN 1 ELSE 0 END AS prs FROM events WHERE public = true ) sub WHERE repo IS NOT NULL AND repo != '' GROUP BY source, repo, host ORDER BY MAX(occurred_at) DESC "#, ) .fetch_all(&self.pool) .await .map_err(map_err)?; rows.into_iter() .map(|r| { let source_str: String = r.try_get("source").map_err(map_err)?; Ok(ProjectSummary { source: Source::from_str(&source_str).map_err(map_err)?, repo: r.try_get("repo").map_err(map_err)?, host: r.try_get("host").map_err(map_err)?, commit_count: r.try_get::("commit_count").map_err(map_err).unwrap_or(0), issue_count: r.try_get::("issue_count").map_err(map_err).unwrap_or(0), pr_count: r.try_get::("pr_count").map_err(map_err).unwrap_or(0), first_activity: r.try_get("first_activity").map_err(map_err)?, last_activity: r.try_get("last_activity").map_err(map_err)?, }) }) .collect() } async fn daily_counts(&self, from: NaiveDate, to: NaiveDate) -> Result, StoreError> { let rows = sqlx::query( r#" SELECT d::date AS date, COUNT(e.id)::bigint AS count FROM generate_series($1::date, $2::date, '1 day') d LEFT JOIN events e ON e.occurred_at >= d AND e.occurred_at < d + interval '1 day' AND e.public = true GROUP BY d::date ORDER BY d::date "#, ) .bind(from) .bind(to) .fetch_all(&self.pool) .await .map_err(map_err)?; rows.into_iter() .map(|r| { Ok(DailyCount { date: r.try_get("date").map_err(map_err)?, count: r.try_get("count").map_err(map_err)?, }) }) .collect() } } #[async_trait] impl PollerStateStore for PgStore { async fn load(&self, source: &str) -> Result, StoreError> { let row = sqlx::query( r#" SELECT source, etag, last_modified, last_fetched FROM poller_state WHERE source = $1 "#, ) .bind(source) .fetch_optional(&self.pool) .await .map_err(map_err)?; Ok(match row { None => None, Some(r) => Some(PollerState { source: r.try_get("source").map_err(map_err)?, etag: r.try_get("etag").map_err(map_err)?, last_modified: r.try_get("last_modified").map_err(map_err)?, last_fetched: r.try_get("last_fetched").map_err(map_err)?, }), }) } async fn save( &self, source: &str, etag: Option<&str>, last_modified: Option>, ) -> Result<(), StoreError> { sqlx::query( r#" INSERT INTO poller_state (source, etag, last_modified, last_fetched) VALUES ($1, $2, $3, now()) ON CONFLICT (source) DO UPDATE SET etag = EXCLUDED.etag, last_modified = EXCLUDED.last_modified, last_fetched = EXCLUDED.last_fetched "#, ) .bind(source) .bind(etag) .bind(last_modified) .execute(&self.pool) .await .map_err(map_err)?; Ok(()) } async fn touch(&self, source: &str) -> Result<(), StoreError> { sqlx::query( r#" INSERT INTO poller_state (source, last_fetched) VALUES ($1, now()) ON CONFLICT (source) DO UPDATE SET last_fetched = EXCLUDED.last_fetched "#, ) .bind(source) .execute(&self.pool) .await .map_err(map_err)?; Ok(()) } } #[async_trait] impl EventWriter for PgStore { async fn upsert_events(&self, events: &[Event]) -> Result { if events.is_empty() { return Ok(0); } let mut tx = self.pool.begin().await.map_err(map_err)?; let mut inserted = 0; for ev in events { let n = sqlx::query( r#" INSERT INTO events (id, source, action, occurred_at, public, payload) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (id) DO UPDATE SET source = EXCLUDED.source, action = EXCLUDED.action, occurred_at = EXCLUDED.occurred_at, public = EXCLUDED.public, payload = EXCLUDED.payload "#, ) .bind(&ev.id) .bind(ev.source.as_str()) .bind(&ev.action) .bind(ev.occurred_at) .bind(ev.public) .bind(&ev.payload) .execute(&mut *tx) .await .map_err(map_err)? .rows_affected(); inserted += n as usize; } tx.commit().await.map_err(map_err)?; Ok(inserted) } }