Compare commits

...

6 Commits

Author SHA1 Message Date
4acf189562 Log progress every 100 skipped galleries
Emit an info log with the running skip count every 100 unchanged
galleries so long runs show progress rather than silence.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 09:49:31 +03:00
617fa34a23 Pre-warm connection pool and size it to match concurrency
Configure sqlx pool with min_connections = max_connections so all
connections are established at startup, avoiding slow-acquire warnings
from lazy mTLS handshakes. Add idle_timeout (5 min) to recycle stale
connections from prior runs, and reduce acquire_timeout to 10s for
faster failure.

Size the pool to io_concurrency + ml_concurrency + 2 to accommodate
the worst case where all IO tasks call image_exists concurrently.
Reduce default io_concurrency from 4× to 2× ML concurrency to keep
pool size within PostgreSQL's default max_connections.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 09:40:35 +03:00
154d46f5a0 Batch skip-path DB writes and reduce pool pressure
Skip paths in process_image no longer call upsert_gallery_image
individually. Instead they return the GalleryImage and the gallery
loop batch-upserts all skipped rows in a single query. This eliminates
~100 concurrent DB calls per gallery during the common case where most
images are unchanged.

Pool size reduced from io_concurrency + ml_concurrency + 4 to
ml_concurrency + 6, since only ML transactions and the occasional
image_exists fallback need connections during task execution.

Reduce service concurrency from 32 to 24 for index and cluster.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 09:37:03 +03:00
eed2184ed2 Fix DB connection pool exhaustion during image processing
Move IO semaphore acquisition to the task spawn site so at most
io_concurrency tasks are active at any time. Previously all ~100
images per gallery were spawned eagerly and raced for DB connections
on their skip-path upserts outside any semaphore, causing pool
timeouts when tasks exceeded the connection limit.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 09:31:37 +03:00
83cd55bdcb Remove --include/--exclude filters and unused dependencies
Drop FilterConfig and the glob-based include/exclude filtering that was
never used in practice and complicated gallery stats computation.

Remove 17 unused crate dependencies flagged by cargo-machete across
rbv-data, rbv-cluster, rbv-ingest, rbv-auth, rbv-search, rbv-cli, and
rbv-api. Remove glob, hyper-util, tokio-rustls, and tower from the
workspace root as no crate references them.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 09:29:25 +03:00
0dbc932d4a Optimize index pipeline with 4-layer skip logic and backfill command
Gallery-level skip: store file_count/total_bytes per gallery, skip
entirely when unchanged. Batch existence check: one query per gallery
instead of per image. File-size fast path: stat-only skip when filename
and size match stored values. Two-stage pipeline: separate IO and ML
concurrency (--io-concurrency flag).

Adds `rbv backfill` to populate file_size and gallery stats from disk
without ML, so skip optimisations are effective immediately after upgrade.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 09:14:07 +03:00
26 changed files with 554 additions and 166 deletions

26
Cargo.lock generated
View File

@@ -1094,12 +1094,6 @@ dependencies = [
"weezl",
]
[[package]]
name = "glob"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "h2"
version = "0.4.13"
@@ -2304,7 +2298,6 @@ dependencies = [
"axum-server",
"chrono",
"clap",
"hyper-util",
"image",
"rbv-auth",
"rbv-data",
@@ -2314,19 +2307,14 @@ dependencies = [
"rbv-ml",
"rbv-search",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.18",
"tokio",
"tokio-rustls",
"tower",
"tower-http",
"tracing",
"tracing-subscriber",
"uuid",
"x509-parser",
]
[[package]]
@@ -2336,11 +2324,9 @@ dependencies = [
"anyhow",
"argon2",
"rand 0.8.5",
"rbv-entity",
"rustls",
"rustls-pemfile",
"thiserror 2.0.18",
"tracing",
"x509-parser",
]
@@ -2354,7 +2340,6 @@ dependencies = [
"rbv-cluster",
"rbv-data",
"rbv-entity",
"rbv-hash",
"rbv-infer",
"rbv-ingest",
"rbv-ml",
@@ -2369,7 +2354,6 @@ version = "0.1.0"
dependencies = [
"rayon",
"rbv-entity",
"thiserror 2.0.18",
"tracing",
]
@@ -2378,15 +2362,9 @@ name = "rbv-data"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"pgvector",
"rbv-entity",
"rbv-hash",
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.18",
"tracing",
"uuid",
]
@@ -2428,7 +2406,6 @@ name = "rbv-ingest"
version = "0.1.0"
dependencies = [
"anyhow",
"glob",
"image",
"rbv-data",
"rbv-entity",
@@ -2437,7 +2414,6 @@ dependencies = [
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.18",
"tokio",
"tracing",
]
@@ -2460,8 +2436,6 @@ dependencies = [
"rbv-entity",
"rbv-ml",
"sqlx",
"thiserror 2.0.18",
"tracing",
]
[[package]]

View File

@@ -29,13 +29,11 @@ tokio = { version = "1", features = ["full"] }
# Web framework
axum = { version = "0.8", features = ["macros"] }
tower = "0.5"
tower-http = { version = "0.6", features = ["fs", "cors", "trace"] }
# TLS
rustls = { version = "0.23", features = ["ring"] }
rustls-pemfile = "2"
tokio-rustls = "0.26"
x509-parser = "0.16"
# Database
@@ -59,10 +57,8 @@ chrono = { version = "0.4", features = ["serde"] }
thiserror = "2"
anyhow = "1"
async-trait = "0.1"
glob = "0.3"
rand = "0.8"
rayon = "1"
hyper-util = { version = "0.1", features = ["tokio", "server", "server-auto", "http1", "http2"] }
axum-server = { version = "0.7", features = ["tls-rustls"] }
# Image

View File

@@ -72,8 +72,8 @@ server, and `asset/` for the systemd unit and Podman quadlet files.
## Usage
See [`crates/rbv-cli/README.md`](crates/rbv-cli/README.md) for the full
`rbv index` / `rbv cluster` workflow, incremental re-indexing, and how to
reset face assignments.
`rbv index` / `rbv cluster` / `rbv backfill` workflow, incremental
re-indexing, skip optimisations, and how to reset face assignments.
### API server

View File

@@ -8,7 +8,7 @@ Type=oneshot
Environment=RUST_LOG=info,ort=off,sqlx::query=off
ExecStart=/usr/local/bin/rbv cluster \
--database postgres://rbv:password@localhost:4432/rbv \
--concurrency 32 \
--concurrency 24 \
--threshold 0.55
[Install]

View File

@@ -9,7 +9,7 @@ Environment=RUST_LOG=info,ort=off,sqlx::query=off
ExecStart=/usr/local/bin/rbv index \
--target /tank/data/rbv/%i \
--face-cache /tank/data/rbv/cache/face \
--concurrency 32 \
--concurrency 24 \
--database postgres://rbv:password@localhost:4432/rbv \
--model-dir /tank/containers/immich/ml-cache \
--face-score-thresh 0.7

View File

@@ -18,22 +18,16 @@ rbv-auth = { workspace = true }
rbv-search = { workspace = true }
sqlx = { workspace = true }
axum = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
tokio = { workspace = true }
tokio-rustls = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
x509-parser = { workspace = true }
clap = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
image = { workspace = true }
hyper-util = { workspace = true }
axum-server = { workspace = true }

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
rbv-entity = { workspace = true }
argon2 = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
@@ -13,4 +12,3 @@ x509-parser = { workspace = true }
rand = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

View File

@@ -10,7 +10,6 @@ path = "src/main.rs"
[dependencies]
rbv-entity = { workspace = true }
rbv-hash = { workspace = true }
rbv-data = { workspace = true }
rbv-ml = { workspace = true }
rbv-infer = { workspace = true }

View File

@@ -26,9 +26,10 @@ rbv index \
--target <PATH>... \
--database <CONNSTR> \
--model-dir <PATH> \
[--concurrency <N>] # default 4
[--include <GLOB>...]
[--exclude <GLOB>...]
[--concurrency <N>] # ML concurrency, default 4
[--io-concurrency <N>] # file I/O concurrency, default 2× ML concurrency
[--reindex] # bypass gallery-level skip
[--ml-purge] # wipe all ML data and re-index from scratch
```
`--target` can be any of:
@@ -38,9 +39,37 @@ rbv index \
- A root directory (immediate children are chunks)
- Any arbitrary directory — galleries are discovered recursively
Images already present in the database are skipped, so re-running against
the same target is safe and cheap. Failed images are not written to the database and will be retried on the next
run.
#### Skip optimisations
Re-running `index` against the same target is safe and fast. Four layers of
skip logic avoid redundant work:
1. **Gallery-level skip** — each gallery's image file count and total byte
size are stored in the database. If neither has changed since the last
successful index, the entire gallery is skipped without reading any files.
Use `--reindex` to bypass this check.
2. **Batch existence check** — for galleries that are not skipped, all known
`(filename, image_id, file_size)` tuples are fetched in a single query
rather than one query per image.
3. **File-size fast path** — each file is `stat()`-ed (cheap syscall). If the
filename and file size match the stored values, the file is skipped without
being read or hashed.
4. **Content-hash dedup** — if a file is read and its BLAKE3 hash matches an
existing image (in this gallery or any other), ML processing is skipped.
Together these mean that a re-index of an unchanged 1.5 M image corpus
completes in seconds rather than days.
#### Two-stage pipeline
File I/O (reads + hashing) and ML inference run under separate concurrency
limits. `--io-concurrency` controls how many files are read/hashed in
parallel (default: 4× `--concurrency`), while `--concurrency` controls ML
inference slots. This keeps the disk and the ML backend both saturated
instead of taking turns.
**Quality note:** Indexing one gallery or the whole tree produces identical
embeddings. Recognition quality is determined entirely by `cluster` (below),
@@ -49,6 +78,29 @@ runs contributed to it.
---
### `backfill`
Populate `file_size` and gallery stats from disk without running any ML
inference. This is a one-time migration helper — run it after upgrading to
make all skip optimisations effective on the very first `index` run.
```
rbv backfill \
--target <PATH>... \
--database <CONNSTR> \
[--concurrency <N>] # default 16
```
For each gallery that exists both on disk and in the database, `backfill`
stats every image file and writes `file_size` into `gallery_images` rows
where it is currently NULL. It also sets the gallery-level `file_count` and
`total_bytes` columns so that gallery-level skip works immediately.
No files are read (only `stat()`-ed) and no ML models are loaded, so this
runs very quickly even on large corpora.
---
### `cluster`
Group all indexed face embeddings into person identities using cosine
@@ -85,6 +137,18 @@ rbv index --target /mnt/galleries/new-chunk --database "$DATABASE_URL" --model-d
rbv cluster --database "$DATABASE_URL"
```
### After upgrading (one-time)
If you have an existing database from before the skip optimisations were
added, run `backfill` once to populate file sizes and gallery stats:
```sh
rbv migrate --database "$DATABASE_URL"
rbv backfill --target /mnt/galleries --database "$DATABASE_URL"
```
Subsequent `index` runs will then benefit from all skip layers immediately.
---
## Resetting face assignments

View File

@@ -15,6 +15,7 @@ impl Cli {
Command::Migrate(args) => crate::commands::migrate::run(args).await,
Command::Index(args) => crate::commands::index::run(args).await,
Command::Cluster(args) => crate::commands::cluster::run(args).await,
Command::Backfill(args) => crate::commands::backfill::run(args).await,
}
}
}
@@ -27,6 +28,8 @@ pub enum Command {
Index(IndexArgs),
/// Cluster detected faces into person identities
Cluster(ClusterArgs),
/// Backfill file_size and gallery stats from disk (no ML work)
Backfill(BackfillArgs),
}
#[derive(Parser)]
@@ -42,14 +45,6 @@ pub struct IndexArgs {
#[arg(long, required = true)]
pub target: Vec<PathBuf>,
/// Filename/extension inclusion filters (rsync-style globs)
#[arg(long)]
pub include: Option<Vec<String>>,
/// Filename/extension exclusion filters (rsync-style globs)
#[arg(long)]
pub exclude: Option<Vec<String>>,
/// PostgreSQL connection string
#[arg(long)]
pub database: String,
@@ -78,6 +73,16 @@ pub struct IndexArgs {
#[arg(long)]
pub face_cache: Option<std::path::PathBuf>,
/// Number of concurrent I/O tasks (file reads + hashing). Defaults to
/// 4× the ML concurrency since I/O is not compute-bound.
#[arg(long)]
pub io_concurrency: Option<usize>,
/// Force re-processing of all galleries, bypassing the gallery-level
/// skip-if-unchanged check. Per-image deduplication still applies.
#[arg(long)]
pub reindex: bool,
/// Purge all ML-derived data (embeddings, face detections, persons)
/// before indexing, forcing a full re-index from scratch.
/// Also clears the face crop cache if --face-cache is provided.
@@ -85,6 +90,21 @@ pub struct IndexArgs {
pub ml_purge: bool,
}
#[derive(Parser)]
pub struct BackfillArgs {
/// PostgreSQL connection string
#[arg(long)]
pub database: String,
/// Path(s) to scan. May be a root, chunk, or gallery directory.
#[arg(long, required = true)]
pub target: Vec<PathBuf>,
/// Number of galleries to process concurrently
#[arg(long, default_value = "16")]
pub concurrency: usize,
}
#[derive(Parser)]
pub struct ClusterArgs {
/// PostgreSQL connection string

View File

@@ -0,0 +1,116 @@
use anyhow::Result;
use std::collections::HashMap;
use std::path::Path;
use tokio::sync::Semaphore;
use tracing::{info, warn};
use std::sync::Arc;
use crate::args::BackfillArgs;
pub async fn run(args: BackfillArgs) -> Result<()> {
let pool = rbv_data::connect(&args.database, args.concurrency as u32 + 2).await?;
// Discover galleries on disk from the provided targets.
let galleries_on_disk = rbv_ingest::discover_galleries(&args.target)?;
let disk_set: std::collections::HashSet<String> = galleries_on_disk
.iter()
.map(|p| p.to_string_lossy().into_owned())
.collect();
info!("Found {} galleries on disk.", disk_set.len());
// Load all galleries from DB.
let db_galleries = rbv_data::gallery::all_gallery_paths(&pool).await?;
info!("Found {} galleries in database.", db_galleries.len());
// Only process galleries that exist both on disk and in DB.
let to_process: Vec<_> = db_galleries
.into_iter()
.filter(|(_, path)| disk_set.contains(path))
.collect();
info!("Processing {} galleries present on disk and in database.", to_process.len());
let sem = Arc::new(Semaphore::new(args.concurrency));
let pool = Arc::new(pool);
let mut tasks = Vec::new();
for (gid, gallery_path) in to_process {
let sem = sem.clone();
let pool = pool.clone();
tasks.push(tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
match backfill_gallery(&pool, &gid, &gallery_path).await {
Ok((updated, file_count, total_bytes)) => {
if updated > 0 {
info!(updated, file_count, total_bytes, "{}", gid.to_hex());
}
Ok(())
}
Err(e) => {
warn!("Error backfilling {}: {e:#}", gallery_path);
Err(e)
}
}
}));
}
let mut ok = 0usize;
let mut errs = 0usize;
for task in tasks {
match task.await {
Ok(Ok(())) => ok += 1,
Ok(Err(_)) | Err(_) => errs += 1,
}
}
info!("Backfill complete. Galleries: {ok} ok, {errs} errors.");
Ok(())
}
/// For one gallery: stat every image file on disk, match against DB filenames,
/// batch-update file_size, then update gallery stats.
async fn backfill_gallery(
pool: &rbv_data::PgPool,
gid: &rbv_entity::GalleryId,
gallery_path: &str,
) -> Result<(u64, i32, i64)> {
let gallery_dir = Path::new(gallery_path);
if !gallery_dir.is_dir() {
return Ok((0, 0, 0));
}
// Get filenames already in DB for this gallery (those with NULL file_size).
let existing = rbv_data::image::existing_gallery_images(pool, gid).await?;
// Stat image files on disk.
let image_paths = rbv_ingest::gallery::list_images(gallery_dir)?;
let mut disk_sizes: HashMap<String, i64> = HashMap::with_capacity(image_paths.len());
let mut total_bytes: i64 = 0;
for path in &image_paths {
if let Ok(meta) = std::fs::metadata(path) {
let size = meta.len() as i64;
total_bytes += size;
if let Some(fname) = path.file_name().and_then(|n| n.to_str()) {
disk_sizes.insert(fname.to_string(), size);
}
}
}
let file_count = image_paths.len() as i32;
// Build update list: only files that exist in DB with NULL file_size.
let updates: Vec<(String, i64)> = disk_sizes
.into_iter()
.filter(|(fname, _)| {
existing
.get(fname)
.is_some_and(|(_, stored_size)| stored_size.is_none())
})
.collect();
let updated = rbv_data::image::backfill_file_sizes(pool, gid, &updates).await?;
// Update gallery stats.
rbv_data::gallery::update_gallery_stats(pool, gid, file_count, total_bytes).await?;
Ok((updated, file_count, total_bytes))
}

View File

@@ -1,11 +1,15 @@
use anyhow::Result;
use tracing::info;
use std::sync::Arc;
use rbv_ingest::{IngestConfig, ingest_galleries, discover_galleries, FilterConfig};
use rbv_ingest::{IngestConfig, ingest_galleries, discover_galleries};
use crate::args::IndexArgs;
pub async fn run(args: IndexArgs) -> Result<()> {
let pool = rbv_data::connect(&args.database, args.concurrency as u32 + 4).await?;
let io_concurrency = args.io_concurrency.unwrap_or(args.concurrency * 2);
// Pool must accommodate io_concurrency tasks (some call image_exists)
// + ml_concurrency transactions + main-loop queries.
let pool_size = (io_concurrency + args.concurrency + 2) as u32;
let pool = rbv_data::connect(&args.database, pool_size).await?;
if args.ml_purge {
info!("Purging all ML-derived data (embeddings, faces, persons)...");
@@ -25,10 +29,10 @@ pub async fn run(args: IndexArgs) -> Result<()> {
rbv_infer::OnnxBackend::load_with_options(&args.model_dir, args.face_score_thresh)?
);
let filter = FilterConfig::new(args.include, args.exclude);
let config = IngestConfig {
concurrency: args.concurrency,
filter,
io_concurrency,
reindex: args.reindex,
};
info!("Discovering galleries in {} target(s)...", args.target.len());
@@ -44,8 +48,9 @@ pub async fn run(args: IndexArgs) -> Result<()> {
let report = ingest_galleries(&pool, ml, galleries, &config).await?;
info!(
"Indexing complete. Galleries: {}, Images processed: {}, skipped: {}, Faces detected: {}, Errors: {}",
"Indexing complete. Galleries: {} processed, {} skipped. Images: {} processed, {} skipped. Faces: {}. Errors: {}",
report.galleries_processed,
report.galleries_skipped,
report.images_processed,
report.images_skipped,
report.faces_detected,

View File

@@ -1,3 +1,4 @@
pub mod migrate;
pub mod index;
pub mod cluster;
pub mod backfill;

View File

@@ -6,6 +6,5 @@ license.workspace = true
[dependencies]
rbv-entity = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
rayon = { workspace = true }

View File

@@ -6,13 +6,7 @@ license.workspace = true
[dependencies]
rbv-entity = { workspace = true }
rbv-hash = { workspace = true }
sqlx = { workspace = true }
pgvector = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }

View File

@@ -62,6 +62,61 @@ pub async fn random_galleries(pool: &PgPool, count: i64) -> Result<Vec<Gallery>>
Ok(rows.iter().map(row_to_gallery).collect())
}
/// Return (gallery_id, path) for all galleries.
pub async fn all_gallery_paths(pool: &PgPool) -> Result<Vec<(GalleryId, String)>> {
let rows = sqlx::query("SELECT id, path FROM galleries")
.fetch_all(pool)
.await?;
Ok(rows
.iter()
.map(|r| {
let id_bytes: Vec<u8> = r.get("id");
(
GalleryId(id_bytes.try_into().expect("32-byte id")),
r.get::<String, _>("path"),
)
})
.collect())
}
// ── Gallery stats (skip-if-unchanged) ────────────────────────────────────────
/// Returns stored (file_count, total_bytes) if both are set and the gallery
/// has been indexed at least once. Returns None otherwise (gallery should be
/// processed).
pub async fn get_gallery_stats(pool: &PgPool, id: &GalleryId) -> Result<Option<(i32, i64)>> {
let row = sqlx::query(
"SELECT file_count, total_bytes FROM galleries WHERE id = $1 AND indexed_at IS NOT NULL",
)
.bind(id.as_bytes())
.fetch_optional(pool)
.await?;
Ok(row.and_then(|r| {
let fc: Option<i32> = r.get("file_count");
let tb: Option<i64> = r.get("total_bytes");
fc.zip(tb)
}))
}
/// Update gallery-level file stats after successful processing.
pub async fn update_gallery_stats(
pool: &PgPool,
id: &GalleryId,
file_count: i32,
total_bytes: i64,
) -> Result<()> {
sqlx::query(
"UPDATE galleries SET file_count = $2, total_bytes = $3 WHERE id = $1",
)
.bind(id.as_bytes())
.bind(file_count)
.bind(total_bytes)
.execute(pool)
.await?;
Ok(())
}
// ── Tags ─────────────────────────────────────────────────────────────────────
pub async fn galleries_by_tag(pool: &PgPool, tag: &str, page: i64, per_page: i64) -> Result<Vec<Gallery>> {

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use anyhow::Result;
use sqlx::{PgPool, Postgres, Row};
use rbv_entity::{GalleryId, GalleryImage, Image, ImageId};
@@ -15,9 +16,37 @@ pub async fn purge_ml_data(pool: &PgPool) -> Result<()> {
sqlx::query("TRUNCATE gallery_images, images CASCADE").execute(pool).await?;
sqlx::query("ALTER TABLE face_detections ENABLE TRIGGER trg_fd_person_image_count")
.execute(pool).await?;
// Reset gallery-level stats so galleries are re-processed after purge.
sqlx::query("UPDATE galleries SET file_count = NULL, total_bytes = NULL")
.execute(pool).await?;
Ok(())
}
/// Fetch all known (filename → (image_id, file_size)) for a gallery in one query.
/// Used for batch existence checks and file_size fast-path skipping.
pub async fn existing_gallery_images(
pool: &PgPool,
gallery_id: &GalleryId,
) -> Result<HashMap<String, (ImageId, Option<i64>)>> {
let rows = sqlx::query(
"SELECT filename, image_id, file_size FROM gallery_images WHERE gallery_id = $1",
)
.bind(gallery_id.as_bytes())
.fetch_all(pool)
.await?;
Ok(rows
.into_iter()
.map(|r| {
let filename: String = r.get("filename");
let iid_bytes: Vec<u8> = r.get("image_id");
let image_id = ImageId(iid_bytes.try_into().expect("32-byte id"));
let file_size: Option<i64> = r.get("file_size");
(filename, (image_id, file_size))
})
.collect())
}
pub async fn image_exists(pool: &PgPool, id: &ImageId) -> Result<bool> {
let row = sqlx::query("SELECT EXISTS(SELECT 1 FROM images WHERE id = $1)")
.bind(id.as_bytes())
@@ -53,26 +82,59 @@ where
{
sqlx::query(
r#"
INSERT INTO gallery_images (gallery_id, image_id, filename, ordering)
VALUES ($1, $2, $3, $4)
INSERT INTO gallery_images (gallery_id, image_id, filename, ordering, file_size)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (gallery_id, image_id) DO UPDATE SET
filename = EXCLUDED.filename,
ordering = EXCLUDED.ordering
filename = EXCLUDED.filename,
ordering = EXCLUDED.ordering,
file_size = EXCLUDED.file_size
"#,
)
.bind(gi.gallery_id.as_bytes())
.bind(gi.image_id.as_bytes())
.bind(&gi.filename)
.bind(gi.ordering)
.bind(gi.file_size)
.execute(executor)
.await?;
Ok(())
}
/// Batch upsert gallery_images rows in chunks, using a single query per chunk.
pub async fn upsert_gallery_images_batch(pool: &PgPool, items: &[GalleryImage]) -> Result<()> {
use sqlx::{Postgres, QueryBuilder};
if items.is_empty() {
return Ok(());
}
// 5 bind params per row; Postgres limit is 65535.
const CHUNK: usize = 13_000;
for chunk in items.chunks(CHUNK) {
let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
"INSERT INTO gallery_images (gallery_id, image_id, filename, ordering, file_size) ",
);
qb.push_values(chunk, |mut b, gi| {
b.push_bind(gi.gallery_id.as_bytes().to_vec())
.push_bind(gi.image_id.as_bytes().to_vec())
.push_bind(&gi.filename)
.push_bind(gi.ordering)
.push_bind(gi.file_size);
});
qb.push(
" ON CONFLICT (gallery_id, image_id) DO UPDATE SET \
filename = EXCLUDED.filename, \
ordering = EXCLUDED.ordering, \
file_size = EXCLUDED.file_size",
);
qb.build().execute(pool).await?;
}
Ok(())
}
pub async fn list_gallery_images(pool: &PgPool, gallery_id: &GalleryId) -> Result<Vec<(GalleryImage, Image)>> {
let rows = sqlx::query(
r#"
SELECT gi.gallery_id, gi.image_id, gi.filename, gi.ordering,
SELECT gi.gallery_id, gi.image_id, gi.filename, gi.ordering, gi.file_size,
i.width, i.height
FROM gallery_images gi
JOIN images i ON i.id = gi.image_id
@@ -93,6 +155,7 @@ pub async fn list_gallery_images(pool: &PgPool, gallery_id: &GalleryId) -> Resul
image_id: image_id.clone(),
filename: r.get("filename"),
ordering: r.get("ordering"),
file_size: r.get("file_size"),
};
let image = Image {
id: image_id,
@@ -103,6 +166,35 @@ pub async fn list_gallery_images(pool: &PgPool, gallery_id: &GalleryId) -> Resul
}).collect())
}
/// Batch-update file_size for gallery_images rows matched by (gallery_id, filename).
pub async fn backfill_file_sizes(
pool: &PgPool,
gallery_id: &GalleryId,
updates: &[(String, i64)],
) -> Result<u64> {
use sqlx::{Postgres, QueryBuilder};
if updates.is_empty() {
return Ok(0);
}
const CHUNK: usize = 16_000;
let mut total = 0u64;
for chunk in updates.chunks(CHUNK) {
let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
"UPDATE gallery_images SET file_size = v.file_size FROM (",
);
qb.push_values(chunk, |mut b, (filename, size)| {
b.push_bind(filename.as_str()).push_bind(*size);
});
qb.push(") AS v(filename, file_size) WHERE gallery_images.gallery_id = ");
qb.push_bind(gallery_id.as_bytes());
qb.push(" AND gallery_images.filename = v.filename");
let result = qb.build().execute(pool).await?;
total += result.rows_affected();
}
Ok(total)
}
pub async fn get_image(pool: &PgPool, id: &ImageId) -> Result<Option<Image>> {
let row = sqlx::query("SELECT id, width, height FROM images WHERE id = $1")
.bind(id.as_bytes())

View File

@@ -1,3 +1,4 @@
use std::time::Duration;
use anyhow::Result;
use sqlx::PgPool;
use sqlx::postgres::PgPoolOptions;
@@ -5,6 +6,14 @@ use sqlx::postgres::PgPoolOptions;
pub async fn connect(connstring: &str, max_connections: u32) -> Result<PgPool> {
let pool = PgPoolOptions::new()
.max_connections(max_connections)
// Pre-create connections so the pool is warm from the start,
// avoiding slow-acquire warnings from lazy mTLS handshakes.
.min_connections(max_connections)
// Recycle idle connections after 5 minutes so stale connections
// from prior runs don't linger.
.idle_timeout(Duration::from_secs(300))
// Fail fast rather than blocking for 30s on pool exhaustion.
.acquire_timeout(Duration::from_secs(10))
.connect(connstring)
.await?;
Ok(pool)

View File

@@ -33,4 +33,5 @@ pub struct GalleryImage {
pub image_id: ImageId,
pub filename: String,
pub ordering: i32,
pub file_size: Option<i64>,
}

View File

@@ -13,8 +13,6 @@ sqlx = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
glob = { workspace = true }
image = { workspace = true }

View File

@@ -1,43 +0,0 @@
use std::path::Path;
use glob::Pattern;
pub struct FilterConfig {
include: Option<Vec<Pattern>>,
exclude: Option<Vec<Pattern>>,
}
impl FilterConfig {
pub fn new(include: Option<Vec<String>>, exclude: Option<Vec<String>>) -> Self {
Self {
include: include.map(|v| v.iter().filter_map(|s| Pattern::new(s).ok()).collect()),
exclude: exclude.map(|v| v.iter().filter_map(|s| Pattern::new(s).ok()).collect()),
}
}
/// Returns true if the given filename should be included.
///
/// Mirrors rsync behaviour:
/// - If include patterns are set, filename must match at least one.
/// - If exclude patterns are set, filename must not match any.
/// - If neither is set, all files are included.
pub fn matches(&self, path: &Path) -> bool {
let name = match path.file_name().and_then(|n| n.to_str()) {
Some(n) => n,
None => return false,
};
if let Some(includes) = &self.include {
if !includes.iter().any(|p| p.matches(name)) {
return false;
}
}
if let Some(excludes) = &self.exclude {
if excludes.iter().any(|p| p.matches(name)) {
return false;
}
}
true
}
}

View File

@@ -1,8 +1,6 @@
pub mod traversal;
pub mod filter;
pub mod gallery;
pub mod pipeline;
pub use pipeline::{ingest_galleries, IngestConfig, IngestReport};
pub use traversal::{discover_galleries, DirKind};
pub use filter::FilterConfig;

View File

@@ -1,29 +1,40 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::Result;
use tokio::sync::Semaphore;
use tracing::{info, warn};
use rbv_entity::{ClipEmbedding, FaceDetection, Gallery, GalleryImage, Image};
use rbv_entity::{ClipEmbedding, FaceDetection, Gallery, GalleryImage, Image, ImageId};
use rbv_hash::{face_id, gallery_id, image_id};
use std::sync::Arc;
use rbv_ml::{AnalysisResult, MlBackend, MlError};
use sqlx::PgPool;
use crate::filter::FilterConfig;
use crate::gallery::{list_images, read_index_json};
pub struct IngestConfig {
pub concurrency: usize,
pub filter: FilterConfig,
pub io_concurrency: usize,
pub reindex: bool,
}
pub struct IngestReport {
pub galleries_processed: usize,
pub galleries_skipped: usize,
pub images_processed: usize,
pub images_skipped: usize,
pub faces_detected: usize,
pub errors: Vec<(PathBuf, anyhow::Error)>,
}
/// Result from process_image. Skip paths return the GalleryImage for
/// batch upsert, avoiding per-image DB calls during the task phase.
enum ImageResult {
/// Image was skipped — gallery_image row needs batch upsert.
Skipped(GalleryImage),
/// Image was fully processed (ML + DB transaction done).
Processed { faces: usize },
}
pub async fn ingest_galleries(
pool: &PgPool,
ml: Arc<dyn MlBackend>,
@@ -32,13 +43,15 @@ pub async fn ingest_galleries(
) -> Result<IngestReport> {
let mut report = IngestReport {
galleries_processed: 0,
galleries_skipped: 0,
images_processed: 0,
images_skipped: 0,
faces_detected: 0,
errors: Vec::new(),
};
let semaphore = Arc::new(Semaphore::new(config.concurrency));
let io_semaphore = Arc::new(Semaphore::new(config.io_concurrency));
let ml_semaphore = Arc::new(Semaphore::new(config.concurrency));
for gallery_path in &galleries {
tracing::trace!("processing gallery {}", gallery_path.display());
@@ -78,22 +91,61 @@ pub async fn ingest_galleries(
}
};
let filtered: Vec<PathBuf> = image_paths
.into_iter()
.filter(|p| config.filter.matches(p))
.collect();
// ── Optimization 1: gallery-level skip ──────────────────────────
let (file_count, total_bytes) = match compute_gallery_stats(&image_paths) {
Ok(stats) => stats,
Err(e) => {
warn!("Cannot stat gallery {}, processing anyway: {e}", gallery_path.display());
(-1, -1)
}
};
if !config.reindex {
if let Ok(Some((stored_count, stored_bytes))) =
rbv_data::gallery::get_gallery_stats(pool, &gid).await
{
if stored_count == file_count && stored_bytes == total_bytes {
report.galleries_skipped += 1;
if report.galleries_skipped % 100 == 0 {
info!(skipped = report.galleries_skipped, "galleries unchanged");
}
continue;
}
}
}
// ── Optimization 2: batch existence check ───────────────────────
let existing = match rbv_data::image::existing_gallery_images(pool, &gid).await {
Ok(m) => Arc::new(m),
Err(e) => {
warn!("Cannot fetch existing images for {}: {e}", gallery_path.display());
Arc::new(HashMap::new())
}
};
let mut tasks = Vec::new();
for (ordering, image_path) in filtered.iter().enumerate() {
for (ordering, image_path) in image_paths.iter().enumerate() {
let pool = pool.clone();
let ml = Arc::clone(&ml);
let gid = gid.clone();
let image_path = image_path.clone();
let sem = semaphore.clone();
let io_sem = io_semaphore.clone();
let ml_sem = ml_semaphore.clone();
let existing = Arc::clone(&existing);
tasks.push(tokio::spawn(async move {
let _permit = sem.acquire_owned().await.unwrap();
let result = process_image(&pool, ml.as_ref(), &gid, &image_path, ordering as i32).await;
let io_permit = io_sem.acquire_owned().await.unwrap();
let result = process_image(
&pool,
ml.as_ref(),
&gid,
&image_path,
ordering as i32,
&existing,
io_permit,
&ml_sem,
)
.await;
(image_path, result)
}));
}
@@ -102,19 +154,20 @@ pub async fn ingest_galleries(
let mut g_skipped = 0usize;
let mut g_faces = 0usize;
let mut g_errors = 0usize;
let mut skipped_gis = Vec::new();
for task in tasks {
match task.await {
Ok((_, Ok((skipped, faces)))) => {
if skipped {
report.images_skipped += 1;
g_skipped += 1;
} else {
report.images_processed += 1;
report.faces_detected += faces;
g_processed += 1;
g_faces += faces;
}
Ok((_, Ok(ImageResult::Skipped(gi)))) => {
skipped_gis.push(gi);
report.images_skipped += 1;
g_skipped += 1;
}
Ok((_, Ok(ImageResult::Processed { faces }))) => {
report.images_processed += 1;
report.faces_detected += faces;
g_processed += 1;
g_faces += faces;
}
Ok((image_path, Err(e))) => {
warn!("{} {} {e:#}", gid.to_hex(), image_path.display());
@@ -128,6 +181,15 @@ pub async fn ingest_galleries(
}
}
// Batch-upsert all skipped gallery_image rows in one query.
if !skipped_gis.is_empty() {
if let Err(e) = rbv_data::image::upsert_gallery_images_batch(pool, &skipped_gis).await {
warn!("batch upsert gallery_images failed for {}: {e:#}", gid.to_hex());
g_errors += 1;
report.errors.push((gallery_path.clone(), e));
}
}
let label = if !gallery.source_name.is_empty() {
format!("{} {}", gid.to_hex(), gallery.source_name)
} else {
@@ -141,57 +203,110 @@ pub async fn ingest_galleries(
"{label}"
);
if g_errors == 0 && file_count >= 0 {
let _ = rbv_data::gallery::update_gallery_stats(
pool, &gid, file_count, total_bytes,
)
.await;
}
report.galleries_processed += 1;
}
Ok(report)
}
/// Process a single image: hash, check if exists, submit to ML, upsert to DB.
/// Returns (was_skipped, faces_detected).
/// Stat all image files to get (count, total_bytes) for gallery-level skip.
fn compute_gallery_stats(paths: &[PathBuf]) -> std::io::Result<(i32, i64)> {
let mut total: i64 = 0;
for p in paths {
total += std::fs::metadata(p)?.len() as i64;
}
Ok((paths.len() as i32, total))
}
/// Process a single image through a two-stage pipeline:
/// Stage 1 (IO permit held): stat, file_size fast-path, read, hash
/// Stage 2 (ML semaphore): inference, DB write
///
/// Skip paths return `ImageResult::Skipped(gi)` without touching the DB —
/// the caller batch-upserts all skipped gallery_image rows in one query.
/// Only the ML path (new images) writes to the DB inside the task.
async fn process_image(
pool: &PgPool,
ml: &dyn MlBackend,
gallery_id: &rbv_entity::GalleryId,
image_path: &PathBuf,
ordering: i32,
) -> Result<(bool, usize)> {
let bytes = tokio::fs::read(image_path).await?;
let iid = image_id(&bytes);
existing: &HashMap<String, (ImageId, Option<i64>)>,
io_permit: tokio::sync::OwnedSemaphorePermit,
ml_sem: &Semaphore,
) -> Result<ImageResult> {
let filename = image_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("")
.to_string();
// Always upsert the gallery_images row (ordering may change)
// ── Stage 1: I/O (io_permit already held) ───────────────────────────
// Optimization 3: file_size fast-path — stat only, no file read.
let metadata = tokio::fs::metadata(image_path).await?;
let file_size = metadata.len() as i64;
if let Some((existing_iid, Some(stored_size))) = existing.get(&filename) {
if *stored_size == file_size {
drop(io_permit);
return Ok(ImageResult::Skipped(GalleryImage {
gallery_id: gallery_id.clone(),
image_id: existing_iid.clone(),
filename,
ordering,
file_size: Some(file_size),
}));
}
}
// File is new or changed — read and hash.
let bytes = tokio::fs::read(image_path).await?;
let iid = image_id(&bytes);
let gi = GalleryImage {
gallery_id: gallery_id.clone(),
image_id: iid.clone(),
filename,
ordering,
file_size: Some(file_size),
};
// Check if ML work has already been done for this image content
if rbv_data::image::image_exists(pool, &iid).await? {
rbv_data::image::upsert_gallery_image(pool, &gi).await?;
return Ok((true, 0));
// Optimization 2: check batch-loaded set instead of per-image DB query.
if existing.values().any(|(eid, _)| *eid == iid) {
drop(io_permit);
return Ok(ImageResult::Skipped(gi));
}
// Resize for ML API (max 1MB part limit). Hash is computed on originals above.
let ml_bytes = prepare_for_ml(&bytes)?;
// Check images table directly (image may exist in another gallery).
if rbv_data::image::image_exists(pool, &iid).await? {
drop(io_permit);
return Ok(ImageResult::Skipped(gi));
}
// Resize for ML (hash is computed on originals above).
let ml_bytes = prepare_for_ml(&bytes)?;
drop(bytes);
drop(io_permit);
// ── Stage 2: ML inference (under ml_sem) ────────────────────────────
let _ml_permit = ml_sem.acquire().await.unwrap();
// Submit to ML API (with retry/backoff for transient connection errors)
let result = match analyze_with_backoff(ml, &ml_bytes, image_path).await {
Ok(r) => r,
Err(e) => {
warn!("Skipping {} after ML retries exhausted: {e:#}", image_path.display());
return Ok((true, 0));
return Ok(ImageResult::Skipped(gi));
}
};
// Build all entities before touching the DB
let image = Image {
id: iid.clone(),
width: Some(result.image_width),
@@ -214,7 +329,6 @@ async fn process_image(
}).collect();
let face_count = faces.len();
// Write everything in a single transaction
let mut tx = pool.begin().await?;
rbv_data::image::upsert_image(&mut *tx, &image).await?;
rbv_data::image::upsert_gallery_image(&mut *tx, &gi).await?;
@@ -222,11 +336,10 @@ async fn process_image(
rbv_data::face::upsert_faces_batch(&mut *tx, &faces).await?;
tx.commit().await?;
Ok((false, face_count))
Ok(ImageResult::Processed { faces: face_count })
}
/// Call `ml.analyze_image` with exponential backoff for transient failures.
/// Retries up to 3 times (delays: 5s, 10s, 20s) on connection/timeout/5xx errors.
async fn analyze_with_backoff(
ml: &dyn MlBackend,
bytes: &[u8],
@@ -262,8 +375,6 @@ async fn analyze_with_backoff(
}
/// Decode, resize if needed, and re-encode as JPEG for the ML API.
/// Always produces JPEG output regardless of the source format, ensuring
/// Pillow on the server side can always identify the image.
fn prepare_for_ml(bytes: &[u8]) -> anyhow::Result<Vec<u8>> {
const MAX_DIM: u32 = 1280;

View File

@@ -10,5 +10,3 @@ rbv-data = { workspace = true }
rbv-ml = { workspace = true }
sqlx = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

View File

@@ -0,0 +1,5 @@
-- Gallery-level stats for skip-if-unchanged optimisation.
-- NULL means "never computed" → gallery will not be skipped on next index run.
ALTER TABLE galleries
ADD COLUMN IF NOT EXISTS file_count INT DEFAULT NULL,
ADD COLUMN IF NOT EXISTS total_bytes BIGINT DEFAULT NULL;

View File

@@ -0,0 +1,4 @@
-- Per-image file size for fast-path skip (avoid reading + hashing unchanged files).
-- NULL means "unknown" → first re-index after migration will populate this column.
ALTER TABLE gallery_images
ADD COLUMN IF NOT EXISTS file_size BIGINT DEFAULT NULL;