Merge feat/B3-cortex-upstream-client: cortex upstream entitlement client + chain (B3, #57)
All checks were successful
build-prerelease / Resolve version stamps + change detection (push) Successful in 41s
build-prerelease / Build neuron-blackwell (push) Successful in 1m37s
build-prerelease / Build neuron-ampere (push) Successful in 2m14s
build-prerelease / Build neuron-ada (push) Successful in 2m14s
build-prerelease / Build helexa-bench binary (push) Successful in 2m45s
build-prerelease / Build cortex binary (push) Successful in 2m58s
build-prerelease / Lint (fmt + clippy) (push) Successful in 3m32s
build-prerelease / Test (push) Successful in 6m44s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 1m38s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m46s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m47s
build-prerelease / Package helexa-bench RPM (push) Successful in 1m15s
build-prerelease / Package cortex RPM (push) Successful in 1m17s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 54s
All checks were successful
build-prerelease / Resolve version stamps + change detection (push) Successful in 41s
build-prerelease / Build neuron-blackwell (push) Successful in 1m37s
build-prerelease / Build neuron-ampere (push) Successful in 2m14s
build-prerelease / Build neuron-ada (push) Successful in 2m14s
build-prerelease / Build helexa-bench binary (push) Successful in 2m45s
build-prerelease / Build cortex binary (push) Successful in 2m58s
build-prerelease / Lint (fmt + clippy) (push) Successful in 3m32s
build-prerelease / Test (push) Successful in 6m44s
build-prerelease / Package helexa-neuron-ada RPM (push) Successful in 1m38s
build-prerelease / Package helexa-neuron-blackwell RPM (push) Successful in 1m46s
build-prerelease / Package helexa-neuron-ampere RPM (push) Successful in 1m47s
build-prerelease / Package helexa-bench RPM (push) Successful in 1m15s
build-prerelease / Package cortex RPM (push) Successful in 1m17s
build-prerelease / Publish to rpm.lair.cafe (unstable) (push) Successful in 54s
This commit is contained in:
@@ -90,3 +90,18 @@ account_id = "operator"
|
||||
key_id = "infra"
|
||||
# No hard_cap → uncapped operator infra key (own fleet, own use). Still
|
||||
# metered for visibility.
|
||||
|
||||
# -- Upstream (helexa mesh) entitlements client (#57) --------------------
|
||||
# When enabled, a bearer key NOT found in [[entitlements.keys]] above is
|
||||
# validated against the helexa-upstream authority (mesh accounts), and its
|
||||
# budget is reserved/settled there. Operator-local keys (incl. the infra
|
||||
# key) never leave this process. Fail-closed: if upstream is unreachable a
|
||||
# request is refused (503 + Retry-After), never served un-authorized.
|
||||
# Disabled by default — a standalone operator runs purely local.
|
||||
[upstream]
|
||||
enabled = false
|
||||
# url = "https://upstream.helexa.ai"
|
||||
# Shared client bearer this cortex presents (maps to an operator_id
|
||||
# upstream). Override via CORTEX_UPSTREAM__BEARER in prod.
|
||||
# bearer = "replace-with-operator-client-secret"
|
||||
# timeout_secs = 5
|
||||
|
||||
@@ -22,6 +22,36 @@ pub struct GatewayConfig {
|
||||
/// setups keep working until keys are configured.
|
||||
#[serde(default)]
|
||||
pub entitlements: EntitlementsConfig,
|
||||
/// helexa-upstream client (#57). When enabled, keys not found in the
|
||||
/// local `[entitlements]` config are validated against the mesh
|
||||
/// authority, and budget is reserved/settled there. Disabled by default
|
||||
/// — a single operator runs purely local.
|
||||
#[serde(default)]
|
||||
pub upstream: UpstreamClientConfig,
|
||||
}
|
||||
|
||||
/// `[upstream]` — the helexa-upstream authority client (#57). Locally
|
||||
/// unrecognised bearer keys are resolved against `url`'s `/authz/v1` surface
|
||||
/// (mesh accounts); local keys (operator + infra) never leave the process.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct UpstreamClientConfig {
|
||||
/// Enable the upstream fallthrough. Off → purely local entitlements.
|
||||
#[serde(default)]
|
||||
pub enabled: bool,
|
||||
/// Base URL of helexa-upstream (e.g. "https://upstream.helexa.ai").
|
||||
#[serde(default)]
|
||||
pub url: String,
|
||||
/// Shared client bearer this cortex presents to `/authz/v1` (maps to an
|
||||
/// operator_id upstream). Sent as `Authorization: Bearer <bearer>`.
|
||||
#[serde(default)]
|
||||
pub bearer: String,
|
||||
/// Per-call timeout (seconds) to upstream.
|
||||
#[serde(default = "default_upstream_timeout")]
|
||||
pub timeout_secs: u64,
|
||||
}
|
||||
|
||||
fn default_upstream_timeout() -> u64 {
|
||||
5
|
||||
}
|
||||
|
||||
/// `[entitlements]` — the local/static [`crate::entitlements::EntitlementProvider`]
|
||||
@@ -129,6 +159,7 @@ impl Default for GatewayConfig {
|
||||
neurons: vec![],
|
||||
models_config: default_models_path(),
|
||||
entitlements: EntitlementsConfig::default(),
|
||||
upstream: UpstreamClientConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,12 +81,19 @@ pub struct BudgetSnapshot {
|
||||
pub reserved: u64,
|
||||
}
|
||||
|
||||
/// Authentication failure — the bearer key could not be resolved. Maps to
|
||||
/// `401 invalid_api_key` (#49/#63).
|
||||
/// Authentication failure — the bearer key could not be resolved.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum AuthError {
|
||||
/// The key is genuinely unknown → `401 invalid_api_key` (#49/#63).
|
||||
#[error("invalid or unknown API key")]
|
||||
InvalidKey,
|
||||
/// The authority that could resolve the key is unreachable (e.g. the
|
||||
/// helexa-upstream client failed, #57). Fail **closed** but distinctly:
|
||||
/// a transient outage must surface as `503 service_unavailable` +
|
||||
/// `Retry-After`, never `401` — a real key must not be rejected as
|
||||
/// invalid during an upstream blip.
|
||||
#[error("entitlement authority unavailable; retry in {retry_after_secs}s")]
|
||||
Unavailable { retry_after_secs: u64 },
|
||||
}
|
||||
|
||||
/// Why a reservation was refused. Carries enough for the caller to build the
|
||||
|
||||
@@ -22,7 +22,7 @@ use axum::http::header::AUTHORIZATION;
|
||||
use axum::http::{HeaderMap, HeaderValue};
|
||||
use axum::middleware::Next;
|
||||
use axum::response::Response;
|
||||
use cortex_core::entitlements::{HEADER_ACCOUNT_ID, HEADER_KEY_ID};
|
||||
use cortex_core::entitlements::{AuthError, HEADER_ACCOUNT_ID, HEADER_KEY_ID};
|
||||
use cortex_core::error_envelope::OpenAiError;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -83,14 +83,25 @@ pub async fn require_principal(
|
||||
req.extensions_mut().insert(principal);
|
||||
next.run(req).await
|
||||
}
|
||||
// An unrecognized key only hard-fails when auth is *required*.
|
||||
// In allow-anonymous mode (the default) we must IGNORE it and
|
||||
// serve the request unauthenticated — otherwise the placeholder
|
||||
// keys that OpenAI-compatible clients send by default (opencode,
|
||||
// Open WebUI, Agent Zero, litellm) would all break, even though
|
||||
// the operator never opted into auth. Pre-#49 the bearer was
|
||||
// never inspected at all; this preserves that for require_auth=false.
|
||||
Err(_) => {
|
||||
// The entitlement authority is unreachable (upstream client
|
||||
// blip, #57). Fail **closed but distinct**: a transient outage
|
||||
// must not reject a real key as `401 invalid_api_key` — it's a
|
||||
// retryable `503`. This holds regardless of require_auth: we
|
||||
// can't safely serve a key we couldn't authorize.
|
||||
Err(AuthError::Unavailable { retry_after_secs }) => {
|
||||
envelope_response(OpenAiError::service_unavailable(
|
||||
"entitlement authority temporarily unavailable",
|
||||
Some(retry_after_secs),
|
||||
))
|
||||
}
|
||||
// A genuinely unrecognized key only hard-fails when auth is
|
||||
// *required*. In allow-anonymous mode (the default) we IGNORE it
|
||||
// and serve unauthenticated — otherwise the placeholder keys that
|
||||
// OpenAI-compatible clients send by default (opencode, Open WebUI,
|
||||
// Agent Zero, litellm) would all break though the operator never
|
||||
// opted into auth. Pre-#49 the bearer was never inspected; this
|
||||
// preserves that for require_auth=false.
|
||||
Err(AuthError::InvalidKey) => {
|
||||
if fleet.require_auth {
|
||||
unauthorized("invalid API key")
|
||||
} else {
|
||||
|
||||
112
crates/cortex-gateway/src/entitlements_chain.rs
Normal file
112
crates/cortex-gateway/src/entitlements_chain.rs
Normal file
@@ -0,0 +1,112 @@
|
||||
//! Chained entitlement provider (#57): operator-local keys first, mesh
|
||||
//! upstream for everything else.
|
||||
//!
|
||||
//! `resolve` tries the [`LocalEntitlementProvider`] (operator + infra keys —
|
||||
//! never a network hop); only a locally-unknown key falls through to
|
||||
//! [`UpstreamEntitlementProvider`]. Because the local provider treats an
|
||||
//! unconfigured principal as uncapped, reserve/settle/release/snapshot must
|
||||
//! **not** blindly hit local — they dispatch to whichever backend resolved
|
||||
//! that account, remembered in a map keyed by `account_id` (populated at
|
||||
//! resolve time).
|
||||
|
||||
use crate::entitlements_local::LocalEntitlementProvider;
|
||||
use crate::entitlements_upstream::UpstreamEntitlementProvider;
|
||||
use async_trait::async_trait;
|
||||
use cortex_core::entitlements::{
|
||||
AuthError, BudgetError, BudgetSnapshot, EntitlementProvider, Principal, Reservation,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum Backend {
|
||||
Local,
|
||||
Upstream,
|
||||
}
|
||||
|
||||
pub struct ChainedEntitlementProvider {
|
||||
local: LocalEntitlementProvider,
|
||||
upstream: UpstreamEntitlementProvider,
|
||||
/// account_id → which backend owns it, learned at resolve time.
|
||||
backends: RwLock<HashMap<String, Backend>>,
|
||||
}
|
||||
|
||||
impl ChainedEntitlementProvider {
|
||||
pub fn new(local: LocalEntitlementProvider, upstream: UpstreamEntitlementProvider) -> Self {
|
||||
Self {
|
||||
local,
|
||||
upstream,
|
||||
backends: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn record(&self, account_id: &str, backend: Backend) {
|
||||
self.backends
|
||||
.write()
|
||||
.await
|
||||
.insert(account_id.to_string(), backend);
|
||||
}
|
||||
|
||||
/// The backend that owns `account_id`. Defaults to `Upstream` for an
|
||||
/// account never resolved this process-lifetime (a resolve always
|
||||
/// precedes reserve in a request, so this is just a safe fallback —
|
||||
/// upstream fails closed if the account is bogus).
|
||||
async fn backend_for(&self, account_id: &str) -> Backend {
|
||||
self.backends
|
||||
.read()
|
||||
.await
|
||||
.get(account_id)
|
||||
.copied()
|
||||
.unwrap_or(Backend::Upstream)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EntitlementProvider for ChainedEntitlementProvider {
|
||||
async fn resolve(&self, api_key: &str) -> Result<Principal, AuthError> {
|
||||
match self.local.resolve(api_key).await {
|
||||
Ok(p) => {
|
||||
self.record(&p.account_id, Backend::Local).await;
|
||||
Ok(p)
|
||||
}
|
||||
Err(AuthError::InvalidKey) => {
|
||||
let p = self.upstream.resolve(api_key).await?;
|
||||
self.record(&p.account_id, Backend::Upstream).await;
|
||||
Ok(p)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
async fn reserve(
|
||||
&self,
|
||||
principal: &Principal,
|
||||
max_tokens: u64,
|
||||
) -> Result<Reservation, BudgetError> {
|
||||
match self.backend_for(&principal.account_id).await {
|
||||
Backend::Local => self.local.reserve(principal, max_tokens).await,
|
||||
Backend::Upstream => self.upstream.reserve(principal, max_tokens).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn settle(&self, reservation: Reservation, actual_tokens: u64) {
|
||||
match self.backend_for(&reservation.principal.account_id).await {
|
||||
Backend::Local => self.local.settle(reservation, actual_tokens).await,
|
||||
Backend::Upstream => self.upstream.settle(reservation, actual_tokens).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn release(&self, reservation: Reservation) {
|
||||
match self.backend_for(&reservation.principal.account_id).await {
|
||||
Backend::Local => self.local.release(reservation).await,
|
||||
Backend::Upstream => self.upstream.release(reservation).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn snapshot(&self, principal: &Principal) -> Option<BudgetSnapshot> {
|
||||
match self.backend_for(&principal.account_id).await {
|
||||
Backend::Local => self.local.snapshot(principal).await,
|
||||
Backend::Upstream => self.upstream.snapshot(principal).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
246
crates/cortex-gateway/src/entitlements_upstream.rs
Normal file
246
crates/cortex-gateway/src/entitlements_upstream.rs
Normal file
@@ -0,0 +1,246 @@
|
||||
//! helexa-upstream client (#57): an [`EntitlementProvider`] that resolves
|
||||
//! keys and reserves/settles budget against the mesh authority's
|
||||
//! `/authz/v1` surface (B2). It is "just another impl of the trait" — cortex
|
||||
//! enforcement (`auth.rs`, `metering.rs`) is unchanged.
|
||||
//!
|
||||
//! **Fail closed.** When upstream is unreachable, `resolve` returns
|
||||
//! [`AuthError::Unavailable`] (→ `503`, never `401`) and `reserve` refuses
|
||||
//! with a retryable [`BudgetError::RateLimited`] — a request is never served
|
||||
//! on an un-authorized key, and a real key is never rejected as invalid
|
||||
//! during a blip.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use cortex_core::config::UpstreamClientConfig;
|
||||
use cortex_core::entitlements::{
|
||||
AuthError, BudgetError, BudgetSnapshot, EntitlementProvider, Principal, Reservation,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Retry-After (seconds) advertised when we fail closed on an upstream
|
||||
/// outage.
|
||||
const FAIL_CLOSED_RETRY_SECS: u64 = 5;
|
||||
|
||||
pub struct UpstreamEntitlementProvider {
|
||||
client: reqwest::Client,
|
||||
base_url: String,
|
||||
bearer: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct PrincipalDto {
|
||||
account_id: String,
|
||||
key_id: String,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct SnapshotDto {
|
||||
hard_cap: Option<u64>,
|
||||
spent: u64,
|
||||
reserved: u64,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct ResolveResp {
|
||||
principal: PrincipalDto,
|
||||
#[allow(dead_code)]
|
||||
snapshot: Option<SnapshotDto>,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct ReserveResp {
|
||||
reservation_id: Option<i64>,
|
||||
rejected: Option<Rejection>,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
enum Rejection {
|
||||
InsufficientQuota {
|
||||
requested: u64,
|
||||
available: u64,
|
||||
},
|
||||
RateLimited {
|
||||
requested: u64,
|
||||
available: u64,
|
||||
retry_after_secs: u64,
|
||||
},
|
||||
}
|
||||
|
||||
impl UpstreamEntitlementProvider {
|
||||
pub fn new(cfg: &UpstreamClientConfig) -> Self {
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(cfg.timeout_secs))
|
||||
.build()
|
||||
.expect("failed to build upstream HTTP client");
|
||||
Self {
|
||||
client,
|
||||
base_url: cfg.url.trim_end_matches('/').to_string(),
|
||||
bearer: cfg.bearer.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn url(&self, path: &str) -> String {
|
||||
format!("{}{}", self.base_url, path)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EntitlementProvider for UpstreamEntitlementProvider {
|
||||
async fn resolve(&self, api_key: &str) -> Result<Principal, AuthError> {
|
||||
let resp = self
|
||||
.client
|
||||
.post(self.url("/authz/v1/resolve"))
|
||||
.bearer_auth(&self.bearer)
|
||||
.json(&serde_json::json!({ "api_key": api_key }))
|
||||
.send()
|
||||
.await;
|
||||
let resp = match resp {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "upstream resolve unreachable; failing closed");
|
||||
return Err(AuthError::Unavailable {
|
||||
retry_after_secs: FAIL_CLOSED_RETRY_SECS,
|
||||
});
|
||||
}
|
||||
};
|
||||
if resp.status().as_u16() == 401 {
|
||||
return Err(AuthError::InvalidKey);
|
||||
}
|
||||
if !resp.status().is_success() {
|
||||
return Err(AuthError::Unavailable {
|
||||
retry_after_secs: FAIL_CLOSED_RETRY_SECS,
|
||||
});
|
||||
}
|
||||
match resp.json::<ResolveResp>().await {
|
||||
Ok(r) => Ok(Principal {
|
||||
account_id: r.principal.account_id,
|
||||
key_id: r.principal.key_id,
|
||||
}),
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "upstream resolve: bad body; failing closed");
|
||||
Err(AuthError::Unavailable {
|
||||
retry_after_secs: FAIL_CLOSED_RETRY_SECS,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn reserve(
|
||||
&self,
|
||||
principal: &Principal,
|
||||
max_tokens: u64,
|
||||
) -> Result<Reservation, BudgetError> {
|
||||
let fail_closed = || BudgetError::RateLimited {
|
||||
requested: max_tokens,
|
||||
available: 0,
|
||||
retry_after_secs: FAIL_CLOSED_RETRY_SECS,
|
||||
};
|
||||
let resp = self
|
||||
.client
|
||||
.post(self.url("/authz/v1/reserve"))
|
||||
.bearer_auth(&self.bearer)
|
||||
.json(&serde_json::json!({
|
||||
"account_id": principal.account_id,
|
||||
"key_id": principal.key_id,
|
||||
"max_tokens": max_tokens,
|
||||
}))
|
||||
.send()
|
||||
.await;
|
||||
let resp = match resp {
|
||||
Ok(r) if r.status().is_success() => r,
|
||||
Ok(r) => {
|
||||
tracing::warn!(status = %r.status(), "upstream reserve non-2xx; failing closed");
|
||||
return Err(fail_closed());
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "upstream reserve unreachable; failing closed");
|
||||
return Err(fail_closed());
|
||||
}
|
||||
};
|
||||
match resp.json::<ReserveResp>().await {
|
||||
Ok(ReserveResp {
|
||||
reservation_id: Some(id),
|
||||
..
|
||||
}) => Ok(Reservation {
|
||||
id: id as u64,
|
||||
principal: principal.clone(),
|
||||
reserved: max_tokens,
|
||||
}),
|
||||
Ok(ReserveResp {
|
||||
rejected:
|
||||
Some(Rejection::InsufficientQuota {
|
||||
requested,
|
||||
available,
|
||||
}),
|
||||
..
|
||||
}) => Err(BudgetError::InsufficientQuota {
|
||||
requested,
|
||||
available,
|
||||
}),
|
||||
Ok(ReserveResp {
|
||||
rejected:
|
||||
Some(Rejection::RateLimited {
|
||||
requested,
|
||||
available,
|
||||
retry_after_secs,
|
||||
}),
|
||||
..
|
||||
}) => Err(BudgetError::RateLimited {
|
||||
requested,
|
||||
available,
|
||||
retry_after_secs,
|
||||
}),
|
||||
_ => Err(fail_closed()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn settle(&self, reservation: Reservation, actual_tokens: u64) {
|
||||
// Best-effort; a lost settle is reaped by the upstream sweeper (B2).
|
||||
let _ = self
|
||||
.client
|
||||
.post(self.url("/authz/v1/settle"))
|
||||
.bearer_auth(&self.bearer)
|
||||
.json(&serde_json::json!({
|
||||
"reservation_id": reservation.id as i64,
|
||||
"actual_tokens": actual_tokens,
|
||||
}))
|
||||
.send()
|
||||
.await
|
||||
.inspect_err(
|
||||
|e| tracing::warn!(error = %e, "upstream settle failed (sweeper will reap)"),
|
||||
);
|
||||
}
|
||||
|
||||
async fn release(&self, reservation: Reservation) {
|
||||
let _ = self
|
||||
.client
|
||||
.post(self.url("/authz/v1/release"))
|
||||
.bearer_auth(&self.bearer)
|
||||
.json(&serde_json::json!({ "reservation_id": reservation.id as i64 }))
|
||||
.send()
|
||||
.await
|
||||
.inspect_err(
|
||||
|e| tracing::warn!(error = %e, "upstream release failed (sweeper will reap)"),
|
||||
);
|
||||
}
|
||||
|
||||
async fn snapshot(&self, principal: &Principal) -> Option<BudgetSnapshot> {
|
||||
let resp = self
|
||||
.client
|
||||
.post(self.url("/authz/v1/snapshot"))
|
||||
.bearer_auth(&self.bearer)
|
||||
.json(&serde_json::json!({
|
||||
"account_id": principal.account_id,
|
||||
"key_id": principal.key_id,
|
||||
}))
|
||||
.send()
|
||||
.await
|
||||
.ok()?;
|
||||
if !resp.status().is_success() {
|
||||
return None;
|
||||
}
|
||||
let dto = resp.json::<SnapshotDto>().await.ok()?;
|
||||
Some(BudgetSnapshot {
|
||||
hard_cap: dto.hard_cap,
|
||||
spent: dto.spent,
|
||||
reserved: dto.reserved,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
pub mod anthropic_sse;
|
||||
pub mod auth;
|
||||
pub mod entitlements_chain;
|
||||
pub mod entitlements_local;
|
||||
pub mod entitlements_upstream;
|
||||
pub mod error;
|
||||
pub mod evictor;
|
||||
pub mod handlers;
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use crate::entitlements_chain::ChainedEntitlementProvider;
|
||||
use crate::entitlements_local::LocalEntitlementProvider;
|
||||
use crate::entitlements_upstream::UpstreamEntitlementProvider;
|
||||
use cortex_core::catalogue::ModelCatalogue;
|
||||
use cortex_core::config::{EvictionSettings, GatewayConfig, NeuronEndpoint};
|
||||
use cortex_core::entitlements::EntitlementProvider;
|
||||
@@ -45,8 +47,20 @@ impl CortexState {
|
||||
|
||||
let catalogue = ModelCatalogue::load(&config.models_config);
|
||||
|
||||
let entitlements: Arc<dyn EntitlementProvider> =
|
||||
Arc::new(LocalEntitlementProvider::from_config(&config.entitlements));
|
||||
// Local provider always handles operator + infra keys. When the
|
||||
// upstream client is enabled (#57), wrap it in the chain so locally
|
||||
// unknown keys fall through to the mesh authority; otherwise stay
|
||||
// purely local.
|
||||
let local = LocalEntitlementProvider::from_config(&config.entitlements);
|
||||
let entitlements: Arc<dyn EntitlementProvider> = if config.upstream.enabled {
|
||||
tracing::info!(url = %config.upstream.url, "upstream entitlement client enabled");
|
||||
Arc::new(ChainedEntitlementProvider::new(
|
||||
local,
|
||||
UpstreamEntitlementProvider::new(&config.upstream),
|
||||
))
|
||||
} else {
|
||||
Arc::new(local)
|
||||
};
|
||||
|
||||
Self {
|
||||
nodes: RwLock::new(nodes),
|
||||
|
||||
@@ -57,6 +57,7 @@ async fn test_alias_resolves_in_chat_completions() {
|
||||
}],
|
||||
models_config: models_path.to_string_lossy().to_string(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -143,6 +144,7 @@ async fn test_aliases_surface_in_v1_models() {
|
||||
}],
|
||||
models_config: models_path.to_string_lossy().to_string(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -232,6 +234,7 @@ async fn test_alias_falls_through_for_unmapped_model() {
|
||||
}],
|
||||
models_config: models_path.to_string_lossy().to_string(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
|
||||
@@ -105,6 +105,7 @@ async fn spawn_gateway(neuron_url: &str, entitlements: EntitlementsConfig) -> St
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements,
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
|
||||
@@ -81,6 +81,7 @@ async fn spawn_gateway(neuron_url: &str, key: ApiKeyConfig) -> (Arc<CortexState>
|
||||
require_auth: true,
|
||||
keys: vec![key],
|
||||
},
|
||||
upstream: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
|
||||
@@ -430,6 +430,7 @@ pub async fn spawn_gateway_with_state(mock_url: &str) -> (Arc<CortexState>, Stri
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
|
||||
@@ -89,6 +89,7 @@ async fn error_response_no_healthy_nodes() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(cortex_gateway::state::CortexState::from_config(&config));
|
||||
|
||||
@@ -72,6 +72,7 @@ fn make_fleet(endpoint: &str, defrag_after: u32) -> Arc<CortexState> {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
Arc::new(CortexState::from_config(&config))
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ async fn fleet_with(big_healthy: bool, big_devices: usize) -> Arc<CortexState> {
|
||||
],
|
||||
models_config: cat.to_string_lossy().into_owned(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
|
||||
@@ -72,6 +72,7 @@ async fn two_neuron_fleet(endpoint_a: &str, endpoint_b: &str) -> Arc<CortexState
|
||||
],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
Arc::new(CortexState::from_config(&config))
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ async fn spawn_metered_gateway(neuron_url: &str) -> (Arc<CortexState>, String) {
|
||||
window: CapWindow::Balance,
|
||||
}],
|
||||
},
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -158,6 +159,7 @@ async fn anonymous_request_records_no_spend() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: EntitlementsConfig::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
|
||||
@@ -66,6 +66,7 @@ harness = "candle"
|
||||
}],
|
||||
models_config: cat_path.to_string_lossy().into_owned(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
|
||||
@@ -55,6 +55,7 @@ capabilities = ["text"]
|
||||
}],
|
||||
models_config: cat_path.to_string_lossy().into_owned(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
|
||||
@@ -32,6 +32,7 @@ async fn test_poller_discovers_models() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -84,6 +85,7 @@ async fn test_poller_updates_gateway_models_endpoint() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -156,6 +158,7 @@ async fn test_models_endpoint_unions_capabilities_across_nodes() {
|
||||
],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -219,6 +222,7 @@ async fn test_poller_marks_unreachable_node_unhealthy() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -273,6 +277,7 @@ async fn test_poller_removes_stale_models() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -304,6 +309,7 @@ async fn test_poller_removes_stale_models() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet2 = Arc::new(CortexState::from_config(&config2));
|
||||
@@ -386,6 +392,7 @@ async fn test_poller_captures_activation_from_health() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
@@ -431,6 +438,7 @@ async fn test_poller_parses_recovering_status() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
|
||||
@@ -75,6 +75,7 @@ async fn spawn_gateway(neuron: &str, context: usize) -> String {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
let fleet = Arc::new(CortexState::from_config(&config));
|
||||
{
|
||||
|
||||
@@ -118,6 +118,7 @@ async fn test_no_healthy_nodes() {
|
||||
}],
|
||||
models_config: "/dev/null".into(),
|
||||
entitlements: Default::default(),
|
||||
upstream: Default::default(),
|
||||
};
|
||||
let fleet = std::sync::Arc::new(cortex_gateway::state::CortexState::from_config(&config));
|
||||
|
||||
|
||||
105
crates/cortex-gateway/tests/upstream_chain.rs
Normal file
105
crates/cortex-gateway/tests/upstream_chain.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
//! B3: the chained entitlement provider (local → upstream) and fail-closed
|
||||
//! semantics, exercised against a mock helexa-upstream `/authz/v1`.
|
||||
|
||||
use axum::{Json, Router, routing::post};
|
||||
use cortex_core::config::{ApiKeyConfig, EntitlementsConfig, UpstreamClientConfig};
|
||||
use cortex_core::entitlements::{AuthError, EntitlementProvider};
|
||||
use cortex_gateway::entitlements_chain::ChainedEntitlementProvider;
|
||||
use cortex_gateway::entitlements_local::LocalEntitlementProvider;
|
||||
use cortex_gateway::entitlements_upstream::UpstreamEntitlementProvider;
|
||||
use serde_json::{Value, json};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Mock upstream: `mesh-key` resolves to a mesh account; anything else 401.
|
||||
/// reserve always grants reservation 1.
|
||||
async fn spawn_mock_upstream() -> String {
|
||||
async fn resolve(Json(body): Json<Value>) -> axum::response::Response {
|
||||
use axum::response::IntoResponse;
|
||||
if body["api_key"] == "mesh-key" {
|
||||
Json(json!({"principal": {"account_id": "mesh-acct", "key_id": "mesh-key-1"}}))
|
||||
.into_response()
|
||||
} else {
|
||||
(
|
||||
axum::http::StatusCode::UNAUTHORIZED,
|
||||
Json(json!({"error": {"code": "invalid_api_key"}})),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
}
|
||||
async fn reserve() -> Json<Value> {
|
||||
Json(json!({ "reservation_id": 1 }))
|
||||
}
|
||||
let app = Router::new()
|
||||
.route("/authz/v1/resolve", post(resolve))
|
||||
.route("/authz/v1/reserve", post(reserve));
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
format!("http://{addr}")
|
||||
}
|
||||
|
||||
fn local_with_key() -> LocalEntitlementProvider {
|
||||
let cfg = EntitlementsConfig {
|
||||
require_auth: false,
|
||||
keys: vec![ApiKeyConfig {
|
||||
key: "local-key".into(),
|
||||
account_id: "op".into(),
|
||||
key_id: None,
|
||||
hard_cap: None,
|
||||
window: Default::default(),
|
||||
}],
|
||||
};
|
||||
LocalEntitlementProvider::from_config(&cfg)
|
||||
}
|
||||
|
||||
fn chain(local: LocalEntitlementProvider, url: &str) -> ChainedEntitlementProvider {
|
||||
let upstream = UpstreamEntitlementProvider::new(&UpstreamClientConfig {
|
||||
enabled: true,
|
||||
url: url.to_string(),
|
||||
bearer: "client-secret".into(),
|
||||
timeout_secs: 5,
|
||||
});
|
||||
ChainedEntitlementProvider::new(local, upstream)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn local_key_resolves_locally() {
|
||||
let url = spawn_mock_upstream().await;
|
||||
let c = chain(local_with_key(), &url);
|
||||
let p = c.resolve("local-key").await.expect("local resolves");
|
||||
assert_eq!(p.account_id, "op");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_key_falls_through_to_upstream() {
|
||||
let url = spawn_mock_upstream().await;
|
||||
let c = chain(local_with_key(), &url);
|
||||
let p = c.resolve("mesh-key").await.expect("upstream resolves");
|
||||
assert_eq!(p.account_id, "mesh-acct");
|
||||
assert_eq!(p.key_id, "mesh-key-1");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_everywhere_is_invalid_key() {
|
||||
let url = spawn_mock_upstream().await;
|
||||
let c = chain(local_with_key(), &url);
|
||||
match c.resolve("nope").await {
|
||||
Err(AuthError::InvalidKey) => {}
|
||||
other => panic!("expected InvalidKey, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn upstream_unreachable_fails_closed_as_unavailable() {
|
||||
// No mock — point at a dead port. A locally-unknown key must surface
|
||||
// Unavailable (→ 503), never InvalidKey (→ 401).
|
||||
let c = chain(local_with_key(), "http://127.0.0.1:1");
|
||||
match c.resolve("some-mesh-key").await {
|
||||
Err(AuthError::Unavailable { retry_after_secs }) => assert!(retry_after_secs > 0),
|
||||
other => panic!("expected Unavailable, got {other:?}"),
|
||||
}
|
||||
// A local key still resolves without touching upstream.
|
||||
assert_eq!(c.resolve("local-key").await.unwrap().account_id, "op");
|
||||
}
|
||||
Reference in New Issue
Block a user