From f750e8de472fad34e66eba19a07c4479345f2ca2 Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Sun, 3 May 2026 19:41:55 +0300 Subject: [PATCH] feat(worker): add gitea activity feed poller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hits /api/v1/users/{user}/activities/feeds?only-performed-by=true on the configured gitea host (default git.lair.cafe). Page-1 polling on a 10-min cadence; first run paginates back through up to 20 pages (1000 items) to seed history. Gitea has no ETag support on this endpoint, so each tick is a fresh fetch — relying on idempotent upsert by `gitea:` for dedup. Reshape covers the gitea op_type set: commit_repo → "pushed N commits to repo:branch" + commits body, parsing the JSON-encoded `content` field push_tag → "tagged X in repo" create_repo → "created repo" rename/transfer/delete_branch/delete_tag/star/fork — straightforward create/close/reopen_issue → "{verb} issue #N in repo: title" create/close/reopen_pull_request → "{verb} pull request #N" merge_pull_request → GitMerge icon comment_issue, comment_pull → markdown body from comment.body approve/reject_pull_request, publish_release fallback for anything else (mirror_sync_*, future op_types) Issue / PR / release events use gitea's pipe-separated `|` content field; pushes have JSON-encoded content. Host stamping: parse_gitea_event injects `_host` into each row's payload so the reshape layer can construct web URLs without a config dependency. Multi-host gitea would still work as long as each source instance has its own host configured. Worker config: GITEA_HOST default git.lair.cafe GITEA_USER default grenade GITEA_TOKEN optional (raises rate limit; required for private repo activity to surface) GITEA_POLL_INTERVAL_SECS default 600 Tests: +2 in moments-data (commit_repo parses, private flag captured), +4 in moments-core (commit_repo with body, create_issue pipe-content, merge icon swap, fallback) — 27 total green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --- crates/moments-core/src/presentation.rs | 4 +- crates/moments-core/src/presentation/gitea.rs | 496 ++++++++++++++++++ crates/moments-data/src/gitea.rs | 206 ++++++++ crates/moments-data/src/lib.rs | 1 + crates/moments-worker/src/main.rs | 34 +- 5 files changed, 739 insertions(+), 2 deletions(-) create mode 100644 crates/moments-core/src/presentation/gitea.rs create mode 100644 crates/moments-data/src/gitea.rs diff --git a/crates/moments-core/src/presentation.rs b/crates/moments-core/src/presentation.rs index 0321a72..ffbeab5 100644 --- a/crates/moments-core/src/presentation.rs +++ b/crates/moments-core/src/presentation.rs @@ -5,12 +5,14 @@ use moments_entities::{Event, Source, TimelineIcon, TimelineItem, TitleSegment}; +mod gitea; mod github; pub fn reshape(event: &Event) -> TimelineItem { match event.source { Source::Github => github::reshape(event), - Source::Gitea | Source::Hg | Source::Bugzilla => generic_fallback(event), + Source::Gitea => gitea::reshape(event), + Source::Hg | Source::Bugzilla => generic_fallback(event), } } diff --git a/crates/moments-core/src/presentation/gitea.rs b/crates/moments-core/src/presentation/gitea.rs new file mode 100644 index 0000000..77e7d55 --- /dev/null +++ b/crates/moments-core/src/presentation/gitea.rs @@ -0,0 +1,496 @@ +use moments_entities::{ + CommitSummary, Event, Source, TimelineBody, TimelineIcon, TimelineItem, TitleSegment, +}; +use serde_json::Value; + +const FALLBACK_HOST: &str = "git.lair.cafe"; + +pub(crate) fn reshape(event: &Event) -> TimelineItem { + let p = &event.payload; + let host = p + .get("_host") + .and_then(Value::as_str) + .unwrap_or(FALLBACK_HOST); + let repo = p + .get("repo") + .and_then(|r| r.get("full_name")) + .and_then(Value::as_str); + let actor = p + .get("act_user") + .and_then(|u| u.get("login")) + .and_then(Value::as_str); + let ref_name = p.get("ref_name").and_then(Value::as_str); + let content = p.get("content").and_then(Value::as_str); + let comment = p.get("comment"); + + let (icon, title, subtitle, body) = match event.action.as_str() { + "commit_repo" => commit_repo(host, repo, ref_name, content), + "push_tag" => push_tag(host, repo, ref_name), + "create_repo" => create_repo(host, repo), + "rename_repo" => rename_repo(host, repo, content), + "transfer_repo" => transfer_repo(host, repo, content), + "fork_repo" => fork_repo(host, repo), + "delete_branch" => delete_branch(host, repo, ref_name), + "delete_tag" => delete_tag(host, repo, ref_name), + "star_repo" => star_repo(host, repo), + "create_issue" => issue_action("opened", TimelineIcon::Issue, host, repo, content), + "close_issue" => issue_action("closed", TimelineIcon::Issue, host, repo, content), + "reopen_issue" => issue_action("reopened", TimelineIcon::Issue, host, repo, content), + "comment_issue" => comment_on_issue(host, repo, content, comment), + "create_pull_request" => { + pr_action("opened", TimelineIcon::PullRequest, host, repo, content) + } + "close_pull_request" => { + pr_action("closed", TimelineIcon::PullRequest, host, repo, content) + } + "reopen_pull_request" => { + pr_action("reopened", TimelineIcon::PullRequest, host, repo, content) + } + "merge_pull_request" | "auto_merge_pull_request" => { + pr_action("merged", TimelineIcon::GitMerge, host, repo, content) + } + "comment_pull" => comment_on_pr(host, repo, content, comment), + "approve_pull_request" => { + pr_action("approved", TimelineIcon::PullRequest, host, repo, content) + } + "reject_pull_request" => { + pr_action( + "requested changes on", + TimelineIcon::PullRequest, + host, + repo, + content, + ) + } + "publish_release" => publish_release(host, repo, content), + _ => fallback(host, repo, &event.action), + }; + + let title = if let Some(actor_login) = actor { + let mut segs = Vec::with_capacity(title.len() + 2); + segs.push(TitleSegment::link( + actor_login.to_string(), + format!("https://{host}/{actor_login}"), + )); + segs.push(TitleSegment::text(" ")); + segs.extend(title); + segs + } else { + title + }; + + TimelineItem { + id: event.id.clone(), + source: Source::Gitea, + action: event.action.clone(), + occurred_at: event.occurred_at, + icon, + title, + subtitle, + body, + } +} + +type Reshaped = ( + TimelineIcon, + Vec<TitleSegment>, + Option<Vec<TitleSegment>>, + Option<TimelineBody>, +); + +fn repo_link(host: &str, repo: &str) -> TitleSegment { + TitleSegment::link(repo.to_string(), format!("https://{host}/{repo}")) +} + +fn commit_url(host: &str, repo: &str, sha: &str) -> String { + format!("https://{host}/{repo}/commit/{sha}") +} + +fn issue_url(host: &str, repo: &str, index: i64) -> String { + format!("https://{host}/{repo}/issues/{index}") +} + +fn pr_url(host: &str, repo: &str, index: i64) -> String { + format!("https://{host}/{repo}/pulls/{index}") +} + +fn ref_branch(r: &str) -> &str { + r.strip_prefix("refs/heads/").unwrap_or(r) +} + +fn ref_tag(r: &str) -> &str { + r.strip_prefix("refs/tags/").unwrap_or(r) +} + +/// Parse `<index>|<title>` content used by issue / PR / release events. +fn parse_pipe_content(content: Option<&str>) -> Option<(i64, &str)> { + let s = content?; + let (idx_str, title) = s.split_once('|')?; + let idx: i64 = idx_str.parse().ok()?; + Some((idx, title)) +} + +fn commit_repo( + host: &str, + repo: Option<&str>, + ref_name: Option<&str>, + content: Option<&str>, +) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let branch = ref_name.map(ref_branch).unwrap_or(""); + + // content is JSON-encoded { Commits, HeadCommit, CompareURL, Len }. + let parsed: Option<Value> = content.and_then(|s| serde_json::from_str(s).ok()); + let commits: Vec<CommitSummary> = parsed + .as_ref() + .and_then(|v| v.get("Commits")) + .and_then(Value::as_array) + .map(|arr| { + arr.iter() + .filter_map(|c| { + let sha = c.get("Sha1").and_then(Value::as_str)?; + let message = c + .get("Message") + .and_then(Value::as_str) + .unwrap_or("") + .lines() + .next() + .unwrap_or("") + .to_string(); + let author = c + .get("AuthorName") + .and_then(Value::as_str) + .map(str::to_string); + Some(CommitSummary { + short_sha: sha.chars().take(7).collect(), + sha: sha.to_string(), + message, + url: commit_url(host, repo, sha), + author, + }) + }) + .collect() + }) + .unwrap_or_default(); + + let count = parsed + .as_ref() + .and_then(|v| v.get("Len")) + .and_then(Value::as_i64) + .unwrap_or(commits.len() as i64); + + let title = if count > 0 { + let plural = if count == 1 { "" } else { "s" }; + vec![ + TitleSegment::text(format!("pushed {count} commit{plural} to ")), + repo_link(host, repo), + TitleSegment::text(format!(":{branch}")), + ] + } else { + vec![ + TitleSegment::text("pushed to "), + repo_link(host, repo), + TitleSegment::text(format!(":{branch}")), + ] + }; + + let body = (!commits.is_empty()).then_some(TimelineBody::Commits { commits }); + (TimelineIcon::GitPush, title, None, body) +} + +fn push_tag(host: &str, repo: Option<&str>, ref_name: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let tag = ref_name.map(ref_tag).unwrap_or(""); + let title = vec![ + TitleSegment::text(format!("tagged {tag} in ")), + repo_link(host, repo), + ]; + (TimelineIcon::Release, title, None, None) +} + +fn create_repo(host: &str, repo: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let title = vec![TitleSegment::text("created "), repo_link(host, repo)]; + (TimelineIcon::GitBranchCreate, title, None, None) +} + +fn rename_repo(host: &str, repo: Option<&str>, content: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let mut title = vec![TitleSegment::text("renamed ")]; + if let Some(old) = content { + title.push(TitleSegment::text(format!("{old} → "))); + } + title.push(repo_link(host, repo)); + (TimelineIcon::Generic, title, None, None) +} + +fn transfer_repo(host: &str, repo: Option<&str>, content: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let mut title = vec![TitleSegment::text("transferred ")]; + if let Some(prev) = content { + title.push(TitleSegment::text(format!("{prev} to "))); + } else { + title.push(TitleSegment::text("to ")); + } + title.push(repo_link(host, repo)); + (TimelineIcon::Generic, title, None, None) +} + +fn fork_repo(host: &str, repo: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let title = vec![TitleSegment::text("forked "), repo_link(host, repo)]; + (TimelineIcon::GitFork, title, None, None) +} + +fn delete_branch(host: &str, repo: Option<&str>, ref_name: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let branch = ref_name.map(ref_branch).unwrap_or(""); + let title = vec![ + TitleSegment::text(format!("deleted branch {branch} in ")), + repo_link(host, repo), + ]; + (TimelineIcon::GitBranchDelete, title, None, None) +} + +fn delete_tag(host: &str, repo: Option<&str>, ref_name: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let tag = ref_name.map(ref_tag).unwrap_or(""); + let title = vec![ + TitleSegment::text(format!("deleted tag {tag} in ")), + repo_link(host, repo), + ]; + (TimelineIcon::GitBranchDelete, title, None, None) +} + +fn star_repo(host: &str, repo: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let title = vec![TitleSegment::text("starred "), repo_link(host, repo)]; + (TimelineIcon::Star, title, None, None) +} + +fn issue_action( + verb: &str, + icon: TimelineIcon, + host: &str, + repo: Option<&str>, + content: Option<&str>, +) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let (idx, issue_title) = parse_pipe_content(content).unwrap_or((0, "")); + let title = vec![ + TitleSegment::text(format!("{verb} issue ")), + TitleSegment::link(format!("#{idx}"), issue_url(host, repo, idx)), + TitleSegment::text(" in "), + repo_link(host, repo), + ]; + let subtitle = + (!issue_title.is_empty()).then(|| vec![TitleSegment::text(issue_title.to_string())]); + (icon, title, subtitle, None) +} + +fn pr_action( + verb: &str, + icon: TimelineIcon, + host: &str, + repo: Option<&str>, + content: Option<&str>, +) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let (idx, pr_title) = parse_pipe_content(content).unwrap_or((0, "")); + let title = vec![ + TitleSegment::text(format!("{verb} pull request ")), + TitleSegment::link(format!("#{idx}"), pr_url(host, repo, idx)), + TitleSegment::text(" in "), + repo_link(host, repo), + ]; + let subtitle = + (!pr_title.is_empty()).then(|| vec![TitleSegment::text(pr_title.to_string())]); + (icon, title, subtitle, None) +} + +fn comment_on_issue( + host: &str, + repo: Option<&str>, + content: Option<&str>, + comment: Option<&Value>, +) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let (idx, issue_title) = parse_pipe_content(content).unwrap_or((0, "")); + let body_text = comment + .and_then(|c| c.get("body")) + .and_then(Value::as_str) + .unwrap_or(""); + let title = vec![ + TitleSegment::text("commented on "), + TitleSegment::link(format!("#{idx}"), issue_url(host, repo, idx)), + TitleSegment::text(" in "), + repo_link(host, repo), + ]; + let subtitle = + (!issue_title.is_empty()).then(|| vec![TitleSegment::text(issue_title.to_string())]); + let body = (!body_text.is_empty()).then(|| TimelineBody::Markdown { + text: body_text.to_string(), + }); + (TimelineIcon::Comment, title, subtitle, body) +} + +fn comment_on_pr( + host: &str, + repo: Option<&str>, + content: Option<&str>, + comment: Option<&Value>, +) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let (idx, pr_title) = parse_pipe_content(content).unwrap_or((0, "")); + let body_text = comment + .and_then(|c| c.get("body")) + .and_then(Value::as_str) + .unwrap_or(""); + let title = vec![ + TitleSegment::text("commented on "), + TitleSegment::link(format!("#{idx}"), pr_url(host, repo, idx)), + TitleSegment::text(" in "), + repo_link(host, repo), + ]; + let subtitle = + (!pr_title.is_empty()).then(|| vec![TitleSegment::text(pr_title.to_string())]); + let body = (!body_text.is_empty()).then(|| TimelineBody::Markdown { + text: body_text.to_string(), + }); + (TimelineIcon::Comment, title, subtitle, body) +} + +fn publish_release(host: &str, repo: Option<&str>, content: Option<&str>) -> Reshaped { + let repo = repo.unwrap_or("(unknown repo)"); + let name = content.unwrap_or(""); + let title = if name.is_empty() { + vec![TitleSegment::text("published a release in "), repo_link(host, repo)] + } else { + vec![ + TitleSegment::text(format!("released {name} in ")), + repo_link(host, repo), + ] + }; + (TimelineIcon::Release, title, None, None) +} + +fn fallback(host: &str, repo: Option<&str>, action: &str) -> Reshaped { + let title = match repo { + Some(r) => vec![ + TitleSegment::text(format!("{action} on ")), + repo_link(host, r), + ], + None => vec![TitleSegment::text(action.to_string())], + }; + (TimelineIcon::Generic, title, None, None) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{TimeZone, Utc}; + use serde_json::json; + + fn ev(action: &str, payload: Value) -> Event { + Event { + id: "gitea:1".into(), + source: Source::Gitea, + action: action.into(), + occurred_at: Utc.with_ymd_and_hms(2026, 5, 3, 16, 37, 45).unwrap(), + public: true, + payload, + } + } + + fn render(item: &TimelineItem) -> String { + item.title + .iter() + .map(|s| match s { + TitleSegment::Text { text } => text.clone(), + TitleSegment::Link { text, .. } => text.clone(), + }) + .collect() + } + + #[test] + fn commit_repo_with_commits_body() { + let raw = json!({ + "_host": "git.lair.cafe", + "act_user": { "login": "grenade" }, + "repo": { "full_name": "grenade/moments" }, + "ref_name": "refs/heads/main", + "content": "{\"Commits\":[{\"Sha1\":\"abcdef1234\",\"Message\":\"first\",\"AuthorName\":\"rob\"}],\"Len\":1}" + }); + let item = reshape(&ev("commit_repo", raw)); + assert_eq!(item.icon, TimelineIcon::GitPush); + let r = render(&item); + assert!( + r.contains("pushed 1 commit to grenade/moments:main"), + "got: {r}" + ); + match item.body.unwrap() { + TimelineBody::Commits { commits } => { + assert_eq!(commits.len(), 1); + assert_eq!(commits[0].short_sha, "abcdef1"); + assert_eq!( + commits[0].url, + "https://git.lair.cafe/grenade/moments/commit/abcdef1234" + ); + } + _ => panic!("expected Commits body"), + } + } + + #[test] + fn create_issue_uses_pipe_content() { + let raw = json!({ + "_host": "git.lair.cafe", + "act_user": { "login": "grenade" }, + "repo": { "full_name": "grenade/moments" }, + "content": "1|implement per-repo enumeration for full commit history" + }); + let item = reshape(&ev("create_issue", raw)); + assert_eq!(item.icon, TimelineIcon::Issue); + let r = render(&item); + assert!( + r.contains("opened issue #1 in grenade/moments"), + "got: {r}" + ); + assert_eq!( + item.subtitle.unwrap(), + vec![TitleSegment::text( + "implement per-repo enumeration for full commit history" + )] + ); + } + + #[test] + fn merge_pull_request_uses_merge_icon() { + let raw = json!({ + "_host": "git.lair.cafe", + "act_user": { "login": "grenade" }, + "repo": { "full_name": "grenade/moments" }, + "content": "7|wire it up" + }); + let item = reshape(&ev("merge_pull_request", raw)); + assert_eq!(item.icon, TimelineIcon::GitMerge); + let r = render(&item); + assert!( + r.contains("merged pull request #7 in grenade/moments"), + "got: {r}" + ); + } + + #[test] + fn fallback_for_unknown_op_type() { + let raw = json!({ + "_host": "git.lair.cafe", + "act_user": { "login": "grenade" }, + "repo": { "full_name": "grenade/x" } + }); + let item = reshape(&ev("mirror_sync_push", raw)); + assert_eq!(item.icon, TimelineIcon::Generic); + let r = render(&item); + assert!(r.contains("mirror_sync_push on grenade/x"), "got: {r}"); + } +} diff --git a/crates/moments-data/src/gitea.rs b/crates/moments-data/src/gitea.rs new file mode 100644 index 0000000..fe96562 --- /dev/null +++ b/crates/moments-data/src/gitea.rs @@ -0,0 +1,206 @@ +//! Gitea activity feed ingestion. +//! +//! Hits `/api/v1/users/{user}/activities/feeds?only-performed-by=true` +//! which returns events the user themselves caused (not received events +//! from others they follow). No ETag support upstream, so each tick fetches +//! page 1 and relies on idempotent upsert. First run paginates further to +//! seed history. +//! +//! Each item carries a self-contained payload — including the event-emitting +//! host — so the reshape layer can construct URLs without needing config. + +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError}; +use moments_entities::{Event, Source}; +use reqwest::{Client, header}; +use serde_json::Value; +use tracing::debug; + +const SOURCE_NAME: &str = "gitea"; +const USER_AGENT: &str = concat!( + "moments/", + env!("CARGO_PKG_VERSION"), + " (+https://rob.tn)" +); +const MAX_BACKFILL_PAGES: u32 = 20; + +#[derive(Clone, Debug)] +pub struct GiteaConfig { + /// e.g. `git.lair.cafe`. Used to construct URLs the API doesn't return + /// directly (issue / PR / commit web links) and stamped into each event + /// payload for the reshape layer. + pub host: String, + pub user: String, + pub token: Option<String>, + pub per_page: u32, +} + +impl Default for GiteaConfig { + fn default() -> Self { + Self { + host: "git.lair.cafe".into(), + user: "grenade".into(), + token: None, + per_page: 50, + } + } +} + +pub struct GiteaSource { + client: Client, + writer: Arc<dyn EventWriter>, + state: Arc<dyn PollerStateStore>, + config: GiteaConfig, +} + +impl GiteaSource { + pub fn new( + client: Client, + writer: Arc<dyn EventWriter>, + state: Arc<dyn PollerStateStore>, + config: GiteaConfig, + ) -> Self { + Self { + client, + writer, + state, + config, + } + } + + fn page_url(&self, page: u32) -> String { + format!( + "https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}&page={}", + self.config.host, self.config.user, self.config.per_page, page + ) + } + + fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { + req = req + .header(header::ACCEPT, "application/json") + .header(header::USER_AGENT, USER_AGENT); + if let Some(token) = &self.config.token { + req = req.header(header::AUTHORIZATION, format!("token {token}")); + } + req + } +} + +#[async_trait] +impl EventSource for GiteaSource { + fn name(&self) -> &'static str { + SOURCE_NAME + } + + async fn poll(&self) -> Result<usize, SourceError> { + let prior = self.state.load(SOURCE_NAME).await?; + let first_run = prior.is_none(); + let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 }; + + let mut total = 0usize; + for page in 1..=max_pages { + let url = self.page_url(page); + let req = self.apply_headers(self.client.get(&url)); + let resp = req + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + return Err(SourceError::Http(format!("{} GET {}", resp.status(), url))); + } + let items: Vec<Value> = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + if items.is_empty() { + break; + } + + let events: Vec<Event> = items + .iter() + .filter_map(|it| parse_gitea_event(it, &self.config.host)) + .collect(); + total += self.writer.upsert_events(&events).await?; + + if items.len() < self.config.per_page as usize { + break; + } + } + + self.state.touch(SOURCE_NAME).await?; + debug!(ingested = total, "gitea poll complete"); + Ok(total) + } +} + +/// Convert a Gitea activity feed item into our Event row. The host gets +/// stamped into the payload as `_host` so the reshape layer can build +/// web URLs without needing global config. +fn parse_gitea_event(item: &Value, host: &str) -> Option<Event> { + let id = item.get("id").and_then(Value::as_i64)?; + let op_type = item.get("op_type").and_then(Value::as_str)?.to_string(); + let created_str = item.get("created").and_then(Value::as_str)?; + let occurred_at = DateTime::parse_from_rfc3339(created_str) + .ok()? + .with_timezone(&Utc); + let private = item.get("is_private").and_then(Value::as_bool).unwrap_or(false); + + let mut payload = item.clone(); + if let Some(obj) = payload.as_object_mut() { + obj.insert("_host".into(), Value::String(host.into())); + } + + Some(Event { + id: format!("gitea:{id}"), + source: Source::Gitea, + action: op_type, + occurred_at, + public: !private, + payload, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn parse_commit_repo() { + let raw = json!({ + "id": 973, + "op_type": "commit_repo", + "ref_name": "refs/heads/main", + "is_private": false, + "content": "{\"Commits\":[{\"Sha1\":\"abc123\"}],\"Len\":1}", + "created": "2026-05-03T16:37:45Z", + "repo": { "full_name": "grenade/moments" } + }); + let ev = parse_gitea_event(&raw, "git.lair.cafe").expect("parses"); + assert_eq!(ev.id, "gitea:973"); + assert_eq!(ev.source, Source::Gitea); + assert_eq!(ev.action, "commit_repo"); + assert!(ev.public); + // host stamped into payload + assert_eq!( + ev.payload.get("_host").and_then(|v| v.as_str()), + Some("git.lair.cafe") + ); + } + + #[test] + fn private_event_marked_private() { + let raw = json!({ + "id": 100, + "op_type": "commit_repo", + "is_private": true, + "created": "2026-05-03T00:00:00Z", + "repo": { "full_name": "grenade/private" } + }); + let ev = parse_gitea_event(&raw, "git.lair.cafe").expect("parses"); + assert!(!ev.public); + } +} diff --git a/crates/moments-data/src/lib.rs b/crates/moments-data/src/lib.rs index 789d5eb..e828076 100644 --- a/crates/moments-data/src/lib.rs +++ b/crates/moments-data/src/lib.rs @@ -1,3 +1,4 @@ +pub mod gitea; pub mod github; pub mod github_search; diff --git a/crates/moments-worker/src/main.rs b/crates/moments-worker/src/main.rs index 4477087..2293330 100644 --- a/crates/moments-worker/src/main.rs +++ b/crates/moments-worker/src/main.rs @@ -4,6 +4,7 @@ use clap::Parser; use moments_core::{EventSource, run_poller}; use moments_data::{ PgStore, + gitea::{GiteaConfig, GiteaSource}, github::{GithubConfig, GithubSource}, github_search::{GithubSearchConfig, GithubSearchSource}, }; @@ -31,6 +32,19 @@ struct Args { /// Defaults to 24h — this is a backfill, not a live feed. #[arg(long, env = "SEARCH_POLL_INTERVAL_SECS", default_value = "86400")] search_interval_secs: u64, + + #[arg(long, env = "GITEA_HOST", default_value = "git.lair.cafe")] + gitea_host: String, + + #[arg(long, env = "GITEA_USER", default_value = "grenade")] + gitea_user: String, + + #[arg(long, env = "GITEA_TOKEN")] + gitea_token: Option<String>, + + /// Seconds between Gitea activity-feed polls. + #[arg(long, env = "GITEA_POLL_INTERVAL_SECS", default_value = "600")] + gitea_interval_secs: u64, } #[tokio::main] @@ -67,24 +81,42 @@ async fn main() -> anyhow::Result<()> { }, )) as Arc<dyn EventSource>; + let gitea = Arc::new(GiteaSource::new( + http.clone(), + store.clone(), + store.clone(), + GiteaConfig { + host: args.gitea_host.clone(), + user: args.gitea_user.clone(), + token: args.gitea_token.clone(), + ..Default::default() + }, + )) as Arc<dyn EventSource>; + info!( github_user = args.github_user, - interval_secs = args.interval_secs, + gitea_host = args.gitea_host, + gitea_user = args.gitea_user, + events_interval_secs = args.interval_secs, search_interval_secs = args.search_interval_secs, + gitea_interval_secs = args.gitea_interval_secs, "worker started" ); let interval = Duration::from_secs(args.interval_secs); let search_interval = Duration::from_secs(args.search_interval_secs); + let gitea_interval = Duration::from_secs(args.gitea_interval_secs); let github_task = tokio::spawn(async move { run_poller(github, interval).await }); let github_search_task = tokio::spawn(async move { run_poller(github_search, search_interval).await }); + let gitea_task = tokio::spawn(async move { run_poller(gitea, gitea_interval).await }); tokio::signal::ctrl_c().await?; info!("shutdown signal received"); github_task.abort(); github_search_task.abort(); + gitea_task.abort(); Ok(()) }