diff --git a/crates/moments-data/src/gitea.rs b/crates/moments-data/src/gitea.rs index fe96562..37c34be 100644 --- a/crates/moments-data/src/gitea.rs +++ b/crates/moments-data/src/gitea.rs @@ -71,10 +71,17 @@ impl GiteaSource { } } - fn page_url(&self, page: u32) -> String { + fn user_feed_base_url(&self) -> String { format!( - "https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}&page={}", - self.config.host, self.config.user, self.config.per_page, page + "https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}", + self.config.host, self.config.user, self.config.per_page + ) + } + + fn org_feed_base_url(&self, org: &str) -> String { + format!( + "https://{}/api/v1/orgs/{}/activities/feeds?limit={}", + self.config.host, org, self.config.per_page ) } @@ -87,22 +94,51 @@ impl GiteaSource { } req } -} -#[async_trait] -impl EventSource for GiteaSource { - fn name(&self) -> &'static str { - SOURCE_NAME + /// Discover organizations the authenticated user belongs to. + /// Returns an empty vec if no token is configured or the request fails. + async fn discover_orgs(&self) -> Result, SourceError> { + if self.config.token.is_none() { + return Ok(vec![]); + } + let url = format!("https://{}/api/v1/user/orgs", self.config.host); + let req = self.apply_headers(self.client.get(&url)); + let resp = req + .send() + .await + .map_err(|e| SourceError::Http(e.to_string()))?; + if !resp.status().is_success() { + tracing::warn!(status = %resp.status(), "failed to discover gitea orgs"); + return Ok(vec![]); + } + let orgs: Vec = resp + .json() + .await + .map_err(|e| SourceError::Parse(e.to_string()))?; + Ok(orgs + .iter() + .filter_map(|o| o.get("username").and_then(Value::as_str).map(String::from)) + .collect()) } - async fn poll(&self) -> Result { - let prior = self.state.load(SOURCE_NAME).await?; + /// Poll a single activity feed, paginating on first run. When `filter_user` + /// is true, only events performed by `self.config.user` are ingested (used + /// for org feeds which contain all members' activity). + /// + /// `base_url` should contain everything except the `&page=N` suffix. + async fn poll_feed( + &self, + state_key: &str, + base_url: &str, + filter_user: bool, + ) -> Result { + let prior = self.state.load(state_key).await?; let first_run = prior.is_none(); let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 }; let mut total = 0usize; for page in 1..=max_pages { - let url = self.page_url(page); + let url = format!("{base_url}&page={page}"); let req = self.apply_headers(self.client.get(&url)); let resp = req .send() @@ -121,6 +157,16 @@ impl EventSource for GiteaSource { let events: Vec = items .iter() + .filter(|it| { + if !filter_user { + return true; + } + it.get("act_user") + .and_then(|u| u.get("login")) + .and_then(Value::as_str) + .map(|login| login.eq_ignore_ascii_case(&self.config.user)) + .unwrap_or(false) + }) .filter_map(|it| parse_gitea_event(it, &self.config.host)) .collect(); total += self.writer.upsert_events(&events).await?; @@ -130,8 +176,37 @@ impl EventSource for GiteaSource { } } - self.state.touch(SOURCE_NAME).await?; - debug!(ingested = total, "gitea poll complete"); + self.state.touch(state_key).await?; + Ok(total) + } +} + +#[async_trait] +impl EventSource for GiteaSource { + fn name(&self) -> &'static str { + SOURCE_NAME + } + + async fn poll(&self) -> Result { + // Poll user's own activity feed (existing behavior). + let user_url = self.user_feed_base_url(); + let mut total = self.poll_feed(SOURCE_NAME, &user_url, false).await?; + + // Discover orgs and poll each org's activity feed, filtering for + // events performed by this user. + let orgs = self.discover_orgs().await?; + for org in &orgs { + let state_key = format!("gitea:org:{org}"); + let org_url = self.org_feed_base_url(org); + match self.poll_feed(&state_key, &org_url, true).await { + Ok(n) => total += n, + Err(e) => { + tracing::warn!(org = %org, error = %e, "failed to poll org feed"); + } + } + } + + debug!(ingested = total, orgs = orgs.len(), "gitea poll complete"); Ok(total) } } @@ -191,6 +266,37 @@ mod tests { ); } + #[test] + fn org_event_user_filter_predicate() { + let by_user = json!({ + "id": 500, "op_type": "commit_repo", "is_private": false, + "created": "2026-05-03T10:00:00Z", + "act_user": { "login": "grenade" }, + "repo": { "full_name": "myorg/somerepo" } + }); + let by_other = json!({ + "id": 501, "op_type": "commit_repo", "is_private": false, + "created": "2026-05-03T10:01:00Z", + "act_user": { "login": "otherperson" }, + "repo": { "full_name": "myorg/somerepo" } + }); + // Both parse as valid events + assert!(parse_gitea_event(&by_user, "git.lair.cafe").is_some()); + assert!(parse_gitea_event(&by_other, "git.lair.cafe").is_some()); + // The user-filter predicate used by poll_feed + let is_user = |item: &Value, user: &str| -> bool { + item.get("act_user") + .and_then(|u| u.get("login")) + .and_then(Value::as_str) + .map(|login| login.eq_ignore_ascii_case(user)) + .unwrap_or(false) + }; + assert!(is_user(&by_user, "grenade")); + assert!(!is_user(&by_other, "grenade")); + // Case-insensitive match + assert!(is_user(&by_user, "Grenade")); + } + #[test] fn private_event_marked_private() { let raw = json!({