Pre-warm connection pool and size it to match concurrency
Configure sqlx pool with min_connections = max_connections so all connections are established at startup, avoiding slow-acquire warnings from lazy mTLS handshakes. Add idle_timeout (5 min) to recycle stale connections from prior runs, and reduce acquire_timeout to 10s for faster failure. Size the pool to io_concurrency + ml_concurrency + 2 to accommodate the worst case where all IO tasks call image_exists concurrently. Reduce default io_concurrency from 4× to 2× ML concurrency to keep pool size within PostgreSQL's default max_connections. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -27,7 +27,7 @@ rbv index \
|
||||
--database <CONNSTR> \
|
||||
--model-dir <PATH> \
|
||||
[--concurrency <N>] # ML concurrency, default 4
|
||||
[--io-concurrency <N>] # file I/O concurrency, default 4× ML concurrency
|
||||
[--io-concurrency <N>] # file I/O concurrency, default 2× ML concurrency
|
||||
[--reindex] # bypass gallery-level skip
|
||||
[--ml-purge] # wipe all ML data and re-index from scratch
|
||||
```
|
||||
|
||||
@@ -5,10 +5,11 @@ use rbv_ingest::{IngestConfig, ingest_galleries, discover_galleries};
|
||||
use crate::args::IndexArgs;
|
||||
|
||||
pub async fn run(args: IndexArgs) -> Result<()> {
|
||||
let io_concurrency = args.io_concurrency.unwrap_or(args.concurrency * 4);
|
||||
// Pool size: ML concurrency (transactions) + a few for IO-phase image_exists
|
||||
// checks and the sequential gallery-level queries on the main loop.
|
||||
let pool = rbv_data::connect(&args.database, (args.concurrency + 6) as u32).await?;
|
||||
let io_concurrency = args.io_concurrency.unwrap_or(args.concurrency * 2);
|
||||
// Pool must accommodate io_concurrency tasks (some call image_exists)
|
||||
// + ml_concurrency transactions + main-loop queries.
|
||||
let pool_size = (io_concurrency + args.concurrency + 2) as u32;
|
||||
let pool = rbv_data::connect(&args.database, pool_size).await?;
|
||||
|
||||
if args.ml_purge {
|
||||
info!("Purging all ML-derived data (embeddings, faces, persons)...");
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::time::Duration;
|
||||
use anyhow::Result;
|
||||
use sqlx::PgPool;
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
@@ -5,6 +6,14 @@ use sqlx::postgres::PgPoolOptions;
|
||||
pub async fn connect(connstring: &str, max_connections: u32) -> Result<PgPool> {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(max_connections)
|
||||
// Pre-create connections so the pool is warm from the start,
|
||||
// avoiding slow-acquire warnings from lazy mTLS handshakes.
|
||||
.min_connections(max_connections)
|
||||
// Recycle idle connections after 5 minutes so stale connections
|
||||
// from prior runs don't linger.
|
||||
.idle_timeout(Duration::from_secs(300))
|
||||
// Fail fast rather than blocking for 30s on pool exhaustion.
|
||||
.acquire_timeout(Duration::from_secs(10))
|
||||
.connect(connstring)
|
||||
.await?;
|
||||
Ok(pool)
|
||||
|
||||
Reference in New Issue
Block a user