feat(worker): add gitea activity feed poller

Hits /api/v1/users/{user}/activities/feeds?only-performed-by=true
on the configured gitea host (default git.lair.cafe). Page-1 polling
on a 10-min cadence; first run paginates back through up to 20
pages (1000 items) to seed history.

Gitea has no ETag support on this endpoint, so each tick is a fresh
fetch — relying on idempotent upsert by `gitea:<id>` for dedup.

Reshape covers the gitea op_type set:
  commit_repo  → "pushed N commits to repo:branch" + commits body,
                  parsing the JSON-encoded `content` field
  push_tag     → "tagged X in repo"
  create_repo  → "created repo"
  rename/transfer/delete_branch/delete_tag/star/fork — straightforward
  create/close/reopen_issue        → "{verb} issue #N in repo: title"
  create/close/reopen_pull_request → "{verb} pull request #N"
  merge_pull_request               → GitMerge icon
  comment_issue, comment_pull      → markdown body from comment.body
  approve/reject_pull_request, publish_release
  fallback for anything else (mirror_sync_*, future op_types)

Issue / PR / release events use gitea's pipe-separated
`<index>|<title>` content field; pushes have JSON-encoded content.

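Host stamping: parse_gitea_event injects `_host` into each row's
payload so the reshape layer can construct web URLs without a
config dependency. URL construction for multi-host gitea would still
work as long as each source instance has its own host configured;
note that event ids (`gitea:<id>`) and the poller state key (`gitea`)
are not host-qualified, so two hosts would currently share both.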

Worker config:
  GITEA_HOST                  default git.lair.cafe
  GITEA_USER                  default grenade
  GITEA_TOKEN                 optional (raises rate limit; required
                                for private repo activity to surface)
  GITEA_POLL_INTERVAL_SECS    default 600

Tests: +2 in moments-data (commit_repo parses, private flag
captured), +4 in moments-core (commit_repo with body, create_issue
pipe-content, merge icon swap, fallback) — 27 total green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-03 19:41:55 +03:00
parent 4355353395
commit f750e8de47
5 changed files with 739 additions and 2 deletions

View File

@@ -0,0 +1,206 @@
//! Gitea activity feed ingestion.
//!
//! Hits `/api/v1/users/{user}/activities/feeds?only-performed-by=true`
//! which returns events the user themselves caused (not received events
//! from others they follow). No ETag support upstream, so each tick fetches
//! page 1 and relies on idempotent upsert. First run paginates further to
//! seed history.
//!
//! Each item carries a self-contained payload — including the event-emitting
//! host — so the reshape layer can construct URLs without needing config.
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use moments_core::{EventSource, EventWriter, PollerStateStore, SourceError};
use moments_entities::{Event, Source};
use reqwest::{Client, header};
use serde_json::Value;
use tracing::debug;
/// Identifier for this source: returned by `EventSource::name` and used as the
/// key when loading and touching poller state.
const SOURCE_NAME: &str = "gitea";
/// `User-Agent` header value sent with every feed request: the crate version
/// (taken from `CARGO_PKG_VERSION` at compile time) followed by a contact URL.
const USER_AGENT: &str = concat!(
"moments/",
env!("CARGO_PKG_VERSION"),
" (+https://rob.tn)"
);
/// Upper bound on the number of feed pages requested when no prior poller
/// state exists (the first-run backfill).
const MAX_BACKFILL_PAGES: u32 = 20;
/// Connection settings for one Gitea activity-feed poller.
///
/// `Debug` is implemented by hand (rather than derived) so that the API token
/// is never written to logs or panic messages.
#[derive(Clone)]
pub struct GiteaConfig {
    /// e.g. `git.lair.cafe`. Used to construct URLs the API doesn't return
    /// directly (issue / PR / commit web links) and stamped into each event
    /// payload for the reshape layer.
    pub host: String,
    /// Account whose activity feed is polled (the `{user}` path segment).
    pub user: String,
    /// Optional API token; when present it is sent as
    /// `Authorization: token <value>` on every request.
    pub token: Option<String>,
    /// Page size requested from the API via the `limit` query parameter.
    pub per_page: u32,
}

impl std::fmt::Debug for GiteaConfig {
    /// Same layout as the derived `Debug`, except that a configured token is
    /// rendered as `Some("<redacted>")` instead of the secret itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GiteaConfig")
            .field("host", &self.host)
            .field("user", &self.user)
            .field("token", &self.token.as_ref().map(|_| "<redacted>"))
            .field("per_page", &self.per_page)
            .finish()
    }
}

impl Default for GiteaConfig {
    /// Anonymous access to `grenade`'s feed on `git.lair.cafe`, 50 items per
    /// page.
    fn default() -> Self {
        Self {
            host: String::from("git.lair.cafe"),
            user: String::from("grenade"),
            token: None,
            per_page: 50,
        }
    }
}
/// Poller for a single Gitea user's activity feed.
///
/// Holds the HTTP client, the event sink, the poller state store and the
/// connection settings used by `poll`.
pub struct GiteaSource {
/// HTTP client used to issue the feed requests.
client: Client,
/// Sink that parsed events are upserted into.
writer: Arc<dyn EventWriter>,
/// Poller state store; a missing entry for this source triggers the
/// first-run backfill.
state: Arc<dyn PollerStateStore>,
/// Host, user, token and page size for this poller.
config: GiteaConfig,
}
impl GiteaSource {
pub fn new(
client: Client,
writer: Arc<dyn EventWriter>,
state: Arc<dyn PollerStateStore>,
config: GiteaConfig,
) -> Self {
Self {
client,
writer,
state,
config,
}
}
fn page_url(&self, page: u32) -> String {
format!(
"https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}&page={}",
self.config.host, self.config.user, self.config.per_page, page
)
}
fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
req = req
.header(header::ACCEPT, "application/json")
.header(header::USER_AGENT, USER_AGENT);
if let Some(token) = &self.config.token {
req = req.header(header::AUTHORIZATION, format!("token {token}"));
}
req
}
}
#[async_trait]
impl EventSource for GiteaSource {
    /// Stable identifier for this source (`"gitea"`).
    fn name(&self) -> &'static str {
        SOURCE_NAME
    }

    /// Fetch the activity feed and upsert every parseable item.
    ///
    /// When no poller state exists yet this is treated as the first run and up
    /// to `MAX_BACKFILL_PAGES` pages are walked to seed history; otherwise only
    /// page 1 is fetched. Pagination ends at the first empty page.
    ///
    /// A "short page" (`len < per_page`) is deliberately *not* used as an
    /// end-of-feed signal: Gitea caps `limit` at the server's configured
    /// maximum page size, so when `per_page` exceeds that cap every full page
    /// looks short and the backfill would stop after page 1.
    ///
    /// Returns the sum of the counts reported by the writer. Poller state is
    /// touched only after every requested page has been processed.
    ///
    /// # Errors
    ///
    /// * `SourceError::Http` if a request cannot be sent or the response
    ///   status is not a success.
    /// * `SourceError::Parse` if a response body is not a JSON array.
    /// * Errors from the state store or the event writer are propagated.
    async fn poll(&self) -> Result<usize, SourceError> {
        let prior = self.state.load(SOURCE_NAME).await?;
        let first_run = prior.is_none();
        let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 };

        let mut total = 0usize;
        for page in 1..=max_pages {
            let url = self.page_url(page);
            let resp = self
                .apply_headers(self.client.get(&url))
                .send()
                .await
                .map_err(|e| SourceError::Http(e.to_string()))?;

            let status = resp.status();
            if !status.is_success() {
                return Err(SourceError::Http(format!("{status} GET {url}")));
            }

            let items: Vec<Value> = resp
                .json()
                .await
                .map_err(|e| SourceError::Parse(e.to_string()))?;
            if items.is_empty() {
                // Ran off the end of the feed.
                break;
            }

            let events: Vec<Event> = items
                .iter()
                .filter_map(|it| parse_gitea_event(it, &self.config.host))
                .collect();

            // Items the parser rejected would otherwise vanish without a
            // trace; surface the count so a feed format change is noticeable.
            let skipped = items.len() - events.len();
            if skipped > 0 {
                debug!(page, skipped, "gitea feed items could not be parsed; skipped");
            }

            total += self.writer.upsert_events(&events).await?;
        }

        self.state.touch(SOURCE_NAME).await?;
        debug!(ingested = total, "gitea poll complete");
        Ok(total)
    }
}
/// Build an [`Event`] row from one raw activity-feed item.
///
/// Returns `None` when `id` is not an integer, `op_type` or `created` is not
/// a string, or `created` is not a valid RFC 3339 timestamp. A missing or
/// non-boolean `is_private` is treated as "not private".
///
/// The payload is a copy of `item`; when it is a JSON object, `host` is
/// added under `_host` so the reshape layer can build web URLs without
/// needing global config.
fn parse_gitea_event(item: &Value, host: &str) -> Option<Event> {
    let id = item.get("id")?.as_i64()?;
    let action = item.get("op_type")?.as_str()?.to_owned();
    let occurred_at = item
        .get("created")?
        .as_str()
        .and_then(|raw| DateTime::parse_from_rfc3339(raw).ok())?
        .with_timezone(&Utc);
    let public = !item
        .get("is_private")
        .and_then(Value::as_bool)
        .unwrap_or(false);

    let mut payload = item.clone();
    if let Value::Object(fields) = &mut payload {
        fields.insert("_host".to_owned(), Value::String(host.to_owned()));
    }

    Some(Event {
        id: format!("gitea:{id}"),
        source: Source::Gitea,
        action,
        occurred_at,
        public,
        payload,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Host handed to the parser in every test.
    const HOST: &str = "git.lair.cafe";

    /// A public push event parses into the expected row and carries the host
    /// in its payload.
    #[test]
    fn parse_commit_repo() {
        let feed_item = json!({
            "id": 973,
            "op_type": "commit_repo",
            "ref_name": "refs/heads/main",
            "is_private": false,
            "content": "{\"Commits\":[{\"Sha1\":\"abc123\"}],\"Len\":1}",
            "created": "2026-05-03T16:37:45Z",
            "repo": { "full_name": "grenade/moments" }
        });

        let event = parse_gitea_event(&feed_item, HOST).expect("parses");

        assert_eq!(event.id, "gitea:973");
        assert_eq!(event.source, Source::Gitea);
        assert_eq!(event.action, "commit_repo");
        assert!(event.public);

        // host stamped into payload
        let stamped_host = event.payload.get("_host").and_then(Value::as_str);
        assert_eq!(stamped_host, Some(HOST));
    }

    /// `is_private: true` on the feed item yields a non-public event.
    #[test]
    fn private_event_marked_private() {
        let feed_item = json!({
            "id": 100,
            "op_type": "commit_repo",
            "is_private": true,
            "created": "2026-05-03T00:00:00Z",
            "repo": { "full_name": "grenade/private" }
        });

        let event = parse_gitea_event(&feed_item, HOST).expect("parses");

        assert!(!event.public);
    }
}

View File

@@ -1,3 +1,4 @@
pub mod gitea;
pub mod github;
pub mod github_search;