Compare commits
6 Commits
1e7bc916e1
...
4acf189562
| Author | SHA1 | Date | |
|---|---|---|---|
|
4acf189562
|
|||
|
617fa34a23
|
|||
|
154d46f5a0
|
|||
|
eed2184ed2
|
|||
|
83cd55bdcb
|
|||
|
0dbc932d4a
|
26
Cargo.lock
generated
26
Cargo.lock
generated
@@ -1094,12 +1094,6 @@ dependencies = [
|
||||
"weezl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "glob"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.4.13"
|
||||
@@ -2304,7 +2298,6 @@ dependencies = [
|
||||
"axum-server",
|
||||
"chrono",
|
||||
"clap",
|
||||
"hyper-util",
|
||||
"image",
|
||||
"rbv-auth",
|
||||
"rbv-data",
|
||||
@@ -2314,19 +2307,14 @@ dependencies = [
|
||||
"rbv-ml",
|
||||
"rbv-search",
|
||||
"rustls",
|
||||
"rustls-pemfile",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tower",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
"x509-parser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2336,11 +2324,9 @@ dependencies = [
|
||||
"anyhow",
|
||||
"argon2",
|
||||
"rand 0.8.5",
|
||||
"rbv-entity",
|
||||
"rustls",
|
||||
"rustls-pemfile",
|
||||
"thiserror 2.0.18",
|
||||
"tracing",
|
||||
"x509-parser",
|
||||
]
|
||||
|
||||
@@ -2354,7 +2340,6 @@ dependencies = [
|
||||
"rbv-cluster",
|
||||
"rbv-data",
|
||||
"rbv-entity",
|
||||
"rbv-hash",
|
||||
"rbv-infer",
|
||||
"rbv-ingest",
|
||||
"rbv-ml",
|
||||
@@ -2369,7 +2354,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"rayon",
|
||||
"rbv-entity",
|
||||
"thiserror 2.0.18",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
@@ -2378,15 +2362,9 @@ name = "rbv-data"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"pgvector",
|
||||
"rbv-entity",
|
||||
"rbv-hash",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror 2.0.18",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
@@ -2428,7 +2406,6 @@ name = "rbv-ingest"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"glob",
|
||||
"image",
|
||||
"rbv-data",
|
||||
"rbv-entity",
|
||||
@@ -2437,7 +2414,6 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
@@ -2460,8 +2436,6 @@ dependencies = [
|
||||
"rbv-entity",
|
||||
"rbv-ml",
|
||||
"sqlx",
|
||||
"thiserror 2.0.18",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -29,13 +29,11 @@ tokio = { version = "1", features = ["full"] }
|
||||
|
||||
# Web framework
|
||||
axum = { version = "0.8", features = ["macros"] }
|
||||
tower = "0.5"
|
||||
tower-http = { version = "0.6", features = ["fs", "cors", "trace"] }
|
||||
|
||||
# TLS
|
||||
rustls = { version = "0.23", features = ["ring"] }
|
||||
rustls-pemfile = "2"
|
||||
tokio-rustls = "0.26"
|
||||
x509-parser = "0.16"
|
||||
|
||||
# Database
|
||||
@@ -59,10 +57,8 @@ chrono = { version = "0.4", features = ["serde"] }
|
||||
thiserror = "2"
|
||||
anyhow = "1"
|
||||
async-trait = "0.1"
|
||||
glob = "0.3"
|
||||
rand = "0.8"
|
||||
rayon = "1"
|
||||
hyper-util = { version = "0.1", features = ["tokio", "server", "server-auto", "http1", "http2"] }
|
||||
axum-server = { version = "0.7", features = ["tls-rustls"] }
|
||||
|
||||
# Image
|
||||
|
||||
@@ -72,8 +72,8 @@ server, and `asset/` for the systemd unit and Podman quadlet files.
|
||||
## Usage
|
||||
|
||||
See [`crates/rbv-cli/README.md`](crates/rbv-cli/README.md) for the full
|
||||
`rbv index` / `rbv cluster` workflow, incremental re-indexing, and how to
|
||||
reset face assignments.
|
||||
`rbv index` / `rbv cluster` / `rbv backfill` workflow, incremental
|
||||
re-indexing, skip optimisations, and how to reset face assignments.
|
||||
|
||||
### API server
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ Type=oneshot
|
||||
Environment=RUST_LOG=info,ort=off,sqlx::query=off
|
||||
ExecStart=/usr/local/bin/rbv cluster \
|
||||
--database postgres://rbv:password@localhost:4432/rbv \
|
||||
--concurrency 32 \
|
||||
--concurrency 24 \
|
||||
--threshold 0.55
|
||||
|
||||
[Install]
|
||||
|
||||
@@ -9,7 +9,7 @@ Environment=RUST_LOG=info,ort=off,sqlx::query=off
|
||||
ExecStart=/usr/local/bin/rbv index \
|
||||
--target /tank/data/rbv/%i \
|
||||
--face-cache /tank/data/rbv/cache/face \
|
||||
--concurrency 32 \
|
||||
--concurrency 24 \
|
||||
--database postgres://rbv:password@localhost:4432/rbv \
|
||||
--model-dir /tank/containers/immich/ml-cache \
|
||||
--face-score-thresh 0.7
|
||||
|
||||
@@ -18,22 +18,16 @@ rbv-auth = { workspace = true }
|
||||
rbv-search = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
tower = { workspace = true }
|
||||
tower-http = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-rustls = { workspace = true }
|
||||
rustls = { workspace = true }
|
||||
rustls-pemfile = { workspace = true }
|
||||
x509-parser = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
image = { workspace = true }
|
||||
hyper-util = { workspace = true }
|
||||
axum-server = { workspace = true }
|
||||
|
||||
@@ -5,7 +5,6 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
rbv-entity = { workspace = true }
|
||||
argon2 = { workspace = true }
|
||||
rustls = { workspace = true }
|
||||
rustls-pemfile = { workspace = true }
|
||||
@@ -13,4 +12,3 @@ x509-parser = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -10,7 +10,6 @@ path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
rbv-entity = { workspace = true }
|
||||
rbv-hash = { workspace = true }
|
||||
rbv-data = { workspace = true }
|
||||
rbv-ml = { workspace = true }
|
||||
rbv-infer = { workspace = true }
|
||||
|
||||
@@ -26,9 +26,10 @@ rbv index \
|
||||
--target <PATH>... \
|
||||
--database <CONNSTR> \
|
||||
--model-dir <PATH> \
|
||||
[--concurrency <N>] # default 4
|
||||
[--include <GLOB>...]
|
||||
[--exclude <GLOB>...]
|
||||
[--concurrency <N>] # ML concurrency, default 4
|
||||
[--io-concurrency <N>] # file I/O concurrency, default 2× ML concurrency
|
||||
[--reindex] # bypass gallery-level skip
|
||||
[--ml-purge] # wipe all ML data and re-index from scratch
|
||||
```
|
||||
|
||||
`--target` can be any of:
|
||||
@@ -38,9 +39,37 @@ rbv index \
|
||||
- A root directory (immediate children are chunks)
|
||||
- Any arbitrary directory — galleries are discovered recursively
|
||||
|
||||
Images already present in the database are skipped, so re-running against
|
||||
the same target is safe and cheap. Failed images are not written to the database and will be retried on the next
|
||||
run.
|
||||
#### Skip optimisations
|
||||
|
||||
Re-running `index` against the same target is safe and fast. Four layers of
|
||||
skip logic avoid redundant work:
|
||||
|
||||
1. **Gallery-level skip** — each gallery's image file count and total byte
|
||||
size are stored in the database. If neither has changed since the last
|
||||
successful index, the entire gallery is skipped without reading any files.
|
||||
Use `--reindex` to bypass this check.
|
||||
|
||||
2. **Batch existence check** — for galleries that are not skipped, all known
|
||||
`(filename, image_id, file_size)` tuples are fetched in a single query
|
||||
rather than one query per image.
|
||||
|
||||
3. **File-size fast path** — each file is `stat()`-ed (cheap syscall). If the
|
||||
filename and file size match the stored values, the file is skipped without
|
||||
being read or hashed.
|
||||
|
||||
4. **Content-hash dedup** — if a file is read and its BLAKE3 hash matches an
|
||||
existing image (in this gallery or any other), ML processing is skipped.
|
||||
|
||||
Together these mean that a re-index of an unchanged 1.5 M image corpus
|
||||
completes in seconds rather than days.
|
||||
|
||||
#### Two-stage pipeline
|
||||
|
||||
File I/O (reads + hashing) and ML inference run under separate concurrency
|
||||
limits. `--io-concurrency` controls how many files are read/hashed in
|
||||
parallel (default: 4× `--concurrency`), while `--concurrency` controls ML
|
||||
inference slots. This keeps the disk and the ML backend both saturated
|
||||
instead of taking turns.
|
||||
|
||||
**Quality note:** Indexing one gallery or the whole tree produces identical
|
||||
embeddings. Recognition quality is determined entirely by `cluster` (below),
|
||||
@@ -49,6 +78,29 @@ runs contributed to it.
|
||||
|
||||
---
|
||||
|
||||
### `backfill`
|
||||
|
||||
Populate `file_size` and gallery stats from disk without running any ML
|
||||
inference. This is a one-time migration helper — run it after upgrading to
|
||||
make all skip optimisations effective on the very first `index` run.
|
||||
|
||||
```
|
||||
rbv backfill \
|
||||
--target <PATH>... \
|
||||
--database <CONNSTR> \
|
||||
[--concurrency <N>] # default 16
|
||||
```
|
||||
|
||||
For each gallery that exists both on disk and in the database, `backfill`
|
||||
stats every image file and writes `file_size` into `gallery_images` rows
|
||||
where it is currently NULL. It also sets the gallery-level `file_count` and
|
||||
`total_bytes` columns so that gallery-level skip works immediately.
|
||||
|
||||
No files are read (only `stat()`-ed) and no ML models are loaded, so this
|
||||
runs very quickly even on large corpora.
|
||||
|
||||
---
|
||||
|
||||
### `cluster`
|
||||
|
||||
Group all indexed face embeddings into person identities using cosine
|
||||
@@ -85,6 +137,18 @@ rbv index --target /mnt/galleries/new-chunk --database "$DATABASE_URL" --model-d
|
||||
rbv cluster --database "$DATABASE_URL"
|
||||
```
|
||||
|
||||
### After upgrading (one-time)
|
||||
|
||||
If you have an existing database from before the skip optimisations were
|
||||
added, run `backfill` once to populate file sizes and gallery stats:
|
||||
|
||||
```sh
|
||||
rbv migrate --database "$DATABASE_URL"
|
||||
rbv backfill --target /mnt/galleries --database "$DATABASE_URL"
|
||||
```
|
||||
|
||||
Subsequent `index` runs will then benefit from all skip layers immediately.
|
||||
|
||||
---
|
||||
|
||||
## Resetting face assignments
|
||||
|
||||
@@ -15,6 +15,7 @@ impl Cli {
|
||||
Command::Migrate(args) => crate::commands::migrate::run(args).await,
|
||||
Command::Index(args) => crate::commands::index::run(args).await,
|
||||
Command::Cluster(args) => crate::commands::cluster::run(args).await,
|
||||
Command::Backfill(args) => crate::commands::backfill::run(args).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -27,6 +28,8 @@ pub enum Command {
|
||||
Index(IndexArgs),
|
||||
/// Cluster detected faces into person identities
|
||||
Cluster(ClusterArgs),
|
||||
/// Backfill file_size and gallery stats from disk (no ML work)
|
||||
Backfill(BackfillArgs),
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -42,14 +45,6 @@ pub struct IndexArgs {
|
||||
#[arg(long, required = true)]
|
||||
pub target: Vec<PathBuf>,
|
||||
|
||||
/// Filename/extension inclusion filters (rsync-style globs)
|
||||
#[arg(long)]
|
||||
pub include: Option<Vec<String>>,
|
||||
|
||||
/// Filename/extension exclusion filters (rsync-style globs)
|
||||
#[arg(long)]
|
||||
pub exclude: Option<Vec<String>>,
|
||||
|
||||
/// PostgreSQL connection string
|
||||
#[arg(long)]
|
||||
pub database: String,
|
||||
@@ -78,6 +73,16 @@ pub struct IndexArgs {
|
||||
#[arg(long)]
|
||||
pub face_cache: Option<std::path::PathBuf>,
|
||||
|
||||
/// Number of concurrent I/O tasks (file reads + hashing). Defaults to
|
||||
/// 4× the ML concurrency since I/O is not compute-bound.
|
||||
#[arg(long)]
|
||||
pub io_concurrency: Option<usize>,
|
||||
|
||||
/// Force re-processing of all galleries, bypassing the gallery-level
|
||||
/// skip-if-unchanged check. Per-image deduplication still applies.
|
||||
#[arg(long)]
|
||||
pub reindex: bool,
|
||||
|
||||
/// Purge all ML-derived data (embeddings, face detections, persons)
|
||||
/// before indexing, forcing a full re-index from scratch.
|
||||
/// Also clears the face crop cache if --face-cache is provided.
|
||||
@@ -85,6 +90,21 @@ pub struct IndexArgs {
|
||||
pub ml_purge: bool,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
pub struct BackfillArgs {
|
||||
/// PostgreSQL connection string
|
||||
#[arg(long)]
|
||||
pub database: String,
|
||||
|
||||
/// Path(s) to scan. May be a root, chunk, or gallery directory.
|
||||
#[arg(long, required = true)]
|
||||
pub target: Vec<PathBuf>,
|
||||
|
||||
/// Number of galleries to process concurrently
|
||||
#[arg(long, default_value = "16")]
|
||||
pub concurrency: usize,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
pub struct ClusterArgs {
|
||||
/// PostgreSQL connection string
|
||||
|
||||
116
crates/rbv-cli/src/commands/backfill.rs
Normal file
116
crates/rbv-cli/src/commands/backfill.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
use anyhow::Result;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use tokio::sync::Semaphore;
|
||||
use tracing::{info, warn};
|
||||
use std::sync::Arc;
|
||||
use crate::args::BackfillArgs;
|
||||
|
||||
pub async fn run(args: BackfillArgs) -> Result<()> {
|
||||
let pool = rbv_data::connect(&args.database, args.concurrency as u32 + 2).await?;
|
||||
|
||||
// Discover galleries on disk from the provided targets.
|
||||
let galleries_on_disk = rbv_ingest::discover_galleries(&args.target)?;
|
||||
let disk_set: std::collections::HashSet<String> = galleries_on_disk
|
||||
.iter()
|
||||
.map(|p| p.to_string_lossy().into_owned())
|
||||
.collect();
|
||||
info!("Found {} galleries on disk.", disk_set.len());
|
||||
|
||||
// Load all galleries from DB.
|
||||
let db_galleries = rbv_data::gallery::all_gallery_paths(&pool).await?;
|
||||
info!("Found {} galleries in database.", db_galleries.len());
|
||||
|
||||
// Only process galleries that exist both on disk and in DB.
|
||||
let to_process: Vec<_> = db_galleries
|
||||
.into_iter()
|
||||
.filter(|(_, path)| disk_set.contains(path))
|
||||
.collect();
|
||||
info!("Processing {} galleries present on disk and in database.", to_process.len());
|
||||
|
||||
let sem = Arc::new(Semaphore::new(args.concurrency));
|
||||
let pool = Arc::new(pool);
|
||||
let mut tasks = Vec::new();
|
||||
|
||||
for (gid, gallery_path) in to_process {
|
||||
let sem = sem.clone();
|
||||
let pool = pool.clone();
|
||||
|
||||
tasks.push(tokio::spawn(async move {
|
||||
let _permit = sem.acquire().await.unwrap();
|
||||
match backfill_gallery(&pool, &gid, &gallery_path).await {
|
||||
Ok((updated, file_count, total_bytes)) => {
|
||||
if updated > 0 {
|
||||
info!(updated, file_count, total_bytes, "{}", gid.to_hex());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error backfilling {}: {e:#}", gallery_path);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
let mut ok = 0usize;
|
||||
let mut errs = 0usize;
|
||||
for task in tasks {
|
||||
match task.await {
|
||||
Ok(Ok(())) => ok += 1,
|
||||
Ok(Err(_)) | Err(_) => errs += 1,
|
||||
}
|
||||
}
|
||||
|
||||
info!("Backfill complete. Galleries: {ok} ok, {errs} errors.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For one gallery: stat every image file on disk, match against DB filenames,
|
||||
/// batch-update file_size, then update gallery stats.
|
||||
async fn backfill_gallery(
|
||||
pool: &rbv_data::PgPool,
|
||||
gid: &rbv_entity::GalleryId,
|
||||
gallery_path: &str,
|
||||
) -> Result<(u64, i32, i64)> {
|
||||
let gallery_dir = Path::new(gallery_path);
|
||||
if !gallery_dir.is_dir() {
|
||||
return Ok((0, 0, 0));
|
||||
}
|
||||
|
||||
// Get filenames already in DB for this gallery (those with NULL file_size).
|
||||
let existing = rbv_data::image::existing_gallery_images(pool, gid).await?;
|
||||
|
||||
// Stat image files on disk.
|
||||
let image_paths = rbv_ingest::gallery::list_images(gallery_dir)?;
|
||||
let mut disk_sizes: HashMap<String, i64> = HashMap::with_capacity(image_paths.len());
|
||||
let mut total_bytes: i64 = 0;
|
||||
|
||||
for path in &image_paths {
|
||||
if let Ok(meta) = std::fs::metadata(path) {
|
||||
let size = meta.len() as i64;
|
||||
total_bytes += size;
|
||||
if let Some(fname) = path.file_name().and_then(|n| n.to_str()) {
|
||||
disk_sizes.insert(fname.to_string(), size);
|
||||
}
|
||||
}
|
||||
}
|
||||
let file_count = image_paths.len() as i32;
|
||||
|
||||
// Build update list: only files that exist in DB with NULL file_size.
|
||||
let updates: Vec<(String, i64)> = disk_sizes
|
||||
.into_iter()
|
||||
.filter(|(fname, _)| {
|
||||
existing
|
||||
.get(fname)
|
||||
.is_some_and(|(_, stored_size)| stored_size.is_none())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let updated = rbv_data::image::backfill_file_sizes(pool, gid, &updates).await?;
|
||||
|
||||
// Update gallery stats.
|
||||
rbv_data::gallery::update_gallery_stats(pool, gid, file_count, total_bytes).await?;
|
||||
|
||||
Ok((updated, file_count, total_bytes))
|
||||
}
|
||||
@@ -1,11 +1,15 @@
|
||||
use anyhow::Result;
|
||||
use tracing::info;
|
||||
use std::sync::Arc;
|
||||
use rbv_ingest::{IngestConfig, ingest_galleries, discover_galleries, FilterConfig};
|
||||
use rbv_ingest::{IngestConfig, ingest_galleries, discover_galleries};
|
||||
use crate::args::IndexArgs;
|
||||
|
||||
pub async fn run(args: IndexArgs) -> Result<()> {
|
||||
let pool = rbv_data::connect(&args.database, args.concurrency as u32 + 4).await?;
|
||||
let io_concurrency = args.io_concurrency.unwrap_or(args.concurrency * 2);
|
||||
// Pool must accommodate io_concurrency tasks (some call image_exists)
|
||||
// + ml_concurrency transactions + main-loop queries.
|
||||
let pool_size = (io_concurrency + args.concurrency + 2) as u32;
|
||||
let pool = rbv_data::connect(&args.database, pool_size).await?;
|
||||
|
||||
if args.ml_purge {
|
||||
info!("Purging all ML-derived data (embeddings, faces, persons)...");
|
||||
@@ -25,10 +29,10 @@ pub async fn run(args: IndexArgs) -> Result<()> {
|
||||
rbv_infer::OnnxBackend::load_with_options(&args.model_dir, args.face_score_thresh)?
|
||||
);
|
||||
|
||||
let filter = FilterConfig::new(args.include, args.exclude);
|
||||
let config = IngestConfig {
|
||||
concurrency: args.concurrency,
|
||||
filter,
|
||||
io_concurrency,
|
||||
reindex: args.reindex,
|
||||
};
|
||||
|
||||
info!("Discovering galleries in {} target(s)...", args.target.len());
|
||||
@@ -44,8 +48,9 @@ pub async fn run(args: IndexArgs) -> Result<()> {
|
||||
let report = ingest_galleries(&pool, ml, galleries, &config).await?;
|
||||
|
||||
info!(
|
||||
"Indexing complete. Galleries: {}, Images processed: {}, skipped: {}, Faces detected: {}, Errors: {}",
|
||||
"Indexing complete. Galleries: {} processed, {} skipped. Images: {} processed, {} skipped. Faces: {}. Errors: {}",
|
||||
report.galleries_processed,
|
||||
report.galleries_skipped,
|
||||
report.images_processed,
|
||||
report.images_skipped,
|
||||
report.faces_detected,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod migrate;
|
||||
pub mod index;
|
||||
pub mod cluster;
|
||||
pub mod backfill;
|
||||
|
||||
@@ -6,6 +6,5 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
rbv-entity = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
rayon = { workspace = true }
|
||||
|
||||
@@ -6,13 +6,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
rbv-entity = { workspace = true }
|
||||
rbv-hash = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
pgvector = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -62,6 +62,61 @@ pub async fn random_galleries(pool: &PgPool, count: i64) -> Result<Vec<Gallery>>
|
||||
Ok(rows.iter().map(row_to_gallery).collect())
|
||||
}
|
||||
|
||||
/// Return (gallery_id, path) for all galleries.
|
||||
pub async fn all_gallery_paths(pool: &PgPool) -> Result<Vec<(GalleryId, String)>> {
|
||||
let rows = sqlx::query("SELECT id, path FROM galleries")
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
Ok(rows
|
||||
.iter()
|
||||
.map(|r| {
|
||||
let id_bytes: Vec<u8> = r.get("id");
|
||||
(
|
||||
GalleryId(id_bytes.try_into().expect("32-byte id")),
|
||||
r.get::<String, _>("path"),
|
||||
)
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
// ── Gallery stats (skip-if-unchanged) ────────────────────────────────────────
|
||||
|
||||
/// Returns stored (file_count, total_bytes) if both are set and the gallery
|
||||
/// has been indexed at least once. Returns None otherwise (gallery should be
|
||||
/// processed).
|
||||
pub async fn get_gallery_stats(pool: &PgPool, id: &GalleryId) -> Result<Option<(i32, i64)>> {
|
||||
let row = sqlx::query(
|
||||
"SELECT file_count, total_bytes FROM galleries WHERE id = $1 AND indexed_at IS NOT NULL",
|
||||
)
|
||||
.bind(id.as_bytes())
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
Ok(row.and_then(|r| {
|
||||
let fc: Option<i32> = r.get("file_count");
|
||||
let tb: Option<i64> = r.get("total_bytes");
|
||||
fc.zip(tb)
|
||||
}))
|
||||
}
|
||||
|
||||
/// Update gallery-level file stats after successful processing.
|
||||
pub async fn update_gallery_stats(
|
||||
pool: &PgPool,
|
||||
id: &GalleryId,
|
||||
file_count: i32,
|
||||
total_bytes: i64,
|
||||
) -> Result<()> {
|
||||
sqlx::query(
|
||||
"UPDATE galleries SET file_count = $2, total_bytes = $3 WHERE id = $1",
|
||||
)
|
||||
.bind(id.as_bytes())
|
||||
.bind(file_count)
|
||||
.bind(total_bytes)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Tags ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
pub async fn galleries_by_tag(pool: &PgPool, tag: &str, page: i64, per_page: i64) -> Result<Vec<Gallery>> {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use anyhow::Result;
|
||||
use sqlx::{PgPool, Postgres, Row};
|
||||
use rbv_entity::{GalleryId, GalleryImage, Image, ImageId};
|
||||
@@ -15,9 +16,37 @@ pub async fn purge_ml_data(pool: &PgPool) -> Result<()> {
|
||||
sqlx::query("TRUNCATE gallery_images, images CASCADE").execute(pool).await?;
|
||||
sqlx::query("ALTER TABLE face_detections ENABLE TRIGGER trg_fd_person_image_count")
|
||||
.execute(pool).await?;
|
||||
// Reset gallery-level stats so galleries are re-processed after purge.
|
||||
sqlx::query("UPDATE galleries SET file_count = NULL, total_bytes = NULL")
|
||||
.execute(pool).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch all known (filename → (image_id, file_size)) for a gallery in one query.
|
||||
/// Used for batch existence checks and file_size fast-path skipping.
|
||||
pub async fn existing_gallery_images(
|
||||
pool: &PgPool,
|
||||
gallery_id: &GalleryId,
|
||||
) -> Result<HashMap<String, (ImageId, Option<i64>)>> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT filename, image_id, file_size FROM gallery_images WHERE gallery_id = $1",
|
||||
)
|
||||
.bind(gallery_id.as_bytes())
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|r| {
|
||||
let filename: String = r.get("filename");
|
||||
let iid_bytes: Vec<u8> = r.get("image_id");
|
||||
let image_id = ImageId(iid_bytes.try_into().expect("32-byte id"));
|
||||
let file_size: Option<i64> = r.get("file_size");
|
||||
(filename, (image_id, file_size))
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn image_exists(pool: &PgPool, id: &ImageId) -> Result<bool> {
|
||||
let row = sqlx::query("SELECT EXISTS(SELECT 1 FROM images WHERE id = $1)")
|
||||
.bind(id.as_bytes())
|
||||
@@ -53,26 +82,59 @@ where
|
||||
{
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO gallery_images (gallery_id, image_id, filename, ordering)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
INSERT INTO gallery_images (gallery_id, image_id, filename, ordering, file_size)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (gallery_id, image_id) DO UPDATE SET
|
||||
filename = EXCLUDED.filename,
|
||||
ordering = EXCLUDED.ordering
|
||||
filename = EXCLUDED.filename,
|
||||
ordering = EXCLUDED.ordering,
|
||||
file_size = EXCLUDED.file_size
|
||||
"#,
|
||||
)
|
||||
.bind(gi.gallery_id.as_bytes())
|
||||
.bind(gi.image_id.as_bytes())
|
||||
.bind(&gi.filename)
|
||||
.bind(gi.ordering)
|
||||
.bind(gi.file_size)
|
||||
.execute(executor)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Batch upsert gallery_images rows in chunks, using a single query per chunk.
|
||||
pub async fn upsert_gallery_images_batch(pool: &PgPool, items: &[GalleryImage]) -> Result<()> {
|
||||
use sqlx::{Postgres, QueryBuilder};
|
||||
|
||||
if items.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
// 5 bind params per row; Postgres limit is 65535.
|
||||
const CHUNK: usize = 13_000;
|
||||
for chunk in items.chunks(CHUNK) {
|
||||
let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
|
||||
"INSERT INTO gallery_images (gallery_id, image_id, filename, ordering, file_size) ",
|
||||
);
|
||||
qb.push_values(chunk, |mut b, gi| {
|
||||
b.push_bind(gi.gallery_id.as_bytes().to_vec())
|
||||
.push_bind(gi.image_id.as_bytes().to_vec())
|
||||
.push_bind(&gi.filename)
|
||||
.push_bind(gi.ordering)
|
||||
.push_bind(gi.file_size);
|
||||
});
|
||||
qb.push(
|
||||
" ON CONFLICT (gallery_id, image_id) DO UPDATE SET \
|
||||
filename = EXCLUDED.filename, \
|
||||
ordering = EXCLUDED.ordering, \
|
||||
file_size = EXCLUDED.file_size",
|
||||
);
|
||||
qb.build().execute(pool).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn list_gallery_images(pool: &PgPool, gallery_id: &GalleryId) -> Result<Vec<(GalleryImage, Image)>> {
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
SELECT gi.gallery_id, gi.image_id, gi.filename, gi.ordering,
|
||||
SELECT gi.gallery_id, gi.image_id, gi.filename, gi.ordering, gi.file_size,
|
||||
i.width, i.height
|
||||
FROM gallery_images gi
|
||||
JOIN images i ON i.id = gi.image_id
|
||||
@@ -93,6 +155,7 @@ pub async fn list_gallery_images(pool: &PgPool, gallery_id: &GalleryId) -> Resul
|
||||
image_id: image_id.clone(),
|
||||
filename: r.get("filename"),
|
||||
ordering: r.get("ordering"),
|
||||
file_size: r.get("file_size"),
|
||||
};
|
||||
let image = Image {
|
||||
id: image_id,
|
||||
@@ -103,6 +166,35 @@ pub async fn list_gallery_images(pool: &PgPool, gallery_id: &GalleryId) -> Resul
|
||||
}).collect())
|
||||
}
|
||||
|
||||
/// Batch-update file_size for gallery_images rows matched by (gallery_id, filename).
|
||||
pub async fn backfill_file_sizes(
|
||||
pool: &PgPool,
|
||||
gallery_id: &GalleryId,
|
||||
updates: &[(String, i64)],
|
||||
) -> Result<u64> {
|
||||
use sqlx::{Postgres, QueryBuilder};
|
||||
|
||||
if updates.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
const CHUNK: usize = 16_000;
|
||||
let mut total = 0u64;
|
||||
for chunk in updates.chunks(CHUNK) {
|
||||
let mut qb: QueryBuilder<Postgres> = QueryBuilder::new(
|
||||
"UPDATE gallery_images SET file_size = v.file_size FROM (",
|
||||
);
|
||||
qb.push_values(chunk, |mut b, (filename, size)| {
|
||||
b.push_bind(filename.as_str()).push_bind(*size);
|
||||
});
|
||||
qb.push(") AS v(filename, file_size) WHERE gallery_images.gallery_id = ");
|
||||
qb.push_bind(gallery_id.as_bytes());
|
||||
qb.push(" AND gallery_images.filename = v.filename");
|
||||
let result = qb.build().execute(pool).await?;
|
||||
total += result.rows_affected();
|
||||
}
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
pub async fn get_image(pool: &PgPool, id: &ImageId) -> Result<Option<Image>> {
|
||||
let row = sqlx::query("SELECT id, width, height FROM images WHERE id = $1")
|
||||
.bind(id.as_bytes())
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::time::Duration;
|
||||
use anyhow::Result;
|
||||
use sqlx::PgPool;
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
@@ -5,6 +6,14 @@ use sqlx::postgres::PgPoolOptions;
|
||||
pub async fn connect(connstring: &str, max_connections: u32) -> Result<PgPool> {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(max_connections)
|
||||
// Pre-create connections so the pool is warm from the start,
|
||||
// avoiding slow-acquire warnings from lazy mTLS handshakes.
|
||||
.min_connections(max_connections)
|
||||
// Recycle idle connections after 5 minutes so stale connections
|
||||
// from prior runs don't linger.
|
||||
.idle_timeout(Duration::from_secs(300))
|
||||
// Fail fast rather than blocking for 30s on pool exhaustion.
|
||||
.acquire_timeout(Duration::from_secs(10))
|
||||
.connect(connstring)
|
||||
.await?;
|
||||
Ok(pool)
|
||||
|
||||
@@ -33,4 +33,5 @@ pub struct GalleryImage {
|
||||
pub image_id: ImageId,
|
||||
pub filename: String,
|
||||
pub ordering: i32,
|
||||
pub file_size: Option<i64>,
|
||||
}
|
||||
|
||||
@@ -13,8 +13,6 @@ sqlx = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
glob = { workspace = true }
|
||||
image = { workspace = true }
|
||||
|
||||
@@ -1,43 +0,0 @@
|
||||
use std::path::Path;
|
||||
use glob::Pattern;
|
||||
|
||||
pub struct FilterConfig {
|
||||
include: Option<Vec<Pattern>>,
|
||||
exclude: Option<Vec<Pattern>>,
|
||||
}
|
||||
|
||||
impl FilterConfig {
|
||||
pub fn new(include: Option<Vec<String>>, exclude: Option<Vec<String>>) -> Self {
|
||||
Self {
|
||||
include: include.map(|v| v.iter().filter_map(|s| Pattern::new(s).ok()).collect()),
|
||||
exclude: exclude.map(|v| v.iter().filter_map(|s| Pattern::new(s).ok()).collect()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the given filename should be included.
|
||||
///
|
||||
/// Mirrors rsync behaviour:
|
||||
/// - If include patterns are set, filename must match at least one.
|
||||
/// - If exclude patterns are set, filename must not match any.
|
||||
/// - If neither is set, all files are included.
|
||||
pub fn matches(&self, path: &Path) -> bool {
|
||||
let name = match path.file_name().and_then(|n| n.to_str()) {
|
||||
Some(n) => n,
|
||||
None => return false,
|
||||
};
|
||||
|
||||
if let Some(includes) = &self.include {
|
||||
if !includes.iter().any(|p| p.matches(name)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(excludes) = &self.exclude {
|
||||
if excludes.iter().any(|p| p.matches(name)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,6 @@
|
||||
pub mod traversal;
|
||||
pub mod filter;
|
||||
pub mod gallery;
|
||||
pub mod pipeline;
|
||||
|
||||
pub use pipeline::{ingest_galleries, IngestConfig, IngestReport};
|
||||
pub use traversal::{discover_galleries, DirKind};
|
||||
pub use filter::FilterConfig;
|
||||
|
||||
@@ -1,29 +1,40 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
use anyhow::Result;
|
||||
use tokio::sync::Semaphore;
|
||||
use tracing::{info, warn};
|
||||
use rbv_entity::{ClipEmbedding, FaceDetection, Gallery, GalleryImage, Image};
|
||||
use rbv_entity::{ClipEmbedding, FaceDetection, Gallery, GalleryImage, Image, ImageId};
|
||||
use rbv_hash::{face_id, gallery_id, image_id};
|
||||
use std::sync::Arc;
|
||||
use rbv_ml::{AnalysisResult, MlBackend, MlError};
|
||||
use sqlx::PgPool;
|
||||
use crate::filter::FilterConfig;
|
||||
use crate::gallery::{list_images, read_index_json};
|
||||
|
||||
pub struct IngestConfig {
|
||||
pub concurrency: usize,
|
||||
pub filter: FilterConfig,
|
||||
pub io_concurrency: usize,
|
||||
pub reindex: bool,
|
||||
}
|
||||
|
||||
pub struct IngestReport {
|
||||
pub galleries_processed: usize,
|
||||
pub galleries_skipped: usize,
|
||||
pub images_processed: usize,
|
||||
pub images_skipped: usize,
|
||||
pub faces_detected: usize,
|
||||
pub errors: Vec<(PathBuf, anyhow::Error)>,
|
||||
}
|
||||
|
||||
/// Result from process_image. Skip paths return the GalleryImage for
|
||||
/// batch upsert, avoiding per-image DB calls during the task phase.
|
||||
enum ImageResult {
|
||||
/// Image was skipped — gallery_image row needs batch upsert.
|
||||
Skipped(GalleryImage),
|
||||
/// Image was fully processed (ML + DB transaction done).
|
||||
Processed { faces: usize },
|
||||
}
|
||||
|
||||
pub async fn ingest_galleries(
|
||||
pool: &PgPool,
|
||||
ml: Arc<dyn MlBackend>,
|
||||
@@ -32,13 +43,15 @@ pub async fn ingest_galleries(
|
||||
) -> Result<IngestReport> {
|
||||
let mut report = IngestReport {
|
||||
galleries_processed: 0,
|
||||
galleries_skipped: 0,
|
||||
images_processed: 0,
|
||||
images_skipped: 0,
|
||||
faces_detected: 0,
|
||||
errors: Vec::new(),
|
||||
};
|
||||
|
||||
let semaphore = Arc::new(Semaphore::new(config.concurrency));
|
||||
let io_semaphore = Arc::new(Semaphore::new(config.io_concurrency));
|
||||
let ml_semaphore = Arc::new(Semaphore::new(config.concurrency));
|
||||
|
||||
for gallery_path in &galleries {
|
||||
tracing::trace!("processing gallery {}", gallery_path.display());
|
||||
@@ -78,22 +91,61 @@ pub async fn ingest_galleries(
|
||||
}
|
||||
};
|
||||
|
||||
let filtered: Vec<PathBuf> = image_paths
|
||||
.into_iter()
|
||||
.filter(|p| config.filter.matches(p))
|
||||
.collect();
|
||||
// ── Optimization 1: gallery-level skip ──────────────────────────
|
||||
let (file_count, total_bytes) = match compute_gallery_stats(&image_paths) {
|
||||
Ok(stats) => stats,
|
||||
Err(e) => {
|
||||
warn!("Cannot stat gallery {}, processing anyway: {e}", gallery_path.display());
|
||||
(-1, -1)
|
||||
}
|
||||
};
|
||||
|
||||
if !config.reindex {
|
||||
if let Ok(Some((stored_count, stored_bytes))) =
|
||||
rbv_data::gallery::get_gallery_stats(pool, &gid).await
|
||||
{
|
||||
if stored_count == file_count && stored_bytes == total_bytes {
|
||||
report.galleries_skipped += 1;
|
||||
if report.galleries_skipped % 100 == 0 {
|
||||
info!(skipped = report.galleries_skipped, "galleries unchanged");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Optimization 2: batch existence check ───────────────────────
|
||||
let existing = match rbv_data::image::existing_gallery_images(pool, &gid).await {
|
||||
Ok(m) => Arc::new(m),
|
||||
Err(e) => {
|
||||
warn!("Cannot fetch existing images for {}: {e}", gallery_path.display());
|
||||
Arc::new(HashMap::new())
|
||||
}
|
||||
};
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for (ordering, image_path) in filtered.iter().enumerate() {
|
||||
for (ordering, image_path) in image_paths.iter().enumerate() {
|
||||
let pool = pool.clone();
|
||||
let ml = Arc::clone(&ml);
|
||||
let gid = gid.clone();
|
||||
let image_path = image_path.clone();
|
||||
let sem = semaphore.clone();
|
||||
let io_sem = io_semaphore.clone();
|
||||
let ml_sem = ml_semaphore.clone();
|
||||
let existing = Arc::clone(&existing);
|
||||
|
||||
tasks.push(tokio::spawn(async move {
|
||||
let _permit = sem.acquire_owned().await.unwrap();
|
||||
let result = process_image(&pool, ml.as_ref(), &gid, &image_path, ordering as i32).await;
|
||||
let io_permit = io_sem.acquire_owned().await.unwrap();
|
||||
let result = process_image(
|
||||
&pool,
|
||||
ml.as_ref(),
|
||||
&gid,
|
||||
&image_path,
|
||||
ordering as i32,
|
||||
&existing,
|
||||
io_permit,
|
||||
&ml_sem,
|
||||
)
|
||||
.await;
|
||||
(image_path, result)
|
||||
}));
|
||||
}
|
||||
@@ -102,19 +154,20 @@ pub async fn ingest_galleries(
|
||||
let mut g_skipped = 0usize;
|
||||
let mut g_faces = 0usize;
|
||||
let mut g_errors = 0usize;
|
||||
let mut skipped_gis = Vec::new();
|
||||
|
||||
for task in tasks {
|
||||
match task.await {
|
||||
Ok((_, Ok((skipped, faces)))) => {
|
||||
if skipped {
|
||||
report.images_skipped += 1;
|
||||
g_skipped += 1;
|
||||
} else {
|
||||
report.images_processed += 1;
|
||||
report.faces_detected += faces;
|
||||
g_processed += 1;
|
||||
g_faces += faces;
|
||||
}
|
||||
Ok((_, Ok(ImageResult::Skipped(gi)))) => {
|
||||
skipped_gis.push(gi);
|
||||
report.images_skipped += 1;
|
||||
g_skipped += 1;
|
||||
}
|
||||
Ok((_, Ok(ImageResult::Processed { faces }))) => {
|
||||
report.images_processed += 1;
|
||||
report.faces_detected += faces;
|
||||
g_processed += 1;
|
||||
g_faces += faces;
|
||||
}
|
||||
Ok((image_path, Err(e))) => {
|
||||
warn!("{} {} {e:#}", gid.to_hex(), image_path.display());
|
||||
@@ -128,6 +181,15 @@ pub async fn ingest_galleries(
|
||||
}
|
||||
}
|
||||
|
||||
// Batch-upsert all skipped gallery_image rows in one query.
|
||||
if !skipped_gis.is_empty() {
|
||||
if let Err(e) = rbv_data::image::upsert_gallery_images_batch(pool, &skipped_gis).await {
|
||||
warn!("batch upsert gallery_images failed for {}: {e:#}", gid.to_hex());
|
||||
g_errors += 1;
|
||||
report.errors.push((gallery_path.clone(), e));
|
||||
}
|
||||
}
|
||||
|
||||
let label = if !gallery.source_name.is_empty() {
|
||||
format!("{} {}", gid.to_hex(), gallery.source_name)
|
||||
} else {
|
||||
@@ -141,57 +203,110 @@ pub async fn ingest_galleries(
|
||||
"{label}"
|
||||
);
|
||||
|
||||
if g_errors == 0 && file_count >= 0 {
|
||||
let _ = rbv_data::gallery::update_gallery_stats(
|
||||
pool, &gid, file_count, total_bytes,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
report.galleries_processed += 1;
|
||||
}
|
||||
|
||||
Ok(report)
|
||||
}
|
||||
|
||||
/// Process a single image: hash, check if exists, submit to ML, upsert to DB.
|
||||
/// Returns (was_skipped, faces_detected).
|
||||
/// Stat all image files to get (count, total_bytes) for gallery-level skip.
|
||||
fn compute_gallery_stats(paths: &[PathBuf]) -> std::io::Result<(i32, i64)> {
|
||||
let mut total: i64 = 0;
|
||||
for p in paths {
|
||||
total += std::fs::metadata(p)?.len() as i64;
|
||||
}
|
||||
Ok((paths.len() as i32, total))
|
||||
}
|
||||
|
||||
/// Process a single image through a two-stage pipeline:
|
||||
/// Stage 1 (IO permit held): stat, file_size fast-path, read, hash
|
||||
/// Stage 2 (ML semaphore): inference, DB write
|
||||
///
|
||||
/// Skip paths return `ImageResult::Skipped(gi)` without touching the DB —
|
||||
/// the caller batch-upserts all skipped gallery_image rows in one query.
|
||||
/// Only the ML path (new images) writes to the DB inside the task.
|
||||
async fn process_image(
|
||||
pool: &PgPool,
|
||||
ml: &dyn MlBackend,
|
||||
gallery_id: &rbv_entity::GalleryId,
|
||||
image_path: &PathBuf,
|
||||
ordering: i32,
|
||||
) -> Result<(bool, usize)> {
|
||||
let bytes = tokio::fs::read(image_path).await?;
|
||||
let iid = image_id(&bytes);
|
||||
|
||||
existing: &HashMap<String, (ImageId, Option<i64>)>,
|
||||
io_permit: tokio::sync::OwnedSemaphorePermit,
|
||||
ml_sem: &Semaphore,
|
||||
) -> Result<ImageResult> {
|
||||
let filename = image_path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
|
||||
// Always upsert the gallery_images row (ordering may change)
|
||||
// ── Stage 1: I/O (io_permit already held) ───────────────────────────
|
||||
|
||||
// Optimization 3: file_size fast-path — stat only, no file read.
|
||||
let metadata = tokio::fs::metadata(image_path).await?;
|
||||
let file_size = metadata.len() as i64;
|
||||
|
||||
if let Some((existing_iid, Some(stored_size))) = existing.get(&filename) {
|
||||
if *stored_size == file_size {
|
||||
drop(io_permit);
|
||||
return Ok(ImageResult::Skipped(GalleryImage {
|
||||
gallery_id: gallery_id.clone(),
|
||||
image_id: existing_iid.clone(),
|
||||
filename,
|
||||
ordering,
|
||||
file_size: Some(file_size),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
// File is new or changed — read and hash.
|
||||
let bytes = tokio::fs::read(image_path).await?;
|
||||
let iid = image_id(&bytes);
|
||||
|
||||
let gi = GalleryImage {
|
||||
gallery_id: gallery_id.clone(),
|
||||
image_id: iid.clone(),
|
||||
filename,
|
||||
ordering,
|
||||
file_size: Some(file_size),
|
||||
};
|
||||
|
||||
// Check if ML work has already been done for this image content
|
||||
if rbv_data::image::image_exists(pool, &iid).await? {
|
||||
rbv_data::image::upsert_gallery_image(pool, &gi).await?;
|
||||
return Ok((true, 0));
|
||||
// Optimization 2: check batch-loaded set instead of per-image DB query.
|
||||
if existing.values().any(|(eid, _)| *eid == iid) {
|
||||
drop(io_permit);
|
||||
return Ok(ImageResult::Skipped(gi));
|
||||
}
|
||||
|
||||
// Resize for ML API (max 1MB part limit). Hash is computed on originals above.
|
||||
let ml_bytes = prepare_for_ml(&bytes)?;
|
||||
// Check images table directly (image may exist in another gallery).
|
||||
if rbv_data::image::image_exists(pool, &iid).await? {
|
||||
drop(io_permit);
|
||||
return Ok(ImageResult::Skipped(gi));
|
||||
}
|
||||
|
||||
// Resize for ML (hash is computed on originals above).
|
||||
let ml_bytes = prepare_for_ml(&bytes)?;
|
||||
drop(bytes);
|
||||
drop(io_permit);
|
||||
|
||||
// ── Stage 2: ML inference (under ml_sem) ────────────────────────────
|
||||
let _ml_permit = ml_sem.acquire().await.unwrap();
|
||||
|
||||
// Submit to ML API (with retry/backoff for transient connection errors)
|
||||
let result = match analyze_with_backoff(ml, &ml_bytes, image_path).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
warn!("Skipping {} after ML retries exhausted: {e:#}", image_path.display());
|
||||
return Ok((true, 0));
|
||||
return Ok(ImageResult::Skipped(gi));
|
||||
}
|
||||
};
|
||||
|
||||
// Build all entities before touching the DB
|
||||
let image = Image {
|
||||
id: iid.clone(),
|
||||
width: Some(result.image_width),
|
||||
@@ -214,7 +329,6 @@ async fn process_image(
|
||||
}).collect();
|
||||
let face_count = faces.len();
|
||||
|
||||
// Write everything in a single transaction
|
||||
let mut tx = pool.begin().await?;
|
||||
rbv_data::image::upsert_image(&mut *tx, &image).await?;
|
||||
rbv_data::image::upsert_gallery_image(&mut *tx, &gi).await?;
|
||||
@@ -222,11 +336,10 @@ async fn process_image(
|
||||
rbv_data::face::upsert_faces_batch(&mut *tx, &faces).await?;
|
||||
tx.commit().await?;
|
||||
|
||||
Ok((false, face_count))
|
||||
Ok(ImageResult::Processed { faces: face_count })
|
||||
}
|
||||
|
||||
/// Call `ml.analyze_image` with exponential backoff for transient failures.
|
||||
/// Retries up to 3 times (delays: 5s, 10s, 20s) on connection/timeout/5xx errors.
|
||||
async fn analyze_with_backoff(
|
||||
ml: &dyn MlBackend,
|
||||
bytes: &[u8],
|
||||
@@ -262,8 +375,6 @@ async fn analyze_with_backoff(
|
||||
}
|
||||
|
||||
/// Decode, resize if needed, and re-encode as JPEG for the ML API.
|
||||
/// Always produces JPEG output regardless of the source format, ensuring
|
||||
/// Pillow on the server side can always identify the image.
|
||||
fn prepare_for_ml(bytes: &[u8]) -> anyhow::Result<Vec<u8>> {
|
||||
const MAX_DIM: u32 = 1280;
|
||||
|
||||
|
||||
@@ -10,5 +10,3 @@ rbv-data = { workspace = true }
|
||||
rbv-ml = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
5
migrations/0011_gallery_stats.sql
Normal file
5
migrations/0011_gallery_stats.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
-- Gallery-level stats for skip-if-unchanged optimisation.
|
||||
-- NULL means "never computed" → gallery will not be skipped on next index run.
|
||||
ALTER TABLE galleries
|
||||
ADD COLUMN IF NOT EXISTS file_count INT DEFAULT NULL,
|
||||
ADD COLUMN IF NOT EXISTS total_bytes BIGINT DEFAULT NULL;
|
||||
4
migrations/0012_gallery_image_file_size.sql
Normal file
4
migrations/0012_gallery_image_file_size.sql
Normal file
@@ -0,0 +1,4 @@
|
||||
-- Per-image file size for fast-path skip (avoid reading + hashing unchanged files).
|
||||
-- NULL means "unknown" → first re-index after migration will populate this column.
|
||||
ALTER TABLE gallery_images
|
||||
ADD COLUMN IF NOT EXISTS file_size BIGINT DEFAULT NULL;
|
||||
Reference in New Issue
Block a user