use std::{net::SocketAddr, sync::Arc, time::Duration}; use axum::{ Json, Router, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, routing::get, }; use chrono::{DateTime, Datelike, NaiveDate, Utc}; use clap::Parser; use moments_core::{EventReader, reshape}; use moments_data::PgStore; use moments_entities::{DailyCount, EventQuery, LanguageDailyCount, ProjectSummary, RepoLanguage, Source, SourceSummary, TimelineItem}; use serde::Deserialize; use tower_http::{cors::CorsLayer, trace::TraceLayer}; use tracing::info; #[derive(Parser, Debug)] #[command(version, about = "moments read-only HTTP API")] struct Args { #[arg(long, env = "BIND_ADDR", default_value = "127.0.0.1:8080")] bind: SocketAddr, #[arg(long, env = "DATABASE_URL")] database_url: String, } #[derive(Clone)] struct AppState { store: Arc, http: reqwest::Client, } #[tokio::main] async fn main() -> anyhow::Result<()> { init_tracing(); let args = Args::parse(); // The api connects as moments_ro and never writes — migrations are owned // by moments-worker, which is the database owner (moments_rw). Running // migrations from here would fail with `permission denied for schema // public`. The worker must have run at least once before the api accepts // traffic; in deploy this is ordered via systemd dependencies (§3). let store = PgStore::connect(&args.database_url).await?; let http = reqwest::Client::builder() .timeout(Duration::from_secs(15)) .build()?; let state = AppState { store: Arc::new(store), http, }; let app = Router::new() .route("/v1/healthz", get(healthz)) .route("/v1/events", get(list_events)) .route("/v1/sources", get(list_sources)) .route("/v1/projects", get(list_projects)) .route("/v1/activity/daily", get(daily_counts)) .route("/v1/languages/daily", get(language_daily_counts)) .route("/v1/languages/repos", get(repo_languages)) .route("/v1/forge/{source}/{*rest}", get(forge_proxy)) .route("/v1/og/contributions.png", get(og_contributions)) .with_state(state) .layer(TraceLayer::new_for_http()) .layer(CorsLayer::permissive()); info!(addr = %args.bind, "listening"); let listener = tokio::net::TcpListener::bind(args.bind).await?; axum::serve(listener, app).await?; Ok(()) } fn init_tracing() { use tracing_subscriber::{EnvFilter, fmt}; let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); let json = std::env::var("JOURNAL_STREAM").is_ok(); if json { fmt().with_env_filter(filter).json().init(); } else { fmt().with_env_filter(filter).init(); } } async fn healthz() -> &'static str { "ok" } #[derive(Debug, Deserialize)] struct EventsQueryParams { from: Option>, to: Option>, /// Comma-separated list, e.g. `source=github,gitea`. source: Option, /// Filter to a specific repo, e.g. `repo=grenade/moments`. repo: Option, limit: Option, } async fn list_events( State(state): State, Query(params): Query, ) -> Result>, ApiError> { let sources = params .source .as_deref() .map(parse_sources) .transpose()?; let limit = params.limit.unwrap_or(100).clamp(1, 1000); let query = EventQuery { from: params.from, to: params.to, sources, repo: params.repo, // Public timeline only — private events stay in the DB but are never // surfaced. A future authenticated path can flip this. include_private: false, limit, }; let events = state.store.list_events(&query).await.map_err(internal)?; let items: Vec = events.iter().map(reshape).collect(); Ok(Json(items)) } async fn list_sources( State(state): State, ) -> Result>, ApiError> { let summaries = state .store .source_summaries(/* include_private */ true) .await .map_err(internal)?; Ok(Json(summaries)) } async fn list_projects( State(state): State, ) -> Result>, ApiError> { let projects = state.store.list_projects().await.map_err(internal)?; Ok(Json(projects)) } #[derive(Debug, Deserialize)] struct DailyCountsParams { from: Option, to: Option, } async fn daily_counts( State(state): State, Query(params): Query, ) -> Result>, ApiError> { let to = params.to.unwrap_or_else(|| Utc::now().date_naive()); let from = params.from.unwrap_or_else(|| to - chrono::Duration::days(365)); let counts = state.store.daily_counts(from, to, /* include_private */ true).await.map_err(internal)?; Ok(Json(counts)) } async fn language_daily_counts( State(state): State, Query(params): Query, ) -> Result>, ApiError> { let to = params.to.unwrap_or_else(|| Utc::now().date_naive()); let from = params.from.unwrap_or_else(|| to - chrono::Duration::days(365)); let counts = state.store.language_daily_counts(from, to, /* include_private */ true).await.map_err(internal)?; Ok(Json(counts)) } async fn repo_languages( State(state): State, ) -> Result>, ApiError> { let langs = state.store.repo_languages().await.map_err(internal)?; Ok(Json(langs)) } async fn og_contributions( State(state): State, ) -> Result { // Get date range from source summaries let summaries = state .store .source_summaries(/* include_private */ true) .await .map_err(internal)?; let earliest = summaries .iter() .filter_map(|s| s.earliest) .min() .unwrap_or_else(Utc::now) .date_naive(); let today = Utc::now().date_naive(); let counts = state .store .daily_counts(earliest, today, /* include_private */ true) .await .map_err(internal)?; let projects = state.store.list_projects().await.map_err(internal)?; let repo_count = projects.len(); let png = render_contributions_png(&counts, earliest, today, repo_count).map_err(|e| ApiError { status: StatusCode::INTERNAL_SERVER_ERROR, message: e, })?; Ok(( StatusCode::OK, [ (axum::http::header::CONTENT_TYPE, "image/png"), (axum::http::header::CACHE_CONTROL, "public, max-age=3600"), ], png, )) } fn render_contributions_png( counts: &[DailyCount], from: NaiveDate, to: NaiveDate, repo_count: usize, ) -> Result, String> { use std::collections::HashMap; let count_map: HashMap = counts.iter().map(|d| (d.date, d.count)).collect(); // OG image canvas: 1200x630 let og_w = 1200_f64; let og_h = 630_f64; let padding = 40_f64; let bg = "#2c3e50"; let year_label_w = 50_f64; let max_cols = 53; // Scale cell size to fill available width let avail_w = og_w - 2.0 * padding - year_label_w; let step = (avail_w / max_cols as f64).floor(); let gap = (step * 0.17).round(); let cell = step - gap; let radius = cell / 2.0; let colors = ["rgba(255,255,255,0.05)", "#0e4429", "#006d32", "#26a641", "#39d353"]; // Build weekly data per year struct YearRow { year: i32, weeks: Vec<(NaiveDate, NaiveDate, i64)>, // start, end, count } let start_year = from.year(); let end_year = to.year(); let mut rows: Vec = Vec::new(); for yr in start_year..=end_year { let year_start = NaiveDate::from_ymd_opt(yr, 1, 1).unwrap(); let year_end = if yr == end_year { to } else { NaiveDate::from_ymd_opt(yr, 12, 31).unwrap() }; let offset = year_start.weekday().num_days_from_sunday(); let mut cursor = year_start - chrono::Duration::days(offset as i64); let mut weeks = Vec::new(); while cursor <= year_end { let week_start = cursor; let mut week_count = 0i64; for _ in 0..7 { week_count += count_map.get(&cursor).copied().unwrap_or(0); cursor += chrono::Duration::days(1); } let week_end = cursor - chrono::Duration::days(1); weeks.push((week_start, week_end, week_count)); } rows.push(YearRow { year: yr, weeks }); } // Quantile thresholds let mut non_zero: Vec = rows .iter() .flat_map(|r| r.weeks.iter().map(|w| w.2)) .filter(|&c| c > 0) .collect(); non_zero.sort(); let thresholds = if non_zero.is_empty() { [1i64, 2, 3] } else { let p = |pct: f64| non_zero[(pct * non_zero.len() as f64).min(non_zero.len() as f64 - 1.0) as usize]; [p(0.25), p(0.5), p(0.75)] }; let color_for = |count: i64| -> &str { if count == 0 { colors[0] } else if count <= thresholds[0] { colors[1] } else if count <= thresholds[1] { colors[2] } else if count <= thresholds[2] { colors[3] } else { colors[4] } }; let n_rows = rows.len(); let graph_h = (n_rows as f64) * step; let total: i64 = counts.iter().map(|d| d.count).sum(); let repo_text = if repo_count > 0 { format!(" in {repo_count} repositories") } else { String::new() }; // Layout: headline at top, graph vertically centered in remaining space let offset_x = padding; let headline_y = padding + 36.0; let subtitle_y = headline_y + 28.0; let graph_top = subtitle_y + 16.0; let avail_graph_h = og_h - graph_top - padding; let graph_y = graph_top + (avail_graph_h - graph_h).max(0.0) / 2.0; let mut svg = format!( r#""#, ); // Headline svg.push_str(&format!( r##"rob thijssen"##, x = offset_x + year_label_w, y = headline_y, )); // Subtitle svg.push_str(&format!( r##"{total} contributions since {from}{repo_text}"##, x = offset_x + year_label_w, y = subtitle_y, )); let label_font_size = (step * 0.7).round().max(8.0).min(14.0); for (row_idx, row) in rows.iter().enumerate() { let y_base = graph_y + (row_idx as f64) * step; svg.push_str(&format!( r##"{yr}"##, x = offset_x + year_label_w - 6.0, y = y_base + radius, fs = label_font_size, yr = row.year, )); for (col, (_, _, count)) in row.weeks.iter().enumerate() { let cx = offset_x + year_label_w + (col as f64) * step + radius; let cy = y_base + radius; let fill = color_for(*count); svg.push_str(&format!( r#""#, r = radius - 1.0, )); } } svg.push_str(""); // Rasterize at 1200x630 let mut fontdb = fontdb::Database::new(); fontdb.load_system_fonts(); let mut opts = resvg::usvg::Options::default(); opts.fontdb = std::sync::Arc::new(fontdb); opts.font_family = "Noto Sans".to_owned(); let tree = resvg::usvg::Tree::from_str(&svg, &opts) .map_err(|e| format!("svg parse: {e}"))?; let mut pixmap = resvg::tiny_skia::Pixmap::new(og_w as u32, og_h as u32).ok_or_else(|| "pixmap alloc failed".to_string())?; resvg::render(&tree, resvg::tiny_skia::Transform::default(), &mut pixmap.as_mut()); pixmap .encode_png() .map_err(|e| format!("png encode: {e}")) } /// Allowlisted forge hosts that the proxy may contact. const ALLOWED_HOSTS: &[&str] = &["api.github.com", "git.lair.cafe"]; #[derive(Debug, Deserialize)] struct ForgeProxyParams { host: Option, } /// Proxy requests to forge APIs to avoid CORS issues. /// `GET /v1/forge/{source}/{path}?host=git.lair.cafe` async fn forge_proxy( State(state): State, Path((source, rest)): Path<(String, String)>, Query(params): Query, ) -> Result { let (base, api_prefix) = match source.as_str() { "github" => ("https://api.github.com".to_string(), ""), "gitea" => { let host = params.host.as_deref().unwrap_or("git.lair.cafe"); if !ALLOWED_HOSTS.contains(&host) { return Err(ApiError::bad_request(format!("host not allowed: {host}"))); } (format!("https://{host}"), "/api/v1") } _ => return Err(ApiError::bad_request(format!("unsupported source: {source}"))), }; let url = format!("{base}{api_prefix}/{rest}"); let resp = state .http .get(&url) .header("Accept", "application/json") .header("User-Agent", "moments-api") .send() .await .map_err(|e| { tracing::warn!(url = %url, error = %e, "forge proxy request failed"); ApiError { status: StatusCode::BAD_GATEWAY, message: e.to_string(), } })?; let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY); let body = resp.bytes().await.map_err(|e| ApiError { status: StatusCode::BAD_GATEWAY, message: e.to_string(), })?; Ok(( status, [(axum::http::header::CONTENT_TYPE, "application/json")], body, )) } fn parse_sources(raw: &str) -> Result, ApiError> { raw.split(',') .map(str::trim) .filter(|s| !s.is_empty()) .map(|s| s.parse::().map_err(|e| ApiError::bad_request(e.to_string()))) .collect() } struct ApiError { status: StatusCode, message: String, } impl ApiError { fn bad_request(msg: impl Into) -> Self { Self { status: StatusCode::BAD_REQUEST, message: msg.into(), } } } fn internal(e: E) -> ApiError { let message = e.to_string(); tracing::error!(error = %message, "internal handler error"); ApiError { status: StatusCode::INTERNAL_SERVER_ERROR, message, } } impl IntoResponse for ApiError { fn into_response(self) -> axum::response::Response { (self.status, Json(serde_json::json!({ "error": self.message }))).into_response() } }