fix: use since cursor in github-repo polls to prevent missed commits
After the initial backfill, scan_repo was fetching only page 1 (the 100 most recent commits) per repo. If more than 100 commits landed between two 7-day polls, the older ones in that window were permanently missed. scan_repo now stores the newest commit date in poller_state.last_modified and passes it as &since= on subsequent polls, with full pagination, so only genuinely new commits are fetched but none are skipped. On the first poll after deploy, last_modified is NULL, so no since filter is applied — triggering a full re-backfill that catches any previously missed commits. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -117,19 +117,23 @@ impl GithubRepoSource {
|
|||||||
Ok(repos)
|
Ok(repos)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch commits for a single repo, paginating fully on first run.
|
/// Fetch commits for a single repo, paginating fully on first run
|
||||||
|
/// and using `since` on subsequent runs to catch everything new.
|
||||||
async fn scan_repo(&self, repo: &Repo) -> Result<usize, SourceError> {
|
async fn scan_repo(&self, repo: &Repo) -> Result<usize, SourceError> {
|
||||||
let state_key = format!("github-repo:{}", repo.full_name);
|
let state_key = format!("github-repo:{}", repo.full_name);
|
||||||
let prior = self.state.load(&state_key).await?;
|
let prior = self.state.load(&state_key).await?;
|
||||||
let first_run = prior.is_none();
|
let since = prior.as_ref().and_then(|s| s.last_modified);
|
||||||
let max_pages = if first_run { MAX_BACKFILL_PAGES } else { 1 };
|
|
||||||
|
|
||||||
let mut total = 0usize;
|
let mut total = 0usize;
|
||||||
for page in 1..=max_pages {
|
let mut newest: Option<DateTime<Utc>> = since;
|
||||||
let url = format!(
|
for page in 1..=MAX_BACKFILL_PAGES {
|
||||||
|
let mut url = format!(
|
||||||
"https://api.github.com/repos/{}/commits?author={}&per_page={}&page={}",
|
"https://api.github.com/repos/{}/commits?author={}&per_page={}&page={}",
|
||||||
repo.full_name, self.config.user, self.config.per_page, page
|
repo.full_name, self.config.user, self.config.per_page, page
|
||||||
);
|
);
|
||||||
|
if let Some(since_dt) = since {
|
||||||
|
url.push_str(&format!("&since={}", since_dt.to_rfc3339()));
|
||||||
|
}
|
||||||
let req = self.apply_headers(self.client.get(&url));
|
let req = self.apply_headers(self.client.get(&url));
|
||||||
let resp = req
|
let resp = req
|
||||||
.send()
|
.send()
|
||||||
@@ -165,6 +169,13 @@ impl GithubRepoSource {
|
|||||||
.iter()
|
.iter()
|
||||||
.filter_map(|item| parse_commit(item, repo))
|
.filter_map(|item| parse_commit(item, repo))
|
||||||
.collect();
|
.collect();
|
||||||
|
for ev in &events {
|
||||||
|
newest = Some(match newest {
|
||||||
|
Some(n) if ev.occurred_at > n => ev.occurred_at,
|
||||||
|
Some(n) => n,
|
||||||
|
None => ev.occurred_at,
|
||||||
|
});
|
||||||
|
}
|
||||||
total += self.writer.upsert_events(&events).await?;
|
total += self.writer.upsert_events(&events).await?;
|
||||||
|
|
||||||
if items.len() < self.config.per_page as usize {
|
if items.len() < self.config.per_page as usize {
|
||||||
@@ -172,7 +183,7 @@ impl GithubRepoSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.state.touch(&state_key).await?;
|
self.state.save(&state_key, None, newest).await?;
|
||||||
Ok(total)
|
Ok(total)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user