Add repo filter param to /v1/events (SQL COALESCE across payload shapes per source). New /project/:source/* route renders a filtered activity timeline for a single repo. Dashboard cards link to the drill-down page. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
172 lines
4.7 KiB
Rust
172 lines
4.7 KiB
Rust
use std::{net::SocketAddr, sync::Arc};
|
|
|
|
use axum::{
|
|
Json, Router,
|
|
extract::{Query, State},
|
|
http::StatusCode,
|
|
response::IntoResponse,
|
|
routing::get,
|
|
};
|
|
use chrono::{DateTime, Utc};
|
|
use clap::Parser;
|
|
use moments_core::{EventReader, reshape};
|
|
use moments_data::PgStore;
|
|
use moments_entities::{EventQuery, ProjectSummary, Source, SourceSummary, TimelineItem};
|
|
use serde::Deserialize;
|
|
use tower_http::{cors::CorsLayer, trace::TraceLayer};
|
|
use tracing::info;
|
|
|
|
#[derive(Parser, Debug)]
|
|
#[command(version, about = "moments read-only HTTP API")]
|
|
struct Args {
|
|
#[arg(long, env = "BIND_ADDR", default_value = "127.0.0.1:8080")]
|
|
bind: SocketAddr,
|
|
|
|
#[arg(long, env = "DATABASE_URL")]
|
|
database_url: String,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct AppState {
|
|
store: Arc<PgStore>,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
init_tracing();
|
|
let args = Args::parse();
|
|
|
|
// The api connects as moments_ro and never writes — migrations are owned
|
|
// by moments-worker, which is the database owner (moments_rw). Running
|
|
// migrations from here would fail with `permission denied for schema
|
|
// public`. The worker must have run at least once before the api accepts
|
|
// traffic; in deploy this is ordered via systemd dependencies (§3).
|
|
let store = PgStore::connect(&args.database_url).await?;
|
|
let state = AppState {
|
|
store: Arc::new(store),
|
|
};
|
|
|
|
let app = Router::new()
|
|
.route("/v1/healthz", get(healthz))
|
|
.route("/v1/events", get(list_events))
|
|
.route("/v1/sources", get(list_sources))
|
|
.route("/v1/projects", get(list_projects))
|
|
.with_state(state)
|
|
.layer(TraceLayer::new_for_http())
|
|
.layer(CorsLayer::permissive());
|
|
|
|
info!(addr = %args.bind, "listening");
|
|
let listener = tokio::net::TcpListener::bind(args.bind).await?;
|
|
axum::serve(listener, app).await?;
|
|
Ok(())
|
|
}
|
|
|
|
fn init_tracing() {
|
|
use tracing_subscriber::{EnvFilter, fmt};
|
|
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
|
let json = std::env::var("JOURNAL_STREAM").is_ok();
|
|
if json {
|
|
fmt().with_env_filter(filter).json().init();
|
|
} else {
|
|
fmt().with_env_filter(filter).init();
|
|
}
|
|
}
|
|
|
|
/// Liveness probe handler: always answers with the static body `ok`.
///
/// It takes no state and does not touch the store.
async fn healthz() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
struct EventsQueryParams {
|
|
from: Option<DateTime<Utc>>,
|
|
to: Option<DateTime<Utc>>,
|
|
/// Comma-separated list, e.g. `source=github,gitea`.
|
|
source: Option<String>,
|
|
/// Filter to a specific repo, e.g. `repo=grenade/moments`.
|
|
repo: Option<String>,
|
|
limit: Option<u32>,
|
|
}
|
|
|
|
async fn list_events(
|
|
State(state): State<AppState>,
|
|
Query(params): Query<EventsQueryParams>,
|
|
) -> Result<Json<Vec<TimelineItem>>, ApiError> {
|
|
let sources = params
|
|
.source
|
|
.as_deref()
|
|
.map(parse_sources)
|
|
.transpose()?;
|
|
|
|
let limit = params.limit.unwrap_or(100).clamp(1, 1000);
|
|
|
|
let query = EventQuery {
|
|
from: params.from,
|
|
to: params.to,
|
|
sources,
|
|
repo: params.repo,
|
|
// Public timeline only — private events stay in the DB but are never
|
|
// surfaced. A future authenticated path can flip this.
|
|
include_private: false,
|
|
limit,
|
|
};
|
|
|
|
let events = state.store.list_events(&query).await.map_err(internal)?;
|
|
let items: Vec<TimelineItem> = events.iter().map(reshape).collect();
|
|
Ok(Json(items))
|
|
}
|
|
|
|
async fn list_sources(
|
|
State(state): State<AppState>,
|
|
) -> Result<Json<Vec<SourceSummary>>, ApiError> {
|
|
let summaries = state
|
|
.store
|
|
.source_summaries(/* include_private */ false)
|
|
.await
|
|
.map_err(internal)?;
|
|
Ok(Json(summaries))
|
|
}
|
|
|
|
async fn list_projects(
|
|
State(state): State<AppState>,
|
|
) -> Result<Json<Vec<ProjectSummary>>, ApiError> {
|
|
let projects = state.store.list_projects().await.map_err(internal)?;
|
|
Ok(Json(projects))
|
|
}
|
|
|
|
fn parse_sources(raw: &str) -> Result<Vec<Source>, ApiError> {
|
|
raw.split(',')
|
|
.map(str::trim)
|
|
.filter(|s| !s.is_empty())
|
|
.map(|s| s.parse::<Source>().map_err(|e| ApiError::bad_request(e.to_string())))
|
|
.collect()
|
|
}
|
|
|
|
struct ApiError {
|
|
status: StatusCode,
|
|
message: String,
|
|
}
|
|
|
|
impl ApiError {
|
|
fn bad_request(msg: impl Into<String>) -> Self {
|
|
Self {
|
|
status: StatusCode::BAD_REQUEST,
|
|
message: msg.into(),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn internal<E: std::fmt::Display>(e: E) -> ApiError {
|
|
let message = e.to_string();
|
|
tracing::error!(error = %message, "internal handler error");
|
|
ApiError {
|
|
status: StatusCode::INTERNAL_SERVER_ERROR,
|
|
message,
|
|
}
|
|
}
|
|
|
|
impl IntoResponse for ApiError {
|
|
fn into_response(self) -> axum::response::Response {
|
|
(self.status, Json(serde_json::json!({ "error": self.message }))).into_response()
|
|
}
|
|
}
|