fix(worker): dedup gitea events from overlapping user and org feeds
Gitea writes one Action row per interested user-context. A push to an
org repo by user U produces two rows — one with user_id=U, one with
user_id=org — differing only in `id` and `user_id`. Polling both the
user feed and org feeds (which we do, and need to, since neither alone
catches every cross-namespace event) surfaced both rows; the
`gitea:{action_row_id}` id gave them distinct ids, so the upsert dedup
never fired and ~38% of events on org-repo project pages rendered
twice. Switch to a content-derived id keyed on (op_type, act_user_id,
repo_id, ref_name, comment_id, created) so the two rows collide on
upsert, and add a migration that re-keys existing rows to the same
formula while collapsing the duplicates already in the table.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
55
crates/moments-data/migrations/0005_dedup_gitea_events.sql
Normal file
55
crates/moments-data/migrations/0005_dedup_gitea_events.sql
Normal file
@@ -0,0 +1,55 @@
|
||||
-- Collapse duplicate Gitea events introduced by polling both the user
|
||||
-- activity feed and per-org activity feeds.
|
||||
--
|
||||
-- Gitea writes one Action row per interested user-context: a push to
|
||||
-- helexa/cortex by user grenade produces two rows, one with
|
||||
-- user_id=grenade and one with user_id=helexa. Everything else (op_type,
|
||||
-- act_user_id, repo_id, ref_name, comment_id, created) is identical.
|
||||
-- Our prior id scheme (gitea:{action_row_id}) gave them different ids,
|
||||
-- so the upsert-on-id dedup never fired and the timeline rendered each
|
||||
-- push twice.
|
||||
--
|
||||
-- This migration re-keys every existing gitea row to the same canonical
|
||||
-- formula `parse_gitea_event` now emits, deleting duplicates encountered
|
||||
-- along the way. Idempotent: running it again is a no-op because the
|
||||
-- canonical id of a canonical id is itself.
|
||||
|
||||
-- Snapshot the canonical id for every gitea row.
|
||||
CREATE TEMP TABLE _gitea_canonical AS
|
||||
SELECT
|
||||
id AS old_id,
|
||||
'gitea:'
|
||||
|| coalesce(payload->>'op_type', '') || ':'
|
||||
|| coalesce(payload->>'act_user_id', payload->'act_user'->>'id', '0') || ':'
|
||||
|| coalesce(payload->>'repo_id', payload->'repo'->>'id', '0') || ':'
|
||||
|| coalesce(payload->>'ref_name', '') || ':'
|
||||
|| coalesce(payload->>'comment_id', '0') || ':'
|
||||
|| coalesce(payload->>'created', '')
|
||||
AS new_id
|
||||
FROM events
|
||||
WHERE source = 'gitea';
|
||||
|
||||
-- For each canonical id, keep the row whose current id is lexicographically
|
||||
-- smallest (stable, arbitrary tie-break) and delete the rest. The "old id
|
||||
-- already matches the new id" case lands here too — DELETE skips it because
|
||||
-- rn = 1 for any singleton group.
|
||||
DELETE FROM events
|
||||
WHERE id IN (
|
||||
SELECT old_id FROM (
|
||||
SELECT old_id, new_id,
|
||||
row_number() OVER (PARTITION BY new_id ORDER BY old_id) AS rn
|
||||
FROM _gitea_canonical
|
||||
) ranked
|
||||
WHERE rn > 1
|
||||
);
|
||||
|
||||
-- Rename remaining rows to the canonical id. Postgres defers PK uniqueness
|
||||
-- to statement end, so swapping ids across rows in one UPDATE is fine
|
||||
-- provided the final set is unique (dedup above guarantees that).
|
||||
UPDATE events e
|
||||
SET id = c.new_id
|
||||
FROM _gitea_canonical c
|
||||
WHERE e.id = c.old_id
|
||||
AND e.id <> c.new_id;
|
||||
|
||||
DROP TABLE _gitea_canonical;
|
||||
@@ -276,8 +276,14 @@ impl EventSource for GiteaSource {
|
||||
/// Convert a Gitea activity feed item into our Event row. The host gets
|
||||
/// stamped into the payload as `_host` so the reshape layer can build
|
||||
/// web URLs without needing global config.
|
||||
///
|
||||
/// The id is content-derived rather than using Gitea's `id` field directly:
|
||||
/// Gitea creates one Action row per interested user-context, so a push to
|
||||
/// an org repo by user U produces two rows (one under U's context, one
|
||||
/// under the org's), distinguished only by `id` and `user_id`. Keying on
|
||||
/// `(op_type, act_user_id, repo_id, ref_name, comment_id, created)` makes
|
||||
/// those two rows collapse to the same event on upsert.
|
||||
fn parse_gitea_event(item: &Value, host: &str) -> Option<Event> {
|
||||
let id = item.get("id").and_then(Value::as_i64)?;
|
||||
let op_type = item.get("op_type").and_then(Value::as_str)?.to_string();
|
||||
let created_str = item.get("created").and_then(Value::as_str)?;
|
||||
let occurred_at = DateTime::parse_from_rfc3339(created_str)
|
||||
@@ -285,13 +291,15 @@ fn parse_gitea_event(item: &Value, host: &str) -> Option<Event> {
|
||||
.with_timezone(&Utc);
|
||||
let private = item.get("is_private").and_then(Value::as_bool).unwrap_or(false);
|
||||
|
||||
let id = gitea_canonical_id(item, &op_type, created_str);
|
||||
|
||||
let mut payload = item.clone();
|
||||
if let Some(obj) = payload.as_object_mut() {
|
||||
obj.insert("_host".into(), Value::String(host.into()));
|
||||
}
|
||||
|
||||
Some(Event {
|
||||
id: format!("gitea:{id}"),
|
||||
id,
|
||||
source: Source::Gitea,
|
||||
action: op_type,
|
||||
occurred_at,
|
||||
@@ -300,6 +308,25 @@ fn parse_gitea_event(item: &Value, host: &str) -> Option<Event> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Build the canonical, content-derived id for a Gitea action. Must stay
|
||||
/// in lockstep with the SQL formula in migration 0005 so back-fill and
|
||||
/// new writes share the same id space.
|
||||
fn gitea_canonical_id(item: &Value, op_type: &str, created: &str) -> String {
|
||||
let act_user_id = item
|
||||
.get("act_user_id")
|
||||
.and_then(Value::as_i64)
|
||||
.or_else(|| item.get("act_user").and_then(|u| u.get("id")).and_then(Value::as_i64))
|
||||
.unwrap_or(0);
|
||||
let repo_id = item
|
||||
.get("repo_id")
|
||||
.and_then(Value::as_i64)
|
||||
.or_else(|| item.get("repo").and_then(|r| r.get("id")).and_then(Value::as_i64))
|
||||
.unwrap_or(0);
|
||||
let ref_name = item.get("ref_name").and_then(Value::as_str).unwrap_or("");
|
||||
let comment_id = item.get("comment_id").and_then(Value::as_i64).unwrap_or(0);
|
||||
format!("gitea:{op_type}:{act_user_id}:{repo_id}:{ref_name}:{comment_id}:{created}")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -310,14 +337,16 @@ mod tests {
|
||||
let raw = json!({
|
||||
"id": 973,
|
||||
"op_type": "commit_repo",
|
||||
"act_user_id": 42,
|
||||
"repo_id": 7,
|
||||
"ref_name": "refs/heads/main",
|
||||
"is_private": false,
|
||||
"content": "{\"Commits\":[{\"Sha1\":\"abc123\"}],\"Len\":1}",
|
||||
"created": "2026-05-03T16:37:45Z",
|
||||
"repo": { "full_name": "grenade/moments" }
|
||||
"repo": { "id": 7, "full_name": "grenade/moments" }
|
||||
});
|
||||
let ev = parse_gitea_event(&raw, "git.lair.cafe").expect("parses");
|
||||
assert_eq!(ev.id, "gitea:973");
|
||||
assert_eq!(ev.id, "gitea:commit_repo:42:7:refs/heads/main:0:2026-05-03T16:37:45Z");
|
||||
assert_eq!(ev.source, Source::Gitea);
|
||||
assert_eq!(ev.action, "commit_repo");
|
||||
assert!(ev.public);
|
||||
@@ -328,6 +357,43 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dup_action_rows_for_user_and_org_contexts_collapse_to_same_id() {
|
||||
// Gitea creates two Action rows when grenade pushes to helexa/cortex:
|
||||
// one with user_id=grenade (surfaced by the user feed), one with
|
||||
// user_id=helexa (surfaced by the org feed). Everything except `id`
|
||||
// and `user_id` is identical. The canonical id ignores both.
|
||||
let user_ctx = json!({
|
||||
"id": 1322,
|
||||
"user_id": 42,
|
||||
"op_type": "commit_repo",
|
||||
"act_user_id": 42,
|
||||
"act_user": { "login": "grenade", "id": 42 },
|
||||
"repo_id": 99,
|
||||
"repo": { "id": 99, "full_name": "helexa/cortex" },
|
||||
"ref_name": "refs/heads/main",
|
||||
"comment_id": 0,
|
||||
"is_private": false,
|
||||
"created": "2026-05-20T04:32:50Z"
|
||||
});
|
||||
let org_ctx = json!({
|
||||
"id": 1323,
|
||||
"user_id": 7,
|
||||
"op_type": "commit_repo",
|
||||
"act_user_id": 42,
|
||||
"act_user": { "login": "grenade", "id": 42 },
|
||||
"repo_id": 99,
|
||||
"repo": { "id": 99, "full_name": "helexa/cortex" },
|
||||
"ref_name": "refs/heads/main",
|
||||
"comment_id": 0,
|
||||
"is_private": false,
|
||||
"created": "2026-05-20T04:32:50Z"
|
||||
});
|
||||
let a = parse_gitea_event(&user_ctx, "git.lair.cafe").expect("parses");
|
||||
let b = parse_gitea_event(&org_ctx, "git.lair.cafe").expect("parses");
|
||||
assert_eq!(a.id, b.id, "duplicate action rows must collide on id");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn org_event_user_filter_predicate() {
|
||||
let by_user = json!({
|
||||
|
||||
Reference in New Issue
Block a user