Files
moments/crates/moments-api/src/main.rs
rob thijssen 3761333ac4
Some checks failed
deploy / Build api + worker + web (push) Failing after 5m59s
deploy / Deploy moments-api to nikola (push) Has been skipped
deploy / Deploy moments-worker to frootmig (push) Has been skipped
deploy / Deploy web to oolon (push) Has been skipped
fix: make the workspace pass the CI lint/test gate
The new Gitea Actions build gate runs `cargo fmt --check`, `clippy -D warnings`,
and `cargo test` — stricter than the old deploy.sh, which only `cargo build`d.
That surfaced pre-existing drift that never compiled under the test/clippy
profile:

- apply rustfmt across the workspace (formatting only, no logic changes)
- moments-data: add the missing `prune_events` to the test-only `NoopWriter`
  stub (the EventWriter trait gained it with the blog-prune feature; a plain
  `cargo build` never compiles the `#[cfg(test)]` stub, so it went stale)
- moments-api: `.max().min()` -> `.clamp()`, and build `usvg::Options` with
  struct-update syntax instead of post-Default field assignment

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01X7zF7Kf4JqDwa6M8Qgge9M
2026-06-25 13:00:40 +03:00

613 lines
19 KiB
Rust

use std::{net::SocketAddr, sync::Arc, time::Duration};
use axum::{
Json, Router,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::get,
};
use chrono::{DateTime, Datelike, NaiveDate, Utc};
use clap::Parser;
use moments_core::{EventReader, reshape};
use moments_data::PgStore;
use moments_entities::{
BlogPost, BlogPostSummary, DailyCount, Event, EventQuery, HourlyAvg, LanguageDailyCount,
ProjectSummary, RepoLanguage, Source, SourceSummary, TimelineItem,
};
use serde::Deserialize;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tracing::info;
#[derive(Parser, Debug)]
#[command(version, about = "moments read-only HTTP API")]
struct Args {
#[arg(long, env = "BIND_ADDR", default_value = "127.0.0.1:8080")]
bind: SocketAddr,
#[arg(long, env = "DATABASE_URL")]
database_url: String,
}
#[derive(Clone)]
struct AppState {
store: Arc<PgStore>,
http: reqwest::Client,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
init_tracing();
let args = Args::parse();
// The api connects as moments_ro and never writes — migrations are owned
// by moments-worker, which is the database owner (moments_rw). Running
// migrations from here would fail with `permission denied for schema
// public`. The worker must have run at least once before the api accepts
// traffic; in deploy this is ordered via systemd dependencies (§3).
let store = PgStore::connect(&args.database_url).await?;
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(15))
.build()?;
let state = AppState {
store: Arc::new(store),
http,
};
let app = Router::new()
.route("/v1/healthz", get(healthz))
.route("/v1/events", get(list_events))
.route("/v1/sources", get(list_sources))
.route("/v1/projects", get(list_projects))
.route("/v1/blog", get(list_blog_posts))
.route("/v1/blog/{slug}", get(get_blog_post))
.route("/v1/activity/daily", get(daily_counts))
.route("/v1/activity/hourly", get(hourly_avgs))
.route("/v1/languages/daily", get(language_daily_counts))
.route("/v1/languages/repos", get(repo_languages))
.route("/v1/forge/{source}/{*rest}", get(forge_proxy))
.route("/v1/og/contributions.png", get(og_contributions))
.with_state(state)
.layer(TraceLayer::new_for_http())
.layer(CorsLayer::permissive());
info!(addr = %args.bind, "listening");
let listener = tokio::net::TcpListener::bind(args.bind).await?;
axum::serve(listener, app).await?;
Ok(())
}
fn init_tracing() {
use tracing_subscriber::{EnvFilter, fmt};
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let json = std::env::var("JOURNAL_STREAM").is_ok();
if json {
fmt().with_env_filter(filter).json().init();
} else {
fmt().with_env_filter(filter).init();
}
}
async fn healthz() -> &'static str {
"ok"
}
#[derive(Debug, Deserialize)]
struct EventsQueryParams {
from: Option<DateTime<Utc>>,
to: Option<DateTime<Utc>>,
/// Comma-separated list, e.g. `source=github,gitea`.
source: Option<String>,
/// Filter to a specific repo, e.g. `repo=grenade/moments`.
repo: Option<String>,
limit: Option<u32>,
}
async fn list_events(
State(state): State<AppState>,
Query(params): Query<EventsQueryParams>,
) -> Result<Json<Vec<TimelineItem>>, ApiError> {
let sources = params.source.as_deref().map(parse_sources).transpose()?;
let limit = params.limit.unwrap_or(100).clamp(1, 1000);
let query = EventQuery {
from: params.from,
to: params.to,
sources,
repo: params.repo,
// Public timeline only — private events stay in the DB but are never
// surfaced. A future authenticated path can flip this.
include_private: false,
limit,
};
let events = state.store.list_events(&query).await.map_err(internal)?;
let items: Vec<TimelineItem> = events.iter().map(reshape).collect();
Ok(Json(items))
}
async fn list_sources(State(state): State<AppState>) -> Result<Json<Vec<SourceSummary>>, ApiError> {
let summaries = state
.store
.source_summaries(/* include_private */ true)
.await
.map_err(internal)?;
Ok(Json(summaries))
}
async fn list_projects(
State(state): State<AppState>,
) -> Result<Json<Vec<ProjectSummary>>, ApiError> {
let projects = state.store.list_projects().await.map_err(internal)?;
Ok(Json(projects))
}
/// All public blog events, newest first. Blog posts live in the events
/// table (`source = 'blog'`); the payload carries the frontmatter fields
/// and the full markdown body.
async fn blog_events(state: &AppState) -> Result<Vec<Event>, ApiError> {
let query = EventQuery {
sources: Some(vec![Source::Blog]),
// Drafts are stored with public = false and stay invisible here.
include_private: false,
limit: 1000,
..Default::default()
};
state.store.list_events(&query).await.map_err(internal)
}
fn payload_str<'a>(event: &'a Event, key: &str) -> &'a str {
event
.payload
.get(key)
.and_then(|v| v.as_str())
.unwrap_or("")
}
async fn list_blog_posts(
State(state): State<AppState>,
) -> Result<Json<Vec<BlogPostSummary>>, ApiError> {
let posts = blog_events(&state)
.await?
.iter()
.map(|ev| BlogPostSummary {
slug: payload_str(ev, "slug").to_string(),
title: payload_str(ev, "title").to_string(),
published_at: ev.occurred_at,
excerpt: moments_core::presentation::blog::excerpt(payload_str(ev, "markdown")),
})
.collect();
Ok(Json(posts))
}
async fn get_blog_post(
State(state): State<AppState>,
Path(slug): Path<String>,
) -> Result<Json<BlogPost>, ApiError> {
let id = format!("blog:{slug}");
let events = blog_events(&state).await?;
let ev = events.iter().find(|ev| ev.id == id).ok_or(ApiError {
status: StatusCode::NOT_FOUND,
message: "no such post".into(),
})?;
Ok(Json(BlogPost {
slug: payload_str(ev, "slug").to_string(),
title: payload_str(ev, "title").to_string(),
published_at: ev.occurred_at,
markdown: payload_str(ev, "markdown").to_string(),
host: payload_str(ev, "_host").to_string(),
repo: payload_str(ev, "_repo").to_string(),
branch: payload_str(ev, "_branch").to_string(),
}))
}
#[derive(Debug, Deserialize)]
struct DailyCountsParams {
from: Option<NaiveDate>,
to: Option<NaiveDate>,
}
async fn daily_counts(
State(state): State<AppState>,
Query(params): Query<DailyCountsParams>,
) -> Result<Json<Vec<DailyCount>>, ApiError> {
let to = params.to.unwrap_or_else(|| Utc::now().date_naive());
let from = params
.from
.unwrap_or_else(|| to - chrono::Duration::days(365));
let counts = state
.store
.daily_counts(from, to, /* include_private */ true)
.await
.map_err(internal)?;
Ok(Json(counts))
}
async fn language_daily_counts(
State(state): State<AppState>,
Query(params): Query<DailyCountsParams>,
) -> Result<Json<Vec<LanguageDailyCount>>, ApiError> {
let to = params.to.unwrap_or_else(|| Utc::now().date_naive());
let from = params
.from
.unwrap_or_else(|| to - chrono::Duration::days(365));
let counts = state
.store
.language_daily_counts(from, to, /* include_private */ true)
.await
.map_err(internal)?;
Ok(Json(counts))
}
#[derive(Debug, Deserialize)]
struct HourlyAvgsParams {
from: Option<NaiveDate>,
to: Option<NaiveDate>,
/// IANA timezone name (e.g. "Europe/Helsinki"). Defaults to UTC.
/// Hour buckets are computed in this zone so the chart matches the
/// clock the user sees.
tz: Option<String>,
}
async fn hourly_avgs(
State(state): State<AppState>,
Query(params): Query<HourlyAvgsParams>,
) -> Result<Json<Vec<HourlyAvg>>, ApiError> {
let to = params.to.unwrap_or_else(|| Utc::now().date_naive());
let from = params
.from
.unwrap_or_else(|| to - chrono::Duration::days(365));
let tz = params.tz.as_deref().unwrap_or("UTC");
// Validate the tz string before handing it to postgres — a bad name
// here would surface as an opaque 500 from the DB. chrono-tz would do
// it for free but we don't depend on it; instead reject obvious shell
// injection vectors (the value is bound, not interpolated, so this is
// belt-and-braces).
if tz.len() > 64
|| tz
.chars()
.any(|c| !(c.is_ascii_alphanumeric() || matches!(c, '/' | '_' | '+' | '-')))
{
return Err(ApiError {
status: StatusCode::BAD_REQUEST,
message: "invalid tz".into(),
});
}
let avgs = state
.store
.hourly_avgs(from, to, tz, /* include_private */ true)
.await
.map_err(internal)?;
Ok(Json(avgs))
}
async fn repo_languages(
State(state): State<AppState>,
) -> Result<Json<Vec<RepoLanguage>>, ApiError> {
let langs = state.store.repo_languages().await.map_err(internal)?;
Ok(Json(langs))
}
async fn og_contributions(State(state): State<AppState>) -> Result<impl IntoResponse, ApiError> {
// Get date range from source summaries
let summaries = state
.store
.source_summaries(/* include_private */ true)
.await
.map_err(internal)?;
let earliest = summaries
.iter()
.filter_map(|s| s.earliest)
.min()
.unwrap_or_else(Utc::now)
.date_naive();
let today = Utc::now().date_naive();
let counts = state
.store
.daily_counts(earliest, today, /* include_private */ true)
.await
.map_err(internal)?;
let projects = state.store.list_projects().await.map_err(internal)?;
let repo_count = projects.len();
let png =
render_contributions_png(&counts, earliest, today, repo_count).map_err(|e| ApiError {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: e,
})?;
Ok((
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, "image/png"),
(axum::http::header::CACHE_CONTROL, "public, max-age=3600"),
],
png,
))
}
fn render_contributions_png(
counts: &[DailyCount],
from: NaiveDate,
to: NaiveDate,
repo_count: usize,
) -> Result<Vec<u8>, String> {
use std::collections::HashMap;
let count_map: HashMap<NaiveDate, i64> = counts.iter().map(|d| (d.date, d.count)).collect();
// OG image canvas: 1200x630
let og_w = 1200_f64;
let og_h = 630_f64;
let padding = 40_f64;
let bg = "#2c3e50";
let year_label_w = 50_f64;
let max_cols = 53;
// Scale cell size to fill available width
let avail_w = og_w - 2.0 * padding - year_label_w;
let step = (avail_w / max_cols as f64).floor();
let gap = (step * 0.17).round();
let cell = step - gap;
let radius = cell / 2.0;
let colors = [
"rgba(255,255,255,0.05)",
"#0e4429",
"#006d32",
"#26a641",
"#39d353",
];
// Build weekly data per year
struct YearRow {
year: i32,
weeks: Vec<(NaiveDate, NaiveDate, i64)>, // start, end, count
}
let start_year = from.year();
let end_year = to.year();
let mut rows: Vec<YearRow> = Vec::new();
for yr in start_year..=end_year {
let year_start = NaiveDate::from_ymd_opt(yr, 1, 1).unwrap();
let year_end = if yr == end_year {
to
} else {
NaiveDate::from_ymd_opt(yr, 12, 31).unwrap()
};
let offset = year_start.weekday().num_days_from_sunday();
let mut cursor = year_start - chrono::Duration::days(offset as i64);
let mut weeks = Vec::new();
while cursor <= year_end {
let week_start = cursor;
let mut week_count = 0i64;
for _ in 0..7 {
week_count += count_map.get(&cursor).copied().unwrap_or(0);
cursor += chrono::Duration::days(1);
}
let week_end = cursor - chrono::Duration::days(1);
weeks.push((week_start, week_end, week_count));
}
rows.push(YearRow { year: yr, weeks });
}
// Quantile thresholds
let mut non_zero: Vec<i64> = rows
.iter()
.flat_map(|r| r.weeks.iter().map(|w| w.2))
.filter(|&c| c > 0)
.collect();
non_zero.sort();
let thresholds = if non_zero.is_empty() {
[1i64, 2, 3]
} else {
let p = |pct: f64| {
non_zero[(pct * non_zero.len() as f64).min(non_zero.len() as f64 - 1.0) as usize]
};
[p(0.25), p(0.5), p(0.75)]
};
let color_for = |count: i64| -> &str {
if count == 0 {
colors[0]
} else if count <= thresholds[0] {
colors[1]
} else if count <= thresholds[1] {
colors[2]
} else if count <= thresholds[2] {
colors[3]
} else {
colors[4]
}
};
let n_rows = rows.len();
let graph_h = (n_rows as f64) * step;
let total: i64 = counts.iter().map(|d| d.count).sum();
let repo_text = if repo_count > 0 {
format!(" in {repo_count} repositories")
} else {
String::new()
};
// Layout: headline at top, graph vertically centered in remaining space
let offset_x = padding;
let headline_y = padding + 36.0;
let subtitle_y = headline_y + 28.0;
let graph_top = subtitle_y + 16.0;
let avail_graph_h = og_h - graph_top - padding;
let graph_y = graph_top + (avail_graph_h - graph_h).max(0.0) / 2.0;
let mut svg = format!(
r#"<svg xmlns="http://www.w3.org/2000/svg" width="{og_w}" height="{og_h}" viewBox="0 0 {og_w} {og_h}"><rect width="100%" height="100%" fill="{bg}"/>"#,
);
// Headline
svg.push_str(&format!(
r##"<text x="{x}" y="{y}" fill="#ecf0f1" font-family="sans-serif" font-size="36" font-weight="bold">rob thijssen</text>"##,
x = offset_x + year_label_w,
y = headline_y,
));
// Subtitle
svg.push_str(&format!(
r##"<text x="{x}" y="{y}" fill="#ecf0f1" font-family="sans-serif" font-size="16" opacity="0.6">{total} contributions since {from}{repo_text}</text>"##,
x = offset_x + year_label_w,
y = subtitle_y,
));
let label_font_size = (step * 0.7).round().clamp(8.0, 14.0);
for (row_idx, row) in rows.iter().enumerate() {
let y_base = graph_y + (row_idx as f64) * step;
svg.push_str(&format!(
r##"<text x="{x}" y="{y}" text-anchor="end" dominant-baseline="central" fill="#ecf0f1" font-family="sans-serif" font-size="{fs}" opacity="0.6">{yr}</text>"##,
x = offset_x + year_label_w - 6.0,
y = y_base + radius,
fs = label_font_size,
yr = row.year,
));
for (col, (_, _, count)) in row.weeks.iter().enumerate() {
let cx = offset_x + year_label_w + (col as f64) * step + radius;
let cy = y_base + radius;
let fill = color_for(*count);
svg.push_str(&format!(
r#"<circle cx="{cx}" cy="{cy}" r="{r}" fill="{fill}"/>"#,
r = radius - 1.0,
));
}
}
svg.push_str("</svg>");
// Rasterize at 1200x630
let mut fontdb = fontdb::Database::new();
fontdb.load_system_fonts();
let opts = resvg::usvg::Options {
fontdb: std::sync::Arc::new(fontdb),
font_family: "Noto Sans".to_owned(),
..Default::default()
};
let tree = resvg::usvg::Tree::from_str(&svg, &opts).map_err(|e| format!("svg parse: {e}"))?;
let mut pixmap = resvg::tiny_skia::Pixmap::new(og_w as u32, og_h as u32)
.ok_or_else(|| "pixmap alloc failed".to_string())?;
resvg::render(
&tree,
resvg::tiny_skia::Transform::default(),
&mut pixmap.as_mut(),
);
pixmap.encode_png().map_err(|e| format!("png encode: {e}"))
}
/// Allowlisted forge hosts that the proxy may contact.
const ALLOWED_HOSTS: &[&str] = &["api.github.com", "git.lair.cafe"];
#[derive(Debug, Deserialize)]
struct ForgeProxyParams {
host: Option<String>,
}
/// Proxy requests to forge APIs to avoid CORS issues.
/// `GET /v1/forge/{source}/{path}?host=git.lair.cafe`
async fn forge_proxy(
State(state): State<AppState>,
Path((source, rest)): Path<(String, String)>,
Query(params): Query<ForgeProxyParams>,
) -> Result<impl IntoResponse, ApiError> {
let (base, api_prefix) = match source.as_str() {
"github" => ("https://api.github.com".to_string(), ""),
"gitea" => {
let host = params.host.as_deref().unwrap_or("git.lair.cafe");
if !ALLOWED_HOSTS.contains(&host) {
return Err(ApiError::bad_request(format!("host not allowed: {host}")));
}
(format!("https://{host}"), "/api/v1")
}
_ => {
return Err(ApiError::bad_request(format!(
"unsupported source: {source}"
)));
}
};
let url = format!("{base}{api_prefix}/{rest}");
let resp = state
.http
.get(&url)
.header("Accept", "application/json")
.header("User-Agent", "moments-api")
.send()
.await
.map_err(|e| {
tracing::warn!(url = %url, error = %e, "forge proxy request failed");
ApiError {
status: StatusCode::BAD_GATEWAY,
message: e.to_string(),
}
})?;
let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
let body = resp.bytes().await.map_err(|e| ApiError {
status: StatusCode::BAD_GATEWAY,
message: e.to_string(),
})?;
Ok((
status,
[(axum::http::header::CONTENT_TYPE, "application/json")],
body,
))
}
fn parse_sources(raw: &str) -> Result<Vec<Source>, ApiError> {
raw.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(|s| {
s.parse::<Source>()
.map_err(|e| ApiError::bad_request(e.to_string()))
})
.collect()
}
struct ApiError {
status: StatusCode,
message: String,
}
impl ApiError {
fn bad_request(msg: impl Into<String>) -> Self {
Self {
status: StatusCode::BAD_REQUEST,
message: msg.into(),
}
}
}
fn internal<E: std::fmt::Display>(e: E) -> ApiError {
let message = e.to_string();
tracing::error!(error = %message, "internal handler error");
ApiError {
status: StatusCode::INTERNAL_SERVER_ERROR,
message,
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
(
self.status,
Json(serde_json::json!({ "error": self.message })),
)
.into_response()
}
}