Batch cluster DB writes: ~268k round trips → two chunked bulk operations

Generate all PersonIds upfront, then bulk-insert all persons with a
chunked QueryBuilder INSERT and bulk-update all face assignments with a
chunked QueryBuilder UPDATE FROM VALUES. Reduces the 40-minute write
phase to seconds. Also fixes NaiveDateTime/TIMESTAMPTZ decode in person.rs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-23 16:46:49 +02:00
parent 23c3836510
commit f5fa5f4f6b
3 changed files with 56 additions and 15 deletions

View File

@@ -25,21 +25,27 @@ pub async fn run(args: ClusterArgs) -> Result<()> {
let clusters = cluster_faces(&faces, &config);
info!("Formed {} clusters.", clusters.len());
let mut persons_created = 0u64;
let mut faces_assigned = 0u64;
for cluster in clusters {
// Generate all person IDs upfront then write in two bulk queries.
let mut person_ids = Vec::with_capacity(clusters.len());
let mut assignments = Vec::new();
for cluster in &clusters {
if cluster.is_empty() {
continue;
}
let person_id = rbv_data::person::create_person(&pool).await?;
persons_created += 1;
for face_id in &cluster {
rbv_data::face::assign_face_to_person(&pool, face_id, &person_id).await?;
faces_assigned += 1;
let person_id = rbv_entity::PersonId::new();
for face_id in cluster {
assignments.push((face_id.clone(), person_id.clone()));
}
person_ids.push(person_id);
}
info!("Created {} persons, assigned {} faces.", persons_created, faces_assigned);
let persons_created = person_ids.len();
let faces_assigned = assignments.len();
info!("Writing {persons_created} persons and {faces_assigned} face assignments...");
rbv_data::person::create_persons_batch(&pool, &person_ids).await?;
rbv_data::face::assign_faces_batch(&pool, &assignments).await?;
info!("Created {persons_created} persons, assigned {faces_assigned} faces.");
Ok(())
}

View File

@@ -38,6 +38,26 @@ pub async fn assign_face_to_person(pool: &PgPool, face_id: &FaceId, person_id: &
Ok(())
}
/// Bulk-assign faces to persons with chunked
/// `UPDATE … FROM (VALUES …)` statements.
///
/// Each chunk becomes a single round trip; an empty slice is a no-op.
///
/// # Errors
/// Returns any error from executing the UPDATE statements.
pub async fn assign_faces_batch(pool: &PgPool, assignments: &[(FaceId, PersonId)]) -> Result<()> {
    if assignments.is_empty() {
        return Ok(());
    }
    // Two bind parameters per row; 16k rows keeps each statement well
    // under Postgres's 65535 bind-parameter limit.
    const CHUNK: usize = 16_000;
    for chunk in assignments.chunks(CHUNK) {
        // NOTE: `push_values` emits the `VALUES` keyword itself, so the
        // prefix must stop at the opening parenthesis. Writing `VALUES`
        // here as well would render `FROM (VALUES VALUES (…))` — a
        // Postgres syntax error.
        let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
            "UPDATE face_detections SET person_id = v.person_id \
             FROM (",
        );
        qb.push_values(chunk, |mut b, (face_id, person_id)| {
            // NOTE(review): binds the face id as raw bytes — assumes
            // face_detections.id is bytea; confirm against the schema.
            b.push_bind(face_id.as_bytes().to_vec())
                .push_bind(person_id.as_uuid());
        });
        qb.push(") AS v(face_id, person_id) WHERE face_detections.id = v.face_id");
        qb.build().execute(pool).await?;
    }
    Ok(())
}
pub async fn all_face_embeddings(pool: &PgPool) -> Result<Vec<(FaceId, Vec<f32>)>> {
let rows = sqlx::query("SELECT id, embedding FROM face_detections")
.fetch_all(pool)

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use sqlx::{PgPool, Row};
use sqlx::{PgPool, Postgres, QueryBuilder, Row};
use rbv_entity::{GalleryId, Person, PersonId, PersonName};
pub async fn create_person(pool: &PgPool) -> Result<PersonId> {
@@ -11,6 +11,21 @@ pub async fn create_person(pool: &PgPool) -> Result<PersonId> {
Ok(id)
}
/// Bulk-insert the given person ids into `persons`, one chunked
/// `INSERT … VALUES` statement per batch.
///
/// An empty slice is a no-op.
///
/// # Errors
/// Returns any error from executing the INSERT statements.
pub async fn create_persons_batch(pool: &PgPool, person_ids: &[PersonId]) -> Result<()> {
    // One bind parameter per row; 32k rows per statement stays
    // comfortably below Postgres's 65535 bind-parameter limit.
    const CHUNK: usize = 32_000;
    if person_ids.is_empty() {
        return Ok(());
    }
    for batch in person_ids.chunks(CHUNK) {
        // `push_values` appends the `VALUES` keyword and the tuple list.
        let mut builder: QueryBuilder<Postgres> =
            QueryBuilder::new("INSERT INTO persons (id) ");
        builder.push_values(batch, |mut row, person_id| {
            row.push_bind(person_id.as_uuid());
        });
        builder.build().execute(pool).await?;
    }
    Ok(())
}
pub async fn get_person(pool: &PgPool, id: &PersonId) -> Result<Option<Person>> {
let row = sqlx::query("SELECT id, created_at FROM persons WHERE id = $1")
.bind(id.as_uuid())
@@ -18,7 +33,7 @@ pub async fn get_person(pool: &PgPool, id: &PersonId) -> Result<Option<Person>>
.await?;
Ok(row.map(|r| Person {
id: PersonId(r.get("id")),
created_at: r.get::<chrono::NaiveDateTime, _>("created_at").and_utc(),
created_at: r.get("created_at"),
}))
}
@@ -32,7 +47,7 @@ pub async fn get_all_persons_paged(pool: &PgPool, page: i64, per_page: i64) -> R
.await?;
Ok(rows.iter().map(|r| Person {
id: PersonId(r.get("id")),
created_at: r.get::<chrono::NaiveDateTime, _>("created_at").and_utc(),
created_at: r.get("created_at"),
}).collect())
}
@@ -92,7 +107,7 @@ pub async fn find_persons_by_name(pool: &PgPool, name: &str) -> Result<Vec<(Pers
let id: uuid::Uuid = r.get("id");
let person = Person {
id: PersonId(id),
created_at: r.get::<chrono::NaiveDateTime, _>("created_at").and_utc(),
created_at: r.get("created_at"),
};
let pname = PersonName {
person_id: PersonId(id),
@@ -125,6 +140,6 @@ pub async fn get_persons_for_gallery(pool: &PgPool, gallery_id: &GalleryId) -> R
.await?;
Ok(rows.iter().map(|r| Person {
id: PersonId(r.get("id")),
created_at: r.get::<chrono::NaiveDateTime, _>("created_at").and_utc(),
created_at: r.get("created_at"),
}).collect())
}