feat(gitea): poll org activity feeds to capture cross-namespace events
The user activity feed only returns events from the user's own namespace. This adds org discovery via /api/v1/user/orgs and polls each org's activity feed, filtering for events by the configured user. Per-org poller state keys enable independent backfill. Org feed errors are non-fatal to avoid disrupting the user feed poll. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -71,10 +71,17 @@ impl GiteaSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn page_url(&self, page: u32) -> String {
|
fn user_feed_base_url(&self) -> String {
|
||||||
format!(
|
format!(
|
||||||
"https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}&page={}",
|
"https://{}/api/v1/users/{}/activities/feeds?only-performed-by=true&limit={}",
|
||||||
self.config.host, self.config.user, self.config.per_page, page
|
self.config.host, self.config.user, self.config.per_page
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn org_feed_base_url(&self, org: &str) -> String {
|
||||||
|
format!(
|
||||||
|
"https://{}/api/v1/orgs/{}/activities/feeds?limit={}",
|
||||||
|
self.config.host, org, self.config.per_page
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,22 +94,51 @@ impl GiteaSource {
|
|||||||
}
|
}
|
||||||
req
|
req
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Discover organizations the authenticated user belongs to.
|
||||||
|
/// Returns an empty vec if no token is configured or the request fails.
|
||||||
|
async fn discover_orgs(&self) -> Result<Vec<String>, SourceError> {
|
||||||
|
if self.config.token.is_none() {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let url = format!("https://{}/api/v1/user/orgs", self.config.host);
|
||||||
|
let req = self.apply_headers(self.client.get(&url));
|
||||||
|
let resp = req
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Http(e.to_string()))?;
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
tracing::warn!(status = %resp.status(), "failed to discover gitea orgs");
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let orgs: Vec<Value> = resp
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.map_err(|e| SourceError::Parse(e.to_string()))?;
|
||||||
|
Ok(orgs
|
||||||
|
.iter()
|
||||||
|
.filter_map(|o| o.get("username").and_then(Value::as_str).map(String::from))
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
/// Poll a single activity feed, paginating on first run. When `filter_user`
|
||||||
impl EventSource for GiteaSource {
|
/// is true, only events performed by `self.config.user` are ingested (used
|
||||||
fn name(&self) -> &'static str {
|
/// for org feeds which contain all members' activity).
|
||||||
SOURCE_NAME
|
///
|
||||||
}
|
/// `base_url` should contain everything except the `&page=N` suffix.
|
||||||
|
async fn poll_feed(
|
||||||
async fn poll(&self) -> Result<usize, SourceError> {
|
&self,
|
||||||
let prior = self.state.load(SOURCE_NAME).await?;
|
state_key: &str,
|
||||||
|
base_url: &str,
|
||||||
|
filter_user: bool,
|
||||||
|
) -> Result<usize, SourceError> {
|
||||||
|
let prior = self.state.load(state_key).await?;
|
||||||
let first_run = prior.is_none();
|
let first_run = prior.is_none();
|
||||||
let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 };
|
let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 };
|
||||||
|
|
||||||
let mut total = 0usize;
|
let mut total = 0usize;
|
||||||
for page in 1..=max_pages {
|
for page in 1..=max_pages {
|
||||||
let url = self.page_url(page);
|
let url = format!("{base_url}&page={page}");
|
||||||
let req = self.apply_headers(self.client.get(&url));
|
let req = self.apply_headers(self.client.get(&url));
|
||||||
let resp = req
|
let resp = req
|
||||||
.send()
|
.send()
|
||||||
@@ -121,6 +157,16 @@ impl EventSource for GiteaSource {
|
|||||||
|
|
||||||
let events: Vec<Event> = items
|
let events: Vec<Event> = items
|
||||||
.iter()
|
.iter()
|
||||||
|
.filter(|it| {
|
||||||
|
if !filter_user {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
it.get("act_user")
|
||||||
|
.and_then(|u| u.get("login"))
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.map(|login| login.eq_ignore_ascii_case(&self.config.user))
|
||||||
|
.unwrap_or(false)
|
||||||
|
})
|
||||||
.filter_map(|it| parse_gitea_event(it, &self.config.host))
|
.filter_map(|it| parse_gitea_event(it, &self.config.host))
|
||||||
.collect();
|
.collect();
|
||||||
total += self.writer.upsert_events(&events).await?;
|
total += self.writer.upsert_events(&events).await?;
|
||||||
@@ -130,8 +176,37 @@ impl EventSource for GiteaSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.state.touch(SOURCE_NAME).await?;
|
self.state.touch(state_key).await?;
|
||||||
debug!(ingested = total, "gitea poll complete");
|
Ok(total)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventSource for GiteaSource {
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
SOURCE_NAME
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn poll(&self) -> Result<usize, SourceError> {
|
||||||
|
// Poll user's own activity feed (existing behavior).
|
||||||
|
let user_url = self.user_feed_base_url();
|
||||||
|
let mut total = self.poll_feed(SOURCE_NAME, &user_url, false).await?;
|
||||||
|
|
||||||
|
// Discover orgs and poll each org's activity feed, filtering for
|
||||||
|
// events performed by this user.
|
||||||
|
let orgs = self.discover_orgs().await?;
|
||||||
|
for org in &orgs {
|
||||||
|
let state_key = format!("gitea:org:{org}");
|
||||||
|
let org_url = self.org_feed_base_url(org);
|
||||||
|
match self.poll_feed(&state_key, &org_url, true).await {
|
||||||
|
Ok(n) => total += n,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(org = %org, error = %e, "failed to poll org feed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(ingested = total, orgs = orgs.len(), "gitea poll complete");
|
||||||
Ok(total)
|
Ok(total)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -191,6 +266,37 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn org_event_user_filter_predicate() {
|
||||||
|
let by_user = json!({
|
||||||
|
"id": 500, "op_type": "commit_repo", "is_private": false,
|
||||||
|
"created": "2026-05-03T10:00:00Z",
|
||||||
|
"act_user": { "login": "grenade" },
|
||||||
|
"repo": { "full_name": "myorg/somerepo" }
|
||||||
|
});
|
||||||
|
let by_other = json!({
|
||||||
|
"id": 501, "op_type": "commit_repo", "is_private": false,
|
||||||
|
"created": "2026-05-03T10:01:00Z",
|
||||||
|
"act_user": { "login": "otherperson" },
|
||||||
|
"repo": { "full_name": "myorg/somerepo" }
|
||||||
|
});
|
||||||
|
// Both parse as valid events
|
||||||
|
assert!(parse_gitea_event(&by_user, "git.lair.cafe").is_some());
|
||||||
|
assert!(parse_gitea_event(&by_other, "git.lair.cafe").is_some());
|
||||||
|
// The user-filter predicate used by poll_feed
|
||||||
|
let is_user = |item: &Value, user: &str| -> bool {
|
||||||
|
item.get("act_user")
|
||||||
|
.and_then(|u| u.get("login"))
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.map(|login| login.eq_ignore_ascii_case(user))
|
||||||
|
.unwrap_or(false)
|
||||||
|
};
|
||||||
|
assert!(is_user(&by_user, "grenade"));
|
||||||
|
assert!(!is_user(&by_other, "grenade"));
|
||||||
|
// Case-insensitive match
|
||||||
|
assert!(is_user(&by_user, "Grenade"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn private_event_marked_private() {
|
fn private_event_marked_private() {
|
||||||
let raw = json!({
|
let raw = json!({
|
||||||
|
|||||||
Reference in New Issue
Block a user