//! Gitea activity feed ingestion. //! //! Hits `/api/v1/users/{user}/activities/feeds?only-performed-by=true` //! which returns events the user themselves caused (not received events //! from others they follow). No ETag support upstream, so each tick fetches //! page 1 and relies on idempotent upsert. First run paginates further to //! seed history. //! //! Each item carries a self-contained payload — including the event-emitting //! host — so the reshape layer can construct URLs without needing config. use std::collections::HashSet; use std::sync::Arc; use async_trait::async_trait; use chrono::{DateTime, Utc}; use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError}; use moments_entities::{Event, RepoLanguage, Source}; use reqwest::{Client, header}; use serde_json::Value; use tracing::debug; const SOURCE_NAME: &str = "gitea"; const USER_AGENT: &str = concat!( "moments/", env!("CARGO_PKG_VERSION"), " (+https://rob.tn)" ); const MAX_BACKFILL_PAGES: u32 = 20; #[derive(Clone, Debug)] pub struct GiteaConfig { /// e.g. `git.lair.cafe`. Used to construct URLs the API doesn't return /// directly (issue / PR / commit web links) and stamped into each event /// payload for the reshape layer. pub host: String, pub user: String, pub token: Option, pub per_page: u32, } impl Default for GiteaConfig { fn default() -> Self { Self { host: "git.lair.cafe".into(), user: "grenade".into(), token: None, per_page: 50, } } } pub struct GiteaSource { client: Client, writer: Arc, state: Arc, config: GiteaConfig, } impl GiteaSource { pub fn new( client: Client, writer: Arc, state: Arc, config: GiteaConfig, ) -> Self { Self { client, writer, state, config, } } fn user_feed_base_url(&self) -> String { format!( "https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}", self.config.host, self.config.user, self.config.per_page ) } fn org_feed_base_url(&self, org: &str) -> String { format!( "https://{}/api/v1/orgs/{}/activities/feeds?limit={}", self.config.host, org, self.config.per_page ) } fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { req = req .header(header::ACCEPT, "application/json") .header(header::USER_AGENT, USER_AGENT); if let Some(token) = &self.config.token { req = req.header(header::AUTHORIZATION, format!("token {token}")); } req } /// Discover organizations the authenticated user belongs to. /// Returns an empty vec if no token is configured or the request fails. async fn discover_orgs(&self) -> Result, SourceError> { if self.config.token.is_none() { return Ok(vec![]); } let url = format!("https://{}/api/v1/user/orgs", self.config.host); let req = self.apply_headers(self.client.get(&url)); let resp = req .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { tracing::warn!(status = %resp.status(), "failed to discover gitea orgs"); return Ok(vec![]); } let orgs: Vec = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; Ok(orgs .iter() .filter_map(|o| o.get("username").and_then(Value::as_str).map(String::from)) .collect()) } /// Poll a single activity feed, paginating on first run. When `filter_user` /// is true, only events performed by `self.config.user` are ingested (used /// for org feeds which contain all members' activity). /// /// `base_url` should contain everything except the `&page=N` suffix. /// Returns (ingested_count, set_of_repo_full_names). async fn poll_feed( &self, state_key: &str, base_url: &str, filter_user: bool, ) -> Result<(usize, HashSet), SourceError> { let prior = self.state.load(state_key).await?; let first_run = prior.is_none(); let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 }; let mut total = 0usize; let mut repos = HashSet::new(); for page in 1..=max_pages { let url = format!("{base_url}&page={page}"); let req = self.apply_headers(self.client.get(&url)); let resp = req .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); } let items: Vec = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; if items.is_empty() { break; } // Collect repo names from feed items for item in &items { if let Some(name) = item .get("repo") .and_then(|r| r.get("full_name")) .and_then(Value::as_str) { repos.insert(name.to_string()); } } let events: Vec = items .iter() .filter(|it| { if !filter_user { return true; } it.get("act_user") .and_then(|u| u.get("login")) .and_then(Value::as_str) .map(|login| login.eq_ignore_ascii_case(&self.config.user)) .unwrap_or(false) }) .filter_map(|it| parse_gitea_event(it, &self.config.host)) .collect(); total += self.writer.upsert_events(&events).await?; if items.len() < self.config.per_page as usize { break; } } self.state.touch(state_key).await?; Ok((total, repos)) } /// Fetch language breakdowns for the given repos via the Gitea REST API. async fn fetch_languages(&self, repos: &HashSet) -> Result { let mut total = 0usize; for repo in repos { let url = format!( "https://{}/api/v1/repos/{}/languages", self.config.host, repo ); let req = self.apply_headers(self.client.get(&url)); let resp = req .send() .await .map_err(|e| SourceError::Http(e.to_string()))?; if !resp.status().is_success() { tracing::warn!(repo = %repo, status = %resp.status(), "gitea language fetch failed; skipping"); continue; } let lang_map: std::collections::HashMap = resp .json() .await .map_err(|e| SourceError::Parse(e.to_string()))?; let languages: Vec = lang_map .into_iter() .map(|(language, bytes)| RepoLanguage { source: Source::Gitea, repo: repo.clone(), language, bytes, color: None, // Gitea doesn't return colors }) .collect(); total += self.writer.upsert_repo_languages(&languages).await?; } debug!(total, repos = repos.len(), "gitea repo languages updated"); Ok(total) } } #[async_trait] impl EventSource for GiteaSource { fn name(&self) -> &'static str { SOURCE_NAME } async fn poll(&self) -> Result { let mut all_repos = HashSet::new(); // Poll user's own activity feed (existing behavior). let user_url = self.user_feed_base_url(); let (mut total, repos) = self.poll_feed(SOURCE_NAME, &user_url, false).await?; all_repos.extend(repos); // Discover orgs and poll each org's activity feed, filtering for // events performed by this user. let orgs = self.discover_orgs().await?; for org in &orgs { let state_key = format!("gitea:org:{org}"); let org_url = self.org_feed_base_url(org); match self.poll_feed(&state_key, &org_url, true).await { Ok((n, repos)) => { total += n; all_repos.extend(repos); } Err(e) => { tracing::warn!(org = %org, error = %e, "failed to poll org feed"); } } } if let Err(e) = self.fetch_languages(&all_repos).await { tracing::warn!(error = %e, "gitea language fetch failed; continuing"); } debug!(ingested = total, orgs = orgs.len(), "gitea poll complete"); Ok(total) } } /// Convert a Gitea activity feed item into our Event row. The host gets /// stamped into the payload as `_host` so the reshape layer can build /// web URLs without needing global config. fn parse_gitea_event(item: &Value, host: &str) -> Option { let id = item.get("id").and_then(Value::as_i64)?; let op_type = item.get("op_type").and_then(Value::as_str)?.to_string(); let created_str = item.get("created").and_then(Value::as_str)?; let occurred_at = DateTime::parse_from_rfc3339(created_str) .ok()? .with_timezone(&Utc); let private = item.get("is_private").and_then(Value::as_bool).unwrap_or(false); let mut payload = item.clone(); if let Some(obj) = payload.as_object_mut() { obj.insert("_host".into(), Value::String(host.into())); } Some(Event { id: format!("gitea:{id}"), source: Source::Gitea, action: op_type, occurred_at, public: !private, payload, }) } #[cfg(test)] mod tests { use super::*; use serde_json::json; #[test] fn parse_commit_repo() { let raw = json!({ "id": 973, "op_type": "commit_repo", "ref_name": "refs/heads/main", "is_private": false, "content": "{\"Commits\":[{\"Sha1\":\"abc123\"}],\"Len\":1}", "created": "2026-05-03T16:37:45Z", "repo": { "full_name": "grenade/moments" } }); let ev = parse_gitea_event(&raw, "git.lair.cafe").expect("parses"); assert_eq!(ev.id, "gitea:973"); assert_eq!(ev.source, Source::Gitea); assert_eq!(ev.action, "commit_repo"); assert!(ev.public); // host stamped into payload assert_eq!( ev.payload.get("_host").and_then(|v| v.as_str()), Some("git.lair.cafe") ); } #[test] fn org_event_user_filter_predicate() { let by_user = json!({ "id": 500, "op_type": "commit_repo", "is_private": false, "created": "2026-05-03T10:00:00Z", "act_user": { "login": "grenade" }, "repo": { "full_name": "myorg/somerepo" } }); let by_other = json!({ "id": 501, "op_type": "commit_repo", "is_private": false, "created": "2026-05-03T10:01:00Z", "act_user": { "login": "otherperson" }, "repo": { "full_name": "myorg/somerepo" } }); // Both parse as valid events assert!(parse_gitea_event(&by_user, "git.lair.cafe").is_some()); assert!(parse_gitea_event(&by_other, "git.lair.cafe").is_some()); // The user-filter predicate used by poll_feed let is_user = |item: &Value, user: &str| -> bool { item.get("act_user") .and_then(|u| u.get("login")) .and_then(Value::as_str) .map(|login| login.eq_ignore_ascii_case(user)) .unwrap_or(false) }; assert!(is_user(&by_user, "grenade")); assert!(!is_user(&by_other, "grenade")); // Case-insensitive match assert!(is_user(&by_user, "Grenade")); } #[test] fn private_event_marked_private() { let raw = json!({ "id": 100, "op_type": "commit_repo", "is_private": true, "created": "2026-05-03T00:00:00Z", "repo": { "full_name": "grenade/private" } }); let ev = parse_gitea_event(&raw, "git.lair.cafe").expect("parses"); assert!(!ev.public); } }