feat: candle-native rule-based strategy DSL
Adds a `rule_based` strategy variant that lets users define trading strategies entirely through JSON config — no code changes required per new strategy. **Strategy DSL (`RuleBasedParams`)** - `Condition` enum tagged by `"kind"`: `ema_crossover`, `ema_trend`, `rsi`, `bollinger`, `price_level`, `position`, `all_of`, `any_of`, `not`; nestable arbitrarily - `candle_interval` field (e.g. `"5m"`) declares the evaluation cadence - Signal evaluation in `strategy/signal.rs`; strategy impl in `strategy/rule_strategy.rs` **Candle-native evaluation** - `CandleAggregator` in `instrument_data.rs` accumulates raw trades into fixed-interval candles for live mode, replacing the `interval_secs` time-throttle hack - `candle_ready: bool` on `SwymInstrumentData` gates strategy evaluation to exactly one `generate_algo_orders` call per candle close - Candle-based backtests continue using pre-formed `DataKind::Candle` events from the DB (aggregator not needed on that path) **Wire-up** - `worker.rs` derives the effective candle interval from `RuleBasedParams.candle_interval` for rule-based runs - `runner.rs` initialises live `SwymInstrumentData` with the aggregator and pre-registered EMA periods when the strategy requires candle mode Existing compiled strategies (`SimpleSpread`, `EmaCrossover`, `RsiReversal`, `BollingerBreakout`) are unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,8 @@ pub enum StrategyConfig {
|
||||
RsiReversal(RsiReversalParams),
|
||||
/// Momentum strategy that buys breakouts above/below Bollinger Bands.
|
||||
BollingerBreakout(BollingerBreakoutParams),
|
||||
/// API-defined strategy composed of declarative JSON rules evaluated on each candle close.
|
||||
RuleBased(RuleBasedParams),
|
||||
}
|
||||
|
||||
impl Default for StrategyConfig {
|
||||
@@ -118,6 +120,171 @@ fn default_oversold() -> Decimal { Decimal::from(30) }
|
||||
fn default_bb_period() -> u32 { 20 }
|
||||
fn default_num_std_dev() -> Decimal { Decimal::TWO }
|
||||
|
||||
// --- Rule-based strategy DSL ---
|
||||
|
||||
/// Parameters for the runtime-interpreted rule-based strategy.
|
||||
///
|
||||
/// Strategies are expressed as a list of rules evaluated on each candle close.
|
||||
/// All rules whose `when` condition is true simultaneously fire their `then` action.
|
||||
///
|
||||
/// Example JSON:
|
||||
/// ```json
|
||||
/// {
|
||||
/// "type": "rule_based",
|
||||
/// "candle_interval": "5m",
|
||||
/// "rules": [
|
||||
/// {
|
||||
/// "when": { "kind": "all_of", "conditions": [
|
||||
/// { "kind": "ema_crossover", "fast_period": 9, "slow_period": 21, "direction": "above" },
|
||||
/// { "kind": "position", "state": "flat" }
|
||||
/// ]},
|
||||
/// "then": { "side": "buy", "quantity": "0.001" }
|
||||
/// }
|
||||
/// ]
|
||||
/// }
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct RuleBasedParams {
|
||||
/// Candle interval driving strategy evaluation, e.g. `"1m"`, `"5m"`, `"1h"`.
|
||||
pub candle_interval: String,
|
||||
/// Ordered list of trading rules. All rules whose condition is true on a candle close fire.
|
||||
pub rules: Vec<Rule>,
|
||||
}
|
||||
|
||||
/// A single trading rule: place an order when `when` evaluates to true.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct Rule {
|
||||
pub when: Condition,
|
||||
pub then: Action,
|
||||
}
|
||||
|
||||
/// The order to place when a rule fires.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct Action {
|
||||
pub side: ActionSide,
|
||||
/// Per-order size in base asset units. For sell orders, actual size is capped to open position.
|
||||
pub quantity: Decimal,
|
||||
}
|
||||
|
||||
/// Order direction for a rule action.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ActionSide {
|
||||
Buy,
|
||||
Sell,
|
||||
}
|
||||
|
||||
/// A composable condition tree evaluated against instrument state at candle close.
|
||||
///
|
||||
/// Leaf variants check market indicators or current position.
|
||||
/// Combinator variants compose multiple conditions.
|
||||
///
|
||||
/// All variants are tagged with `"kind"` for unambiguous JSON parsing.
|
||||
///
|
||||
/// ```json
|
||||
/// { "kind": "ema_crossover", "fast_period": 9, "slow_period": 21, "direction": "above" }
|
||||
/// { "kind": "position", "state": "flat" }
|
||||
/// { "kind": "all_of", "conditions": [...] }
|
||||
/// { "kind": "not", "condition": { "kind": "position", "state": "long" } }
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
pub enum Condition {
|
||||
/// True when fast EMA crosses above (or below) slow EMA on this candle.
|
||||
EmaCrossover {
|
||||
fast_period: usize,
|
||||
slow_period: usize,
|
||||
direction: CrossoverDirection,
|
||||
},
|
||||
/// True when price is above (or below) a single EMA.
|
||||
EmaTrend {
|
||||
period: usize,
|
||||
direction: TrendDirection,
|
||||
},
|
||||
/// True when RSI crosses a threshold.
|
||||
Rsi {
|
||||
#[serde(default = "default_rsi_period_usize")]
|
||||
period: usize,
|
||||
threshold: Decimal,
|
||||
comparison: Comparison,
|
||||
},
|
||||
/// True when price breaks above the upper or below the lower Bollinger Band.
|
||||
Bollinger {
|
||||
#[serde(default = "default_bb_period_usize")]
|
||||
period: usize,
|
||||
#[serde(default = "default_num_std_dev")]
|
||||
num_std_dev: Decimal,
|
||||
band: BollingerBand,
|
||||
},
|
||||
/// True when price is above or below a fixed level.
|
||||
PriceLevel {
|
||||
price: Decimal,
|
||||
direction: TrendDirection,
|
||||
},
|
||||
/// True when the current position matches the required state.
|
||||
Position {
|
||||
state: PositionState,
|
||||
},
|
||||
/// True when all sub-conditions are true.
|
||||
AllOf {
|
||||
conditions: Vec<Condition>,
|
||||
},
|
||||
/// True when any sub-condition is true.
|
||||
AnyOf {
|
||||
conditions: Vec<Condition>,
|
||||
},
|
||||
/// True when the sub-condition is false.
|
||||
Not {
|
||||
condition: Box<Condition>,
|
||||
},
|
||||
}
|
||||
|
||||
/// EMA crossover direction.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CrossoverDirection {
|
||||
/// Fast EMA crossed above slow EMA on this candle.
|
||||
Above,
|
||||
/// Fast EMA crossed below slow EMA on this candle.
|
||||
Below,
|
||||
}
|
||||
|
||||
/// Price / EMA trend direction.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TrendDirection {
|
||||
Above,
|
||||
Below,
|
||||
}
|
||||
|
||||
/// Threshold comparison direction for RSI.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Comparison {
|
||||
Above,
|
||||
Below,
|
||||
}
|
||||
|
||||
/// Which Bollinger Band the price should breach.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum BollingerBand {
|
||||
AboveUpper,
|
||||
BelowLower,
|
||||
}
|
||||
|
||||
/// Required position state for a position condition.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum PositionState {
|
||||
Flat,
|
||||
Long,
|
||||
Short,
|
||||
}
|
||||
|
||||
fn default_rsi_period_usize() -> usize { 14 }
|
||||
fn default_bb_period_usize() -> usize { 20 }
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -58,6 +58,8 @@ pub async fn execute_run(
|
||||
.await?;
|
||||
|
||||
let strategy = SwymStrategy::from_config(&strategy_config);
|
||||
let ema_periods = strategy.ema_periods();
|
||||
let candle_interval_secs = strategy.candle_interval_secs();
|
||||
|
||||
let system_args = SystemArgs::new(
|
||||
&instruments,
|
||||
@@ -67,7 +69,17 @@ pub async fn execute_run(
|
||||
DefaultRiskManager::default(),
|
||||
market_stream,
|
||||
DefaultGlobalData::default(),
|
||||
|_| SwymInstrumentData::default(),
|
||||
move |_| {
|
||||
let mut data = if let Some(secs) = candle_interval_secs {
|
||||
SwymInstrumentData::with_candle_aggregator(secs)
|
||||
} else {
|
||||
SwymInstrumentData::default()
|
||||
};
|
||||
for &period in &ema_periods {
|
||||
data.register_ema(period);
|
||||
}
|
||||
data
|
||||
},
|
||||
);
|
||||
|
||||
let mut system = SystemBuilder::new(system_args)
|
||||
|
||||
@@ -25,6 +25,109 @@ use std::collections::{HashMap, VecDeque};
|
||||
|
||||
use crate::strategy::indicators::RunningEma;
|
||||
|
||||
/// A closed OHLCV candle produced by [`CandleAggregator`].
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
|
||||
pub struct CompletedCandle {
|
||||
pub open_time: DateTime<Utc>,
|
||||
pub open: f64,
|
||||
pub high: f64,
|
||||
pub low: f64,
|
||||
pub close: f64,
|
||||
pub volume: f64,
|
||||
pub trade_count: u64,
|
||||
}
|
||||
|
||||
/// Aggregates individual trades into fixed-interval OHLCV candles for live mode.
|
||||
///
|
||||
/// Candle boundaries are aligned to UTC epoch (e.g. 1m candles close at :00, :01, ...).
|
||||
/// Call [`push_trade`] for each incoming trade. When the return value is `Some`, a new
|
||||
/// candle has just closed — update indicators and set `candle_ready = true`.
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
|
||||
pub struct CandleAggregator {
|
||||
pub interval_secs: u64,
|
||||
pub current_slot: Option<DateTime<Utc>>,
|
||||
pub open: f64,
|
||||
pub high: f64,
|
||||
pub low: f64,
|
||||
pub close: f64,
|
||||
pub volume: f64,
|
||||
pub trade_count: u64,
|
||||
}
|
||||
|
||||
impl CandleAggregator {
|
||||
pub fn new(interval_secs: u64) -> Self {
|
||||
Self {
|
||||
interval_secs,
|
||||
current_slot: None,
|
||||
open: 0.0,
|
||||
high: 0.0,
|
||||
low: 0.0,
|
||||
close: 0.0,
|
||||
volume: 0.0,
|
||||
trade_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Feed a trade into the aggregator.
|
||||
///
|
||||
/// Returns `Some(candle)` when the trade's timestamp crosses into a new candle
|
||||
/// interval, closing the previous candle. The returned candle's OHLCV reflects
|
||||
/// all trades up to (but not including) the current trade.
|
||||
pub fn push_trade(
|
||||
&mut self,
|
||||
price: f64,
|
||||
volume: f64,
|
||||
time: DateTime<Utc>,
|
||||
) -> Option<CompletedCandle> {
|
||||
let slot = self.slot_for(time);
|
||||
match self.current_slot {
|
||||
None => {
|
||||
self.current_slot = Some(slot);
|
||||
self.open = price;
|
||||
self.high = price;
|
||||
self.low = price;
|
||||
self.close = price;
|
||||
self.volume = volume;
|
||||
self.trade_count = 1;
|
||||
None
|
||||
}
|
||||
Some(current) if current == slot => {
|
||||
if price > self.high { self.high = price; }
|
||||
if price < self.low { self.low = price; }
|
||||
self.close = price;
|
||||
self.volume += volume;
|
||||
self.trade_count += 1;
|
||||
None
|
||||
}
|
||||
Some(current) => {
|
||||
let closed = CompletedCandle {
|
||||
open_time: current,
|
||||
open: self.open,
|
||||
high: self.high,
|
||||
low: self.low,
|
||||
close: self.close,
|
||||
volume: self.volume,
|
||||
trade_count: self.trade_count,
|
||||
};
|
||||
self.current_slot = Some(slot);
|
||||
self.open = price;
|
||||
self.high = price;
|
||||
self.low = price;
|
||||
self.close = price;
|
||||
self.volume = volume;
|
||||
self.trade_count = 1;
|
||||
Some(closed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn slot_for(&self, time: DateTime<Utc>) -> DateTime<Utc> {
|
||||
let ts = time.timestamp();
|
||||
let slot_ts = (ts / self.interval_secs as i64) * self.interval_secs as i64;
|
||||
DateTime::from_timestamp(slot_ts, 0).unwrap_or(time)
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum number of prices retained for window-based indicators (RSI, Bollinger Bands).
|
||||
/// EMA computation uses [`RunningEma`] and is not subject to this limit.
|
||||
const MAX_PRICE_HISTORY: usize = 500;
|
||||
@@ -34,11 +137,15 @@ const MAX_PRICE_HISTORY: usize = 500;
|
||||
/// indicators (RSI, Bollinger), and stateful [`RunningEma`] instances for EMA-based
|
||||
/// signals.
|
||||
///
|
||||
/// Handles both tick-based and candle-based data:
|
||||
/// - `DataKind::Trade`: updates price history and EMAs, delegates rest to inner.
|
||||
/// - `DataKind::Candle`: updates inner's `last_traded_price` and `l1` (using close
|
||||
/// as synthetic bid/ask) so that `price()` returns the candle close and fill
|
||||
/// simulation works correctly.
|
||||
/// Supports two evaluation modes controlled by `candle_aggregator`:
|
||||
/// - **Tick mode** (`candle_aggregator = None`): every trade updates indicators; compiled
|
||||
/// strategies use `interval_secs` throttling. Existing behaviour.
|
||||
/// - **Candle mode** (`candle_aggregator = Some`): indicators update only on candle closes;
|
||||
/// `candle_ready` is `true` for exactly the one `generate_algo_orders` call following
|
||||
/// each candle close. Used by `RuleStrategy` and candle-based backtests.
|
||||
///
|
||||
/// For candle-based backtests `DataKind::Candle` events arrive pre-formed from the DB;
|
||||
/// the aggregator is not used (leave `candle_aggregator = None` for that path).
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
|
||||
pub struct SwymInstrumentData {
|
||||
pub inner: DefaultInstrumentMarketData,
|
||||
@@ -50,6 +157,12 @@ pub struct SwymInstrumentData {
|
||||
/// Stateful running EMAs, keyed by period. Updated incrementally on each price;
|
||||
/// never re-seeded or windowed. Correct over arbitrarily long price series.
|
||||
pub ema_states: HashMap<usize, RunningEma>,
|
||||
/// Present in live candle mode; aggregates raw trades into fixed-interval candles.
|
||||
pub candle_aggregator: Option<CandleAggregator>,
|
||||
/// Set to `true` for the single `generate_algo_orders` call immediately after a
|
||||
/// candle closes (either from the aggregator or a `DataKind::Candle` backtest event).
|
||||
/// Cleared at the start of the next `process()` call.
|
||||
pub candle_ready: bool,
|
||||
}
|
||||
|
||||
impl Default for SwymInstrumentData {
|
||||
@@ -60,6 +173,8 @@ impl Default for SwymInstrumentData {
|
||||
last_event_time: None,
|
||||
trade_prices: VecDeque::new(),
|
||||
ema_states: HashMap::new(),
|
||||
candle_aggregator: None,
|
||||
candle_ready: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -81,37 +196,50 @@ impl<InstrumentKey> Processor<&barter_data::event::MarketEvent<InstrumentKey, Da
|
||||
&mut self,
|
||||
event: &barter_data::event::MarketEvent<InstrumentKey, DataKind>,
|
||||
) -> Self::Audit {
|
||||
// Clear the candle flag at the start of each event — it is only valid
|
||||
// for the single generate_algo_orders call that immediately follows a close.
|
||||
self.candle_ready = false;
|
||||
self.last_event_time = Some(event.time_exchange);
|
||||
|
||||
match &event.kind {
|
||||
DataKind::Trade(trade) => {
|
||||
if let Some(price) = Decimal::from_f64_retain(trade.price) {
|
||||
self.push_price(price);
|
||||
if let Some(agg) = &mut self.candle_aggregator {
|
||||
// Candle-aggregation mode: only push_price on candle close.
|
||||
if let Some(completed) = agg.push_trade(trade.price, trade.amount, event.time_exchange) {
|
||||
if let Some(close) = Decimal::from_f64_retain(completed.close) {
|
||||
self.push_price(close);
|
||||
self.candle_ready = true;
|
||||
}
|
||||
}
|
||||
// Always update inner from the raw trade for up-to-date price/L1.
|
||||
self.inner.process(event);
|
||||
} else {
|
||||
// Tick mode: update indicators and inner state on every trade.
|
||||
if let Some(price) = Decimal::from_f64_retain(trade.price) {
|
||||
self.push_price(price);
|
||||
}
|
||||
self.inner.process(event);
|
||||
}
|
||||
// Let inner handle Trade and OrderBookL1 normally.
|
||||
self.inner.process(event);
|
||||
}
|
||||
DataKind::Candle(candle) => {
|
||||
// On candle close: use the close price as the indicator input and
|
||||
// update inner so that price() and fill simulation return the close.
|
||||
// Pre-formed candle event (candle-based backtest replay).
|
||||
// Update indicators with the close price and signal a candle close.
|
||||
if let Some(close) = Decimal::from_f64_retain(candle.close) {
|
||||
self.push_price(close);
|
||||
|
||||
// Update inner's last_traded_price directly.
|
||||
self.inner.last_traded_price =
|
||||
Some(Timed::new(close, event.time_exchange));
|
||||
|
||||
// Synthesize a symmetric L1 using close as both bid and ask.
|
||||
// This gives a mid-price == close and enables order fills.
|
||||
let level = Level { price: close, amount: Decimal::ONE };
|
||||
self.inner.l1 = OrderBookL1 {
|
||||
last_update_time: event.time_exchange,
|
||||
best_bid: Some(level),
|
||||
best_ask: Some(level),
|
||||
};
|
||||
|
||||
self.candle_ready = true;
|
||||
}
|
||||
// Do NOT delegate candle events to inner.process() — it would be a no-op
|
||||
// (inner's _ => {} arm) but this is clearer about intent.
|
||||
}
|
||||
_ => {
|
||||
self.inner.process(event);
|
||||
@@ -149,6 +277,18 @@ impl InFlightRequestRecorder<ExchangeIndex, InstrumentIndex> for SwymInstrumentD
|
||||
}
|
||||
|
||||
impl SwymInstrumentData {
|
||||
/// Create an instance configured for live candle-aggregation mode.
|
||||
///
|
||||
/// Trades will be accumulated into candles of `interval_secs` duration.
|
||||
/// `candle_ready` is set on each candle close so that candle-native strategies
|
||||
/// know when to evaluate their signals.
|
||||
pub fn with_candle_aggregator(interval_secs: u64) -> Self {
|
||||
Self {
|
||||
candle_aggregator: Some(CandleAggregator::new(interval_secs)),
|
||||
..Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn push_price(&mut self, price: Decimal) {
|
||||
// Update window-based indicators buffer.
|
||||
self.trade_prices.push_back(price);
|
||||
|
||||
@@ -3,6 +3,8 @@ pub mod ema_crossover;
|
||||
pub mod indicators;
|
||||
pub mod instrument_data;
|
||||
pub mod rsi_reversal;
|
||||
pub mod rule_strategy;
|
||||
pub mod signal;
|
||||
pub mod simple_spread;
|
||||
pub mod swym_strategy;
|
||||
|
||||
|
||||
209
services/paper-executor/src/strategy/rule_strategy.rs
Normal file
209
services/paper-executor/src/strategy/rule_strategy.rs
Normal file
@@ -0,0 +1,209 @@
|
||||
//! Runtime-interpreted rule-based strategy.
|
||||
//!
|
||||
//! Evaluates a declarative JSON rule list on every candle close, placing market
|
||||
//! orders for all rules whose `when` condition is satisfied. The strategy never
|
||||
//! fires between candle closes (`candle_ready` acts as the gate).
|
||||
//!
|
||||
//! # Backtrader equivalent
|
||||
//!
|
||||
//! There is no direct equivalent; the closest is a custom `Strategy` class that
|
||||
//! checks a list of indicator conditions in `next()`.
|
||||
|
||||
use barter::{
|
||||
engine::Engine,
|
||||
engine::state::instrument::filter::InstrumentFilter,
|
||||
strategy::{
|
||||
algo::AlgoStrategy,
|
||||
close_positions::{ClosePositionsStrategy, close_open_positions_with_market_orders},
|
||||
on_disconnect::OnDisconnectStrategy,
|
||||
on_trading_disabled::OnTradingDisabled,
|
||||
},
|
||||
};
|
||||
use barter_execution::order::{
|
||||
id::{ClientOrderId, StrategyId},
|
||||
request::{OrderRequestCancel, OrderRequestOpen, RequestOpen},
|
||||
OrderEvent, OrderKey, OrderKind, TimeInForce,
|
||||
};
|
||||
use barter::engine::state::instrument::data::InstrumentDataState;
|
||||
use barter_instrument::{
|
||||
Side,
|
||||
asset::AssetIndex,
|
||||
exchange::{ExchangeId, ExchangeIndex},
|
||||
instrument::InstrumentIndex,
|
||||
};
|
||||
use swym_dal::models::strategy_config::{ActionSide, Rule};
|
||||
|
||||
use crate::strategy::{
|
||||
SwymState,
|
||||
signal::{EvalCtx, evaluate},
|
||||
};
|
||||
|
||||
const STRATEGY_NAME: &str = "rule_based";
|
||||
|
||||
/// Runtime-interpreted rule-based strategy.
|
||||
///
|
||||
/// Constructed from [`RuleBasedParams`] via [`SwymStrategy::from_config`]. All
|
||||
/// rules whose `when` condition evaluates to `true` at candle close simultaneously
|
||||
/// fire their `then` action (no throttling; candle close is the natural throttle).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RuleStrategy {
|
||||
pub id: StrategyId,
|
||||
/// Candle interval in seconds, used to initialise [`CandleAggregator`] in live mode.
|
||||
pub interval_secs: u64,
|
||||
pub rules: Vec<Rule>,
|
||||
}
|
||||
|
||||
impl RuleStrategy {
|
||||
pub fn new(rules: Vec<Rule>, interval_secs: u64) -> Self {
|
||||
Self {
|
||||
id: StrategyId::new(STRATEGY_NAME),
|
||||
interval_secs,
|
||||
rules,
|
||||
}
|
||||
}
|
||||
|
||||
/// All EMA periods referenced anywhere in the rule tree.
|
||||
///
|
||||
/// The executor uses this to pre-register [`RunningEma`] instances before the
|
||||
/// first candle arrives so that crossover detection has a valid `prev` value.
|
||||
pub fn ema_periods(&self) -> Vec<usize> {
|
||||
let mut periods = Vec::new();
|
||||
for rule in &self.rules {
|
||||
collect_ema_periods_from_rule(rule, &mut periods);
|
||||
}
|
||||
periods.sort_unstable();
|
||||
periods.dedup();
|
||||
periods
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_ema_periods_from_rule(rule: &Rule, out: &mut Vec<usize>) {
|
||||
crate::strategy::signal::collect_ema_periods(&rule.when, out);
|
||||
}
|
||||
|
||||
impl AlgoStrategy for RuleStrategy {
|
||||
type State = SwymState;
|
||||
|
||||
fn generate_algo_orders(
|
||||
&self,
|
||||
state: &Self::State,
|
||||
) -> (
|
||||
impl IntoIterator<Item = OrderRequestCancel>,
|
||||
impl IntoIterator<Item = OrderRequestOpen>,
|
||||
) {
|
||||
let cancels: Vec<OrderRequestCancel> = Vec::new();
|
||||
let mut opens: Vec<OrderRequestOpen> = Vec::new();
|
||||
|
||||
for instrument_state in state.instruments.instruments(&InstrumentFilter::None) {
|
||||
// Only evaluate when a candle has just closed.
|
||||
if !instrument_state.data.candle_ready {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some(price) = instrument_state.data.price() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let ctx = EvalCtx {
|
||||
data: &instrument_state.data,
|
||||
is_flat: instrument_state.position.current.is_none(),
|
||||
is_long: instrument_state
|
||||
.position
|
||||
.current
|
||||
.as_ref()
|
||||
.is_some_and(|p| p.side == Side::Buy),
|
||||
is_short: instrument_state
|
||||
.position
|
||||
.current
|
||||
.as_ref()
|
||||
.is_some_and(|p| p.side == Side::Sell),
|
||||
};
|
||||
|
||||
for rule in &self.rules {
|
||||
if !evaluate(&rule.when, &ctx) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let side = match rule.then.side {
|
||||
ActionSide::Buy => Side::Buy,
|
||||
ActionSide::Sell => Side::Sell,
|
||||
};
|
||||
|
||||
// For sell orders, close the full open position rather than a fixed quantity.
|
||||
let quantity = if side == Side::Sell {
|
||||
instrument_state
|
||||
.position
|
||||
.current
|
||||
.as_ref()
|
||||
.map(|p| p.quantity_abs)
|
||||
.unwrap_or(rule.then.quantity)
|
||||
} else {
|
||||
rule.then.quantity
|
||||
};
|
||||
|
||||
opens.push(OrderEvent {
|
||||
key: OrderKey {
|
||||
exchange: instrument_state.instrument.exchange,
|
||||
instrument: instrument_state.key,
|
||||
strategy: self.id.clone(),
|
||||
cid: ClientOrderId::random(),
|
||||
},
|
||||
state: RequestOpen {
|
||||
side,
|
||||
price,
|
||||
quantity,
|
||||
kind: OrderKind::Market,
|
||||
time_in_force: TimeInForce::GoodUntilCancelled { post_only: false },
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
(cancels, opens)
|
||||
}
|
||||
}
|
||||
|
||||
impl ClosePositionsStrategy for RuleStrategy {
|
||||
type State = SwymState;
|
||||
|
||||
fn close_positions_requests<'a>(
|
||||
&'a self,
|
||||
state: &'a Self::State,
|
||||
filter: &'a InstrumentFilter,
|
||||
) -> (
|
||||
impl IntoIterator<Item = OrderRequestCancel<ExchangeIndex, InstrumentIndex>> + 'a,
|
||||
impl IntoIterator<Item = OrderRequestOpen<ExchangeIndex, InstrumentIndex>> + 'a,
|
||||
)
|
||||
where
|
||||
ExchangeIndex: 'a,
|
||||
AssetIndex: 'a,
|
||||
InstrumentIndex: 'a,
|
||||
{
|
||||
close_open_positions_with_market_orders(&self.id, state, filter, |_| {
|
||||
ClientOrderId::random()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<Clock, State, ExecutionTxs, Risk> OnDisconnectStrategy<Clock, State, ExecutionTxs, Risk>
|
||||
for RuleStrategy
|
||||
{
|
||||
type OnDisconnect = ();
|
||||
|
||||
fn on_disconnect(
|
||||
_engine: &mut Engine<Clock, State, ExecutionTxs, Self, Risk>,
|
||||
_exchange: ExchangeId,
|
||||
) {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Clock, State, ExecutionTxs, Risk> OnTradingDisabled<Clock, State, ExecutionTxs, Risk>
|
||||
for RuleStrategy
|
||||
{
|
||||
type OnTradingDisabled = ();
|
||||
|
||||
fn on_trading_disabled(
|
||||
_engine: &mut Engine<Clock, State, ExecutionTxs, Self, Risk>,
|
||||
) {
|
||||
}
|
||||
}
|
||||
217
services/paper-executor/src/strategy/signal.rs
Normal file
217
services/paper-executor/src/strategy/signal.rs
Normal file
@@ -0,0 +1,217 @@
|
||||
//! Condition evaluation for the rule-based strategy DSL.
|
||||
//!
|
||||
//! Converts a [`Condition`] tree (defined in `swym-dal`) into a `bool` by
|
||||
//! inspecting the current [`SwymInstrumentData`] and pre-extracted position flags.
|
||||
|
||||
use barter::engine::state::instrument::data::InstrumentDataState;
|
||||
use swym_dal::models::strategy_config::{
|
||||
BollingerBand, Comparison, Condition, CrossoverDirection, PositionState, TrendDirection,
|
||||
};
|
||||
|
||||
use crate::strategy::{indicators, instrument_data::SwymInstrumentData};
|
||||
|
||||
/// Evaluation context for a single instrument at candle-close time.
|
||||
///
|
||||
/// Position flags are pre-extracted by the caller from the barter `InstrumentState`
|
||||
/// so that `signal.rs` stays free of barter type dependencies.
|
||||
pub struct EvalCtx<'a> {
|
||||
pub data: &'a SwymInstrumentData,
|
||||
pub is_flat: bool,
|
||||
pub is_long: bool,
|
||||
pub is_short: bool,
|
||||
}
|
||||
|
||||
/// Recursively evaluate a [`Condition`] tree against the current instrument state.
|
||||
pub fn evaluate(cond: &Condition, ctx: &EvalCtx<'_>) -> bool {
|
||||
match cond {
|
||||
Condition::AllOf { conditions } => conditions.iter().all(|sub| evaluate(sub, ctx)),
|
||||
Condition::AnyOf { conditions } => conditions.iter().any(|sub| evaluate(sub, ctx)),
|
||||
Condition::Not { condition } => !evaluate(condition, ctx),
|
||||
Condition::Position { state } => match state {
|
||||
PositionState::Flat => ctx.is_flat,
|
||||
PositionState::Long => ctx.is_long,
|
||||
PositionState::Short => ctx.is_short,
|
||||
},
|
||||
Condition::EmaCrossover { fast_period, slow_period, direction } => {
|
||||
let Some(fast) = ctx.data.ema_states.get(fast_period) else { return false; };
|
||||
let Some(slow) = ctx.data.ema_states.get(slow_period) else { return false; };
|
||||
match direction {
|
||||
CrossoverDirection::Above => {
|
||||
indicators::ema_crossed_above(fast, slow).unwrap_or(false)
|
||||
}
|
||||
CrossoverDirection::Below => {
|
||||
indicators::ema_crossed_below(fast, slow).unwrap_or(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
Condition::EmaTrend { period, direction } => {
|
||||
let Some(ema) = ctx.data.ema_states.get(period) else { return false; };
|
||||
let Some(current_ema) = ema.current else { return false; };
|
||||
let Some(price) = ctx.data.price() else { return false; };
|
||||
match direction {
|
||||
TrendDirection::Above => price > current_ema,
|
||||
TrendDirection::Below => price < current_ema,
|
||||
}
|
||||
}
|
||||
Condition::Rsi { period, threshold, comparison } => {
|
||||
let Some(value) = indicators::rsi(&ctx.data.trade_prices, *period) else {
|
||||
return false;
|
||||
};
|
||||
match comparison {
|
||||
Comparison::Above => value > *threshold,
|
||||
Comparison::Below => value < *threshold,
|
||||
}
|
||||
}
|
||||
Condition::Bollinger { period, num_std_dev, band } => {
|
||||
let Some((_sma, upper, lower)) =
|
||||
indicators::bollinger_bands(&ctx.data.trade_prices, *period, *num_std_dev)
|
||||
else {
|
||||
return false;
|
||||
};
|
||||
let Some(price) = ctx.data.price() else { return false; };
|
||||
match band {
|
||||
BollingerBand::AboveUpper => price > upper,
|
||||
BollingerBand::BelowLower => price < lower,
|
||||
}
|
||||
}
|
||||
Condition::PriceLevel { price: level, direction } => {
|
||||
let Some(price) = ctx.data.price() else { return false; };
|
||||
match direction {
|
||||
TrendDirection::Above => price > *level,
|
||||
TrendDirection::Below => price < *level,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Collect all EMA periods referenced anywhere in a condition tree.
|
||||
///
|
||||
/// Used to pre-register [`RunningEma`] instances before the first market event.
|
||||
pub fn collect_ema_periods(cond: &Condition, out: &mut Vec<usize>) {
|
||||
match cond {
|
||||
Condition::AllOf { conditions } => {
|
||||
conditions.iter().for_each(|sub| collect_ema_periods(sub, out));
|
||||
}
|
||||
Condition::AnyOf { conditions } => {
|
||||
conditions.iter().for_each(|sub| collect_ema_periods(sub, out));
|
||||
}
|
||||
Condition::Not { condition } => collect_ema_periods(condition, out),
|
||||
Condition::EmaCrossover { fast_period, slow_period, .. } => {
|
||||
out.push(*fast_period);
|
||||
out.push(*slow_period);
|
||||
}
|
||||
Condition::EmaTrend { period, .. } => out.push(*period),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a candle interval string (e.g. `"1m"`, `"5m"`) into seconds.
|
||||
///
|
||||
/// Returns `None` for unrecognised strings.
|
||||
pub fn parse_interval_secs(interval: &str) -> Option<u64> {
|
||||
match interval {
|
||||
"1m" => Some(60),
|
||||
"5m" => Some(300),
|
||||
"15m" => Some(900),
|
||||
"1h" => Some(3_600),
|
||||
"4h" => Some(14_400),
|
||||
"1d" => Some(86_400),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rust_decimal_macros::dec;
|
||||
use swym_dal::models::strategy_config::{Condition, CrossoverDirection, PositionState};
|
||||
|
||||
fn flat_ctx(data: &SwymInstrumentData) -> EvalCtx<'_> {
|
||||
EvalCtx { data, is_flat: true, is_long: false, is_short: false }
|
||||
}
|
||||
|
||||
fn long_ctx(data: &SwymInstrumentData) -> EvalCtx<'_> {
|
||||
EvalCtx { data, is_flat: false, is_long: true, is_short: false }
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn position_flat_matches() {
|
||||
let data = SwymInstrumentData::default();
|
||||
let cond = Condition::Position { state: PositionState::Flat };
|
||||
assert!(evaluate(&cond, &flat_ctx(&data)));
|
||||
assert!(!evaluate(&cond, &long_ctx(&data)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn all_of_short_circuits_on_false() {
|
||||
let data = SwymInstrumentData::default();
|
||||
// Both conditions false (no prices, so EMA unavailable — returns false)
|
||||
let cond = Condition::AllOf {
|
||||
conditions: vec![
|
||||
Condition::Position { state: PositionState::Long },
|
||||
Condition::Position { state: PositionState::Short },
|
||||
],
|
||||
};
|
||||
assert!(!evaluate(&cond, &flat_ctx(&data)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn any_of_returns_true_if_one_matches() {
|
||||
let data = SwymInstrumentData::default();
|
||||
let cond = Condition::AnyOf {
|
||||
conditions: vec![
|
||||
Condition::Position { state: PositionState::Long },
|
||||
Condition::Position { state: PositionState::Flat },
|
||||
],
|
||||
};
|
||||
assert!(evaluate(&cond, &flat_ctx(&data)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn not_negates() {
|
||||
let data = SwymInstrumentData::default();
|
||||
let cond = Condition::Not {
|
||||
condition: Box::new(Condition::Position { state: PositionState::Flat }),
|
||||
};
|
||||
assert!(!evaluate(&cond, &flat_ctx(&data)));
|
||||
assert!(evaluate(&cond, &long_ctx(&data)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_interval_secs_known_values() {
|
||||
assert_eq!(parse_interval_secs("1m"), Some(60));
|
||||
assert_eq!(parse_interval_secs("5m"), Some(300));
|
||||
assert_eq!(parse_interval_secs("1h"), Some(3_600));
|
||||
assert_eq!(parse_interval_secs("1d"), Some(86_400));
|
||||
assert_eq!(parse_interval_secs("2m"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collect_ema_periods_nested() {
|
||||
let cond = Condition::AllOf {
|
||||
conditions: vec![
|
||||
Condition::EmaCrossover {
|
||||
fast_period: 9,
|
||||
slow_period: 21,
|
||||
direction: CrossoverDirection::Above,
|
||||
},
|
||||
Condition::EmaTrend { period: 50, direction: TrendDirection::Above },
|
||||
],
|
||||
};
|
||||
let mut periods = Vec::new();
|
||||
collect_ema_periods(&cond, &mut periods);
|
||||
periods.sort_unstable();
|
||||
periods.dedup();
|
||||
assert_eq!(periods, vec![9, 21, 50]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn price_level_above_returns_false_when_no_price() {
|
||||
let data = SwymInstrumentData::default();
|
||||
let cond = Condition::PriceLevel {
|
||||
price: dec!(50000),
|
||||
direction: TrendDirection::Above,
|
||||
};
|
||||
assert!(!evaluate(&cond, &flat_ctx(&data)));
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,9 @@ use super::{
|
||||
bollinger_breakout::BollingerBreakout,
|
||||
ema_crossover::EmaCrossover,
|
||||
rsi_reversal::RsiReversal,
|
||||
rule_strategy::RuleStrategy,
|
||||
simple_spread::SimpleSpread,
|
||||
signal::parse_interval_secs,
|
||||
};
|
||||
use barter::{
|
||||
engine::Engine,
|
||||
@@ -32,6 +34,7 @@ pub enum SwymStrategy {
|
||||
EmaCrossover(EmaCrossover),
|
||||
RsiReversal(RsiReversal),
|
||||
BollingerBreakout(BollingerBreakout),
|
||||
RuleBased(RuleStrategy),
|
||||
}
|
||||
|
||||
impl SwymStrategy {
|
||||
@@ -50,17 +53,33 @@ impl SwymStrategy {
|
||||
StrategyConfig::BollingerBreakout(params) => {
|
||||
Self::BollingerBreakout(BollingerBreakout::from_params(params))
|
||||
}
|
||||
StrategyConfig::RuleBased(params) => {
|
||||
let interval_secs = parse_interval_secs(¶ms.candle_interval)
|
||||
.unwrap_or(60);
|
||||
Self::RuleBased(RuleStrategy::new(params.rules.clone(), interval_secs))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the EMA periods this strategy requires, so they can be pre-registered
|
||||
/// in [`SwymInstrumentData::ema_states`] before the first market event arrives.
|
||||
/// EMA periods this strategy requires, pre-registered before the first market event.
|
||||
pub fn ema_periods(&self) -> Vec<usize> {
|
||||
match self {
|
||||
Self::EmaCrossover(s) => vec![s.fast_period, s.slow_period],
|
||||
Self::RuleBased(s) => s.ema_periods(),
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// Candle interval in seconds for live candle-aggregation mode.
|
||||
///
|
||||
/// Returns `Some` only for strategies that require candle-based evaluation.
|
||||
/// The executor uses this to initialise a [`CandleAggregator`] in live runs.
|
||||
pub fn candle_interval_secs(&self) -> Option<u64> {
|
||||
match self {
|
||||
Self::RuleBased(s) => Some(s.interval_secs),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AlgoStrategy for SwymStrategy {
|
||||
@@ -109,6 +128,13 @@ impl AlgoStrategy for SwymStrategy {
|
||||
opens.into_iter().collect::<Vec<_>>(),
|
||||
)
|
||||
}
|
||||
Self::RuleBased(s) => {
|
||||
let (cancels, opens) = s.generate_algo_orders(state);
|
||||
(
|
||||
cancels.into_iter().collect::<Vec<_>>(),
|
||||
opens.into_iter().collect::<Vec<_>>(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -165,6 +191,13 @@ impl ClosePositionsStrategy for SwymStrategy {
|
||||
opens.into_iter().collect::<Vec<_>>(),
|
||||
)
|
||||
}
|
||||
Self::RuleBased(s) => {
|
||||
let (cancels, opens) = s.close_positions_requests(state, filter);
|
||||
(
|
||||
cancels.into_iter().collect::<Vec<_>>(),
|
||||
opens.into_iter().collect::<Vec<_>>(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::runner::{execute_backtest, execute_run};
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use swym_dal::models::strategy_config::StrategyConfig;
|
||||
use swym_dal::repo::paper_run;
|
||||
use tokio::sync::Semaphore;
|
||||
use tracing::{debug, error, info};
|
||||
@@ -60,7 +61,16 @@ pub async fn run_poll_loop(pool: PgPool, config: &ExecutorConfig) {
|
||||
let finishes_at = run.finishes_at;
|
||||
let risk_free_return = run.risk_free_return;
|
||||
let mode = run.mode.clone();
|
||||
let candle_interval = run.candle_interval.clone();
|
||||
|
||||
// For rule-based strategies the candle interval is embedded in the
|
||||
// strategy config itself; fall back to the paper run row for compiled
|
||||
// strategies that were created with an explicit candle_interval.
|
||||
let candle_interval = match &run_config.strategy {
|
||||
StrategyConfig::RuleBased(params) => {
|
||||
Some(params.candle_interval.clone())
|
||||
}
|
||||
_ => run.candle_interval.clone(),
|
||||
};
|
||||
|
||||
info!(%run_id, %mode, ?starts_at, ?finishes_at, ?candle_interval, "executing paper run");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user