From 86411bb88efd90d2609166bf5f2bf73750b064f3 Mon Sep 17 00:00:00 2001 From: rob thijssen Date: Wed, 20 May 2026 07:53:43 +0300 Subject: [PATCH] fix(worker): dedup gitea events from overlapping user and org feeds MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gitea writes one Action row per interested user-context. A push to an org repo by user U produces two rows — one with user_id=U, one with user_id=org — differing only in `id` and `user_id`. Polling both the user feed and org feeds (which we do, and need to, since neither alone catches every cross-namespace event) surfaced both rows; the `gitea:{action_row_id}` id gave them distinct ids, so the upsert dedup never fired and ~38% of events on org-repo project pages rendered twice. Switch to a content-derived id keyed on (op_type, act_user_id, repo_id, ref_name, comment_id, created) so the two rows collide on upsert, and add a migration that re-keys existing rows to the same formula while collapsing the duplicates already in the table. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../migrations/0005_dedup_gitea_events.sql | 55 ++++++++++++++ crates/moments-data/src/gitea.rs | 74 ++++++++++++++++++- 2 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 crates/moments-data/migrations/0005_dedup_gitea_events.sql diff --git a/crates/moments-data/migrations/0005_dedup_gitea_events.sql b/crates/moments-data/migrations/0005_dedup_gitea_events.sql new file mode 100644 index 0000000..6d7370e --- /dev/null +++ b/crates/moments-data/migrations/0005_dedup_gitea_events.sql @@ -0,0 +1,55 @@ +-- Collapse duplicate Gitea events introduced by polling both the user +-- activity feed and per-org activity feeds. +-- +-- Gitea writes one Action row per interested user-context: a push to +-- helexa/cortex by user grenade produces two rows, one with +-- user_id=grenade and one with user_id=helexa. 
Everything else (op_type, +-- act_user_id, repo_id, ref_name, comment_id, created) is identical. +-- Our prior id scheme (gitea:{action_row_id}) gave them different ids, +-- so the upsert-on-id dedup never fired and the timeline rendered each +-- push twice. +-- +-- This migration re-keys every existing gitea row to the same canonical +-- formula `parse_gitea_event` now emits, deleting duplicates encountered +-- along the way. Idempotent: running it again is a no-op because the +-- canonical id of a canonical id is itself. + +-- Snapshot the canonical id for every gitea row. +CREATE TEMP TABLE _gitea_canonical AS +SELECT + id AS old_id, + 'gitea:' + || coalesce(payload->>'op_type', '') || ':' + || coalesce(payload->>'act_user_id', payload->'act_user'->>'id', '0') || ':' + || coalesce(payload->>'repo_id', payload->'repo'->>'id', '0') || ':' + || coalesce(payload->>'ref_name', '') || ':' + || coalesce(payload->>'comment_id', '0') || ':' + || coalesce(payload->>'created', '') + AS new_id +FROM events +WHERE source = 'gitea'; + +-- For each canonical id, keep the row whose current id is lexicographically +-- smallest (stable, arbitrary tie-break) and delete the rest. The "old id +-- already matches the new id" case lands here too — DELETE skips it because +-- rn = 1 for any singleton group. +DELETE FROM events +WHERE id IN ( + SELECT old_id FROM ( + SELECT old_id, new_id, + row_number() OVER (PARTITION BY new_id ORDER BY old_id) AS rn + FROM _gitea_canonical + ) ranked + WHERE rn > 1 +); + +-- Rename remaining rows to the canonical id. A non-deferrable PK is checked +-- per row, not at statement end; this is safe because canonical ids never +-- equal a legacy gitea:{n} id and the dedup above made them unique. 
+UPDATE events e +SET id = c.new_id +FROM _gitea_canonical c +WHERE e.id = c.old_id + AND e.id <> c.new_id; + +DROP TABLE _gitea_canonical; diff --git a/crates/moments-data/src/gitea.rs b/crates/moments-data/src/gitea.rs index bb39fcf..66544c6 100644 --- a/crates/moments-data/src/gitea.rs +++ b/crates/moments-data/src/gitea.rs @@ -276,8 +276,14 @@ impl EventSource for GiteaSource { /// Convert a Gitea activity feed item into our Event row. The host gets /// stamped into the payload as `_host` so the reshape layer can build /// web URLs without needing global config. +/// +/// The id is content-derived rather than using Gitea's `id` field directly: +/// Gitea creates one Action row per interested user-context, so a push to +/// an org repo by user U produces two rows (one under U's context, one +/// under the org's), distinguished only by `id` and `user_id`. Keying on +/// `(op_type, act_user_id, repo_id, ref_name, comment_id, created)` makes +/// those two rows collapse to the same event on upsert. fn parse_gitea_event(item: &Value, host: &str) -> Option { - let id = item.get("id").and_then(Value::as_i64)?; let op_type = item.get("op_type").and_then(Value::as_str)?.to_string(); let created_str = item.get("created").and_then(Value::as_str)?; let occurred_at = DateTime::parse_from_rfc3339(created_str) @@ -285,13 +291,15 @@ fn parse_gitea_event(item: &Value, host: &str) -> Option { .with_timezone(&Utc); let private = item.get("is_private").and_then(Value::as_bool).unwrap_or(false); + let id = gitea_canonical_id(item, &op_type, created_str); + let mut payload = item.clone(); if let Some(obj) = payload.as_object_mut() { obj.insert("_host".into(), Value::String(host.into())); } Some(Event { - id: format!("gitea:{id}"), + id, source: Source::Gitea, action: op_type, occurred_at, @@ -300,6 +308,25 @@ fn parse_gitea_event(item: &Value, host: &str) -> Option { }) } +/// Build the canonical, content-derived id for a Gitea action. 
Must stay +/// in lockstep with the SQL formula in migration 0005 so back-fill and +/// new writes share the same id space. +fn gitea_canonical_id(item: &Value, op_type: &str, created: &str) -> String { + let act_user_id = item + .get("act_user_id") + .and_then(Value::as_i64) + .or_else(|| item.get("act_user").and_then(|u| u.get("id")).and_then(Value::as_i64)) + .unwrap_or(0); + let repo_id = item + .get("repo_id") + .and_then(Value::as_i64) + .or_else(|| item.get("repo").and_then(|r| r.get("id")).and_then(Value::as_i64)) + .unwrap_or(0); + let ref_name = item.get("ref_name").and_then(Value::as_str).unwrap_or(""); + let comment_id = item.get("comment_id").and_then(Value::as_i64).unwrap_or(0); + format!("gitea:{op_type}:{act_user_id}:{repo_id}:{ref_name}:{comment_id}:{created}") +} + #[cfg(test)] mod tests { use super::*; @@ -310,14 +337,16 @@ mod tests { let raw = json!({ "id": 973, "op_type": "commit_repo", + "act_user_id": 42, + "repo_id": 7, "ref_name": "refs/heads/main", "is_private": false, "content": "{\"Commits\":[{\"Sha1\":\"abc123\"}],\"Len\":1}", "created": "2026-05-03T16:37:45Z", - "repo": { "full_name": "grenade/moments" } + "repo": { "id": 7, "full_name": "grenade/moments" } }); let ev = parse_gitea_event(&raw, "git.lair.cafe").expect("parses"); - assert_eq!(ev.id, "gitea:973"); + assert_eq!(ev.id, "gitea:commit_repo:42:7:refs/heads/main:0:2026-05-03T16:37:45Z"); assert_eq!(ev.source, Source::Gitea); assert_eq!(ev.action, "commit_repo"); assert!(ev.public); @@ -328,6 +357,43 @@ mod tests { ); } + #[test] + fn dup_action_rows_for_user_and_org_contexts_collapse_to_same_id() { + // Gitea creates two Action rows when grenade pushes to helexa/cortex: + // one with user_id=grenade (surfaced by the user feed), one with + // user_id=helexa (surfaced by the org feed). Everything except `id` + // and `user_id` is identical. The canonical id ignores both. 
+ let user_ctx = json!({ + "id": 1322, + "user_id": 42, + "op_type": "commit_repo", + "act_user_id": 42, + "act_user": { "login": "grenade", "id": 42 }, + "repo_id": 99, + "repo": { "id": 99, "full_name": "helexa/cortex" }, + "ref_name": "refs/heads/main", + "comment_id": 0, + "is_private": false, + "created": "2026-05-20T04:32:50Z" + }); + let org_ctx = json!({ + "id": 1323, + "user_id": 7, + "op_type": "commit_repo", + "act_user_id": 42, + "act_user": { "login": "grenade", "id": 42 }, + "repo_id": 99, + "repo": { "id": 99, "full_name": "helexa/cortex" }, + "ref_name": "refs/heads/main", + "comment_id": 0, + "is_private": false, + "created": "2026-05-20T04:32:50Z" + }); + let a = parse_gitea_event(&user_ctx, "git.lair.cafe").expect("parses"); + let b = parse_gitea_event(&org_ctx, "git.lair.cafe").expect("parses"); + assert_eq!(a.id, b.id, "duplicate action rows must collide on id"); + } + #[test] fn org_event_user_filter_predicate() { let by_user = json!({