diff --git a/Makefile b/Makefile index 06fda42..e492af3 100644 --- a/Makefile +++ b/Makefile @@ -45,6 +45,7 @@ build: cache $(GO) build -trimpath -o bin/bot ./cmd/bot $(GO) build -trimpath -o bin/migrate ./cmd/migrate $(GO) build -trimpath -o bin/backtest ./cmd/backtest + $(GO) build -trimpath -o bin/mode-days ./cmd/mode-days backtest: cache $(GO) run ./cmd/backtest -candles "$${BT_CANDLES:?set BT_CANDLES}" diff --git a/README.md b/README.md index d0d4f91..5ba9748 100644 --- a/README.md +++ b/README.md @@ -127,7 +127,7 @@ APP_MODE=backtest go run ./cmd/bot | `RISK_RISK_BUDGET_PER_INSTRUMENT_PCT` | доля equity | `0.005` | рекомендуется `> 0` | Риск-бюджет на инструмент, используется вместе с оценкой неблагоприятного overnight-движения. Больше - крупнее позиции при прочих равных. | | `RISK_MIN_ORDER_NOTIONAL_RUB` | сумма в рублях | `1000` | `> 0` включает минимум; `<= 0` фактически отключает | Минимальный notional заявки. Если рассчитанная позиция меньше, сигнал отклоняется по sizing. | -Если средний `realized_edge_bps - expected_net_edge_bps` по последним 20 закрытым сделкам ниже `-10 bps`, scheduler пишет `risk_event(WARN, size_reduction_rule_triggered)` и до восстановления качества режет sizing до `0.5x`. +Если средний `realized_edge_bps - expected_net_edge_bps` по последним 20 закрытым сделкам ниже `-10 bps`, scheduler пишет `risk_event(WARN, size_reduction_rule_triggered)` и до восстановления качества режет sizing до `0.5x`. Если два таких окна по 20 сделок идут подряд в `live_trade`, бот автоматически переключает persisted/runtime mode в `live_readonly` и блокирует новые брокерские заявки до ручного вмешательства. ### LIQ @@ -188,6 +188,7 @@ make race make build go run ./cmd/migrate -direction=up go run ./cmd/migrate up +go run ./cmd/mode-days -check=true go run ./cmd/backtest -candles candles.csv -out ./backtest_out go run ./cmd/backtest -candles candles.csv -minute-candles minute.csv -use-minute-model -out ./backtest_out go run ./cmd/bot -mode=paper @@ -199,11 +200,13 @@ go run ./cmd/bot -healthcheck Backtest CSV columns: ```csv -instrument_uid,trade_date,open,high,low,close,volume_lots -TRUR,2024-01-09,100,101,99,100.5,10000 +instrument_uid,trade_date,open,high,low,close,volume_lots,lot,min_price_increment +TRUR,2024-01-09,100,101,99,100.5,10000,10,0.01 ``` -Для minute-модели используется тот же формат, но `trade_date` может быть timestamp (`2024-01-09T18:25:00Z` или `2024-01-09 18:25:00`). +Для minute-модели используется тот же формат, но `trade_date` может быть timestamp (`2024-01-09T18:25:00Z` или `2024-01-09 18:25:00`). CLI backtest требует `lot` и `min_price_increment` для каждого `instrument_uid`; metadata можно дать в daily CSV или в minute CSV. + +`cmd/mode-days` считает distinct-дни по `system_state_history` и проверяет пороги `live_readonly >= 20`, `paper >= 20`, `sandbox >= 10`. История пишется после миграции `0010`; дни до неё автоматически восстановить нельзя, потому что старая схема хранила только текущий `system_state`. `ClientOrderID` детерминирован по `(date, instrument_uid, side, attempt)`, укладывается в лимит T-Invest `order_id <= 36` и содержит SHA-256 suffix. При ручных массовых перезапусках с теми же параметрами id остаётся тем же, что намеренно подавляет дубли. diff --git a/cmd/backtest/main.go b/cmd/backtest/main.go index c77ab75..6281983 100644 --- a/cmd/backtest/main.go +++ b/cmd/backtest/main.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "os" + "strings" "github.com/shopspring/decimal" @@ -72,6 +73,9 @@ func run() error { if *useMinuteModel && len(minuteCandles) == 0 { return fmt.Errorf("-minute-candles is required when -use-minute-model=true") } + if err := validateMetadata(candles, metadata); err != nil { + return err + } entry, err := decimal.NewFromString(*entrySlip) if err != nil { return fmt.Errorf("entry slippage: %w", err) @@ -149,6 +153,20 @@ func run() error { return nil } +func validateMetadata(candles map[string][]domain.Candle, metadata map[string]backtest.InstrumentMetadata) error { + var missing []string + for instrumentUID := range candles { + meta := metadata[instrumentUID] + if meta.Lot <= 0 || !meta.MinPriceIncrement.IsPositive() { + missing = append(missing, instrumentUID) + } + } + if len(missing) > 0 { + return fmt.Errorf("missing lot/min_price_increment metadata for instruments: %s", strings.Join(missing, ",")) + } + return nil +} + func mergeMetadata(dst, src map[string]backtest.InstrumentMetadata) { for uid, meta := range src { current := dst[uid] diff --git a/cmd/backtest/main_test.go b/cmd/backtest/main_test.go new file mode 100644 index 0000000..962b358 --- /dev/null +++ b/cmd/backtest/main_test.go @@ -0,0 +1,36 @@ +package main + +import ( + "strings" + "testing" + "time" + + "github.com/shopspring/decimal" + + "overnight-trading-bot/internal/backtest" + "overnight-trading-bot/internal/domain" +) + +func TestValidateMetadataRejectsMissingLotOrTick(t *testing.T) { + candles := map[string][]domain.Candle{ + "uid": {{InstrumentUID: "uid", TradeDate: time.Date(2026, 6, 8, 0, 0, 0, 0, time.UTC)}}, + } + err := validateMetadata(candles, map[string]backtest.InstrumentMetadata{ + "uid": {Lot: 10}, + }) + if err == nil || !strings.Contains(err.Error(), "missing lot/min_price_increment metadata") { + t.Fatalf("err=%v, want missing metadata error", err) + } +} + +func TestValidateMetadataAcceptsCompleteMetadata(t *testing.T) { + candles := map[string][]domain.Candle{ + "uid": {{InstrumentUID: "uid", TradeDate: time.Date(2026, 6, 8, 0, 0, 0, 0, time.UTC)}}, + } + err := validateMetadata(candles, map[string]backtest.InstrumentMetadata{ + "uid": {Lot: 10, MinPriceIncrement: decimal.RequireFromString("0.01")}, + }) + if err != nil { + t.Fatal(err) + } +} diff --git a/cmd/mode-days/main.go b/cmd/mode-days/main.go new file mode 100644 index 0000000..35824af --- /dev/null +++ b/cmd/mode-days/main.go @@ -0,0 +1,138 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "sort" + "strings" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + + "overnight-trading-bot/internal/domain" +) + +const moscowOffset = 3 * time.Hour + +type modeDayRow struct { + Mode string `db:"mode"` + Days int `db:"days"` +} + +func main() { + if err := run(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run() error { + dsn := flag.String("dsn", os.Getenv("DB_DSN"), "MySQL/MariaDB DSN") + fromRaw := flag.String("from", "", "optional start date YYYY-MM-DD") + toRaw := flag.String("to", "", "optional end date YYYY-MM-DD, inclusive") + check := flag.Bool("check", true, "fail when live readiness thresholds are not met") + minReadonly := flag.Int("min-readonly-days", 20, "minimum live_readonly days") + minPaper := flag.Int("min-paper-days", 20, "minimum paper days") + minSandbox := flag.Int("min-sandbox-days", 10, "minimum sandbox days") + flag.Parse() + if *dsn == "" { + return fmt.Errorf("DB_DSN is required") + } + from, err := parseOptionalDate(*fromRaw) + if err != nil { + return fmt.Errorf("from: %w", err) + } + to, err := parseOptionalDate(*toRaw) + if err != nil { + return fmt.Errorf("to: %w", err) + } + db, err := sqlx.Open("mysql", *dsn) + if err != nil { + return fmt.Errorf("open db: %w", err) + } + defer func() { + _ = db.Close() + }() + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := db.PingContext(ctx); err != nil { + return fmt.Errorf("ping db: %w", err) + } + counts, err := loadModeDayCounts(ctx, db, from, to) + if err != nil { + return err + } + printCounts(counts) + if !*check { + return nil + } + thresholds := map[domain.Mode]int{ + domain.ModeLiveReadonly: *minReadonly, + domain.ModePaper: *minPaper, + domain.ModeSandbox: *minSandbox, + } + return checkThresholds(counts, thresholds) +} + +func loadModeDayCounts(ctx context.Context, db *sqlx.DB, from, to time.Time) (map[domain.Mode]int, error) { + query := `SELECT mode, COUNT(DISTINCT DATE(DATE_ADD(ts, INTERVAL 3 HOUR))) AS days FROM system_state_history WHERE DAYOFWEEK(DATE_ADD(ts, INTERVAL 3 HOUR)) BETWEEN 2 AND 6` + var args []any + if !from.IsZero() { + query += ` AND ts >= ?` + args = append(args, from.Add(-moscowOffset)) + } + if !to.IsZero() { + query += ` AND ts < ?` + args = append(args, to.AddDate(0, 0, 1).Add(-moscowOffset)) + } + query += ` GROUP BY mode` + var rows []modeDayRow + if err := db.SelectContext(ctx, &rows, query, args...); err != nil { + return nil, fmt.Errorf("query mode days: %w", err) + } + counts := make(map[domain.Mode]int, len(rows)) + for _, row := range rows { + mode, err := domain.ParseMode(row.Mode) + if err != nil { + return nil, err + } + counts[mode] = row.Days + } + return counts, nil +} + +func printCounts(counts map[domain.Mode]int) { + modes := make([]string, 0, len(counts)) + for mode := range counts { + modes = append(modes, string(mode)) + } + sort.Strings(modes) + for _, rawMode := range modes { + mode := domain.Mode(rawMode) + fmt.Printf("%s=%d\n", mode, counts[mode]) + } +} + +func checkThresholds(counts map[domain.Mode]int, thresholds map[domain.Mode]int) error { + var failed []string + for mode, threshold := range thresholds { + if counts[mode] < threshold { + failed = append(failed, fmt.Sprintf("%s=%d/%d", mode, counts[mode], threshold)) + } + } + sort.Strings(failed) + if len(failed) > 0 { + return fmt.Errorf("mode day thresholds not met: %s", strings.Join(failed, ", ")) + } + return nil +} + +func parseOptionalDate(raw string) (time.Time, error) { + if raw == "" { + return time.Time{}, nil + } + return time.ParseInLocation("2006-01-02", raw, time.UTC) +} diff --git a/go.mod b/go.mod index cb6dedf..8c4285f 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/testcontainers/testcontainers-go v0.42.0 github.com/testcontainers/testcontainers-go/modules/mariadb v0.42.0 + google.golang.org/grpc v1.81.1 google.golang.org/protobuf v1.36.11 ) @@ -69,10 +70,9 @@ require ( golang.org/x/crypto v0.51.0 // indirect golang.org/x/net v0.55.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect - golang.org/x/sys v0.45.0 // indirect + golang.org/x/sys v0.46.0 // indirect golang.org/x/text v0.37.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect - google.golang.org/grpc v1.81.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 5a59a63..ec75c2c 100644 --- a/go.sum +++ b/go.sum @@ -163,8 +163,8 @@ golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7 golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= -golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= +golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.43.0 h1:S4RLU2sB31O/NCl+zFN9Aru9A/Cq2aqKpTZJ6B+DwT4= golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk= golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= diff --git a/internal/app/app.go b/internal/app/app.go index a400335..3ec98b3 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -150,6 +150,16 @@ func Run(ctx context.Context, opts Options) error { _, _ = fmt.Fprintf(opts.Stdout, "system unhalted: %s\n", opts.Reason) return nil } + if cfg.App.Mode == domain.ModeLiveTrade { + persistedMode, err := repo.GetSystemMode(ctx) + if err != nil { + return fmt.Errorf("read persisted system mode: %w", err) + } + if persistedMode == domain.ModeLiveReadonly { + cfg.App.Mode = domain.ModeLiveReadonly + log.Warn("runtime mode downgraded from live_trade to persisted live_readonly") + } + } gateway, closer, err := buildGateway(ctx, cfg, log) if err != nil { diff --git a/internal/domain/types.go b/internal/domain/types.go index d83a2d0..3dfac69 100644 --- a/internal/domain/types.go +++ b/internal/domain/types.go @@ -184,6 +184,7 @@ type FeatureSet struct { TickBps decimal.Decimal ADV20 decimal.Decimal ExpectedCostBps decimal.Decimal + CostBreakdownJSON string NetEdgeBps decimal.Decimal EntryIntervalVolume decimal.Decimal ExitIntervalVolume decimal.Decimal diff --git a/internal/execution/engine.go b/internal/execution/engine.go index 09998b4..aeb100a 100644 --- a/internal/execution/engine.go +++ b/internal/execution/engine.go @@ -69,6 +69,10 @@ func NewEngine(mode domain.Mode, accountID string, gateway Gateway, store reposi } } +func (e *Engine) SetMode(mode domain.Mode) { + e.mode = mode +} + func (e *Engine) SetMaxQuoteAge(maxQuoteAge time.Duration) { e.maxQuoteAge = maxQuoteAge } diff --git a/internal/features/pipeline.go b/internal/features/pipeline.go index a6442ca..f72aced 100644 --- a/internal/features/pipeline.go +++ b/internal/features/pipeline.go @@ -2,6 +2,7 @@ package features import ( "context" + "encoding/json" "fmt" "sort" "time" @@ -94,6 +95,9 @@ func Compute(instrument domain.Instrument, candles []domain.Candle, tradeDate ti var lastROn decimal.Decimal var lastRDay decimal.Decimal for i := 1; i < len(candles); i++ { + if !consecutiveDailyCandles(candles[i-1].TradeDate, candles[i].TradeDate) { + continue + } rOn, err := OvernightReturn(candles[i].Open, candles[i-1].Close) if err != nil { return domain.FeatureSet{}, err @@ -107,6 +111,9 @@ func Compute(instrument domain.Instrument, candles []domain.Candle, tradeDate ti lastROn = rOn lastRDay = rDay } + if len(overnight) == 0 { + return domain.FeatureSet{}, fmt.Errorf("need at least 1 consecutive daily candle pair") + } short := Rolling(overnight, cfg.RollingShort, cfg.EWMALambda) long := Rolling(overnight, cfg.RollingLong, cfg.EWMALambda) q05Abs := rollingQ05Abs(overnight, cfg.RollingShort) @@ -118,6 +125,7 @@ func Compute(instrument domain.Instrument, candles []domain.Candle, tradeDate ti Add(cfg.ExitSlippageBps). Add(commission). Add(cfg.RiskBufferBps) + costBreakdownJSON := expectedCostBreakdownJSON(spread, cfg, commission, expectedCost) return domain.FeatureSet{ InstrumentUID: instrument.InstrumentUID, TradeDate: tradeDate, @@ -135,6 +143,7 @@ func Compute(instrument domain.Instrument, candles []domain.Candle, tradeDate ti TickBps: spread.TickBps, ADV20: adv, ExpectedCostBps: expectedCost, + CostBreakdownJSON: costBreakdownJSON, NetEdgeBps: rawEdgeBps.Sub(expectedCost), EntryIntervalVolume: entryVolume, ExitIntervalVolume: exitVolume, @@ -142,6 +151,28 @@ func Compute(instrument domain.Instrument, candles []domain.Candle, tradeDate ti }, nil } +func expectedCostBreakdownJSON(spread SpreadResult, cfg PipelineConfig, commission, expectedCost decimal.Decimal) string { + spreadEntry := spread.HalfSpreadBps + if spreadEntry.IsZero() && spread.SpreadBps.IsPositive() { + spreadEntry = spread.SpreadBps.Div(decimal.NewFromInt(2)) + } + spreadExit := spread.SpreadBps.Sub(spreadEntry) + payload := map[string]string{ + "expected_spread_entry_bps": spreadEntry.String(), + "expected_spread_exit_bps": spreadExit.String(), + "expected_slippage_entry_bps": cfg.EntrySlippageBps.String(), + "expected_slippage_exit_bps": cfg.ExitSlippageBps.String(), + "commission_roundtrip_bps": commission.String(), + "risk_buffer_bps": cfg.RiskBufferBps.String(), + "expected_cost_bps": expectedCost.String(), + } + raw, err := json.Marshal(payload) + if err != nil { + return "{}" + } + return string(raw) +} + func rollingQ05Abs(values []float64, window int) decimal.Decimal { if window <= 0 || len(values) < window { return decimal.Zero @@ -170,9 +201,27 @@ func historicalDailyCandles(candles []domain.Candle, tradeDate time.Time) []doma out = append(out, candle) } } + sort.Slice(out, func(i, j int) bool { + return out[i].TradeDate.Before(out[j].TradeDate) + }) return out } +func consecutiveDailyCandles(previous, current time.Time) bool { + prevDay := dateOnly(previous) + currentDay := dateOnly(current) + if !currentDay.After(prevDay) { + return false + } + weekdays := 0 + for day := prevDay.AddDate(0, 0, 1); !day.After(currentDay); day = day.AddDate(0, 0, 1) { + if day.Weekday() != time.Saturday && day.Weekday() != time.Sunday { + weekdays++ + } + } + return weekdays == 1 +} + func dateOnly(ts time.Time) time.Time { year, month, day := ts.UTC().Date() return time.Date(year, month, day, 0, 0, 0, 0, time.UTC) diff --git a/internal/features/pipeline_test.go b/internal/features/pipeline_test.go index 178fadd..d7d1172 100644 --- a/internal/features/pipeline_test.go +++ b/internal/features/pipeline_test.go @@ -2,6 +2,7 @@ package features import ( "context" + "encoding/json" "testing" "time" @@ -44,6 +45,24 @@ func TestComputeExpectedCostIncludesCommissionAndSlippage(t *testing.T) { if !got.ExpectedCostBps.Equal(decimal.NewFromInt(22)) { t.Fatalf("expected cost=%s, want 22", got.ExpectedCostBps) } + var breakdown map[string]string + if err := json.Unmarshal([]byte(got.CostBreakdownJSON), &breakdown); err != nil { + t.Fatalf("cost breakdown is not valid JSON: %v", err) + } + wantBreakdown := map[string]string{ + "expected_spread_entry_bps": "5", + "expected_spread_exit_bps": "5", + "expected_slippage_entry_bps": "2", + "expected_slippage_exit_bps": "3", + "commission_roundtrip_bps": "2", + "risk_buffer_bps": "5", + "expected_cost_bps": "22", + } + for key, want := range wantBreakdown { + if breakdown[key] != want { + t.Fatalf("breakdown[%s]=%q, want %q in %s", key, breakdown[key], want, got.CostBreakdownJSON) + } + } if !got.EntryIntervalVolume.Equal(decimal.NewFromInt(10000)) || !got.ExitIntervalVolume.Equal(decimal.NewFromInt(9000)) { t.Fatalf("interval volumes were not preserved: %+v", got) } @@ -72,7 +91,7 @@ func TestComputeExpectedCostFallsBackToConfigCommission(t *testing.T) { } func TestComputeStoresHistoricalQ05Abs(t *testing.T) { - start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + start := time.Date(2026, 1, 5, 0, 0, 0, 0, time.UTC) returns := []string{"-0.10", "0.01", "0.02", "0.03", "0.04"} candles := []domain.Candle{{ InstrumentUID: "uid", @@ -89,13 +108,13 @@ func TestComputeStoresHistoricalQ05Abs(t *testing.T) { open := decimal.NewFromInt(100).Mul(decimal.NewFromInt(1).Add(r)) candles = append(candles, domain.Candle{ InstrumentUID: "uid", - TradeDate: start.AddDate(0, 0, i+1), + TradeDate: addBusinessDays(start, i+1), Open: open, Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(1), }) } - got, err := Compute(domain.Instrument{InstrumentUID: "uid", Lot: 1}, candles, start.AddDate(0, 0, 6), SpreadResult{}, PipelineConfig{ + got, err := Compute(domain.Instrument{InstrumentUID: "uid", Lot: 1}, candles, addBusinessDays(start, 6), SpreadResult{}, PipelineConfig{ RollingShort: 5, RollingLong: 5, EWMALambda: 0.08, @@ -113,6 +132,48 @@ func TestComputeStoresHistoricalQ05Abs(t *testing.T) { } } +func TestComputeSkipsOvernightReturnAcrossMissingWeekday(t *testing.T) { + start := time.Date(2026, 1, 5, 0, 0, 0, 0, time.UTC) // Monday. + candles := []domain.Candle{ + {InstrumentUID: "uid", TradeDate: start, Open: decimal.NewFromInt(100), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(1)}, + {InstrumentUID: "uid", TradeDate: start.AddDate(0, 0, 1), Open: decimal.NewFromInt(101), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(1)}, + {InstrumentUID: "uid", TradeDate: start.AddDate(0, 0, 3), Open: decimal.NewFromInt(50), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(1)}, + } + got, err := Compute(domain.Instrument{InstrumentUID: "uid", Lot: 1}, candles, start.AddDate(0, 0, 4), SpreadResult{}, PipelineConfig{ + RollingShort: 1, + RollingLong: 1, + EWMALambda: 0.08, + }, decimal.Zero, decimal.Zero) + if err != nil { + t.Fatal(err) + } + want := decimal.RequireFromString("0.01") + if !got.ROn.Equal(want) { + t.Fatalf("ROn=%s, want %s from last consecutive pair", got.ROn, want) + } +} + +func TestComputeAllowsWeekendGap(t *testing.T) { + friday := time.Date(2026, 1, 9, 0, 0, 0, 0, time.UTC) + monday := friday.AddDate(0, 0, 3) + candles := []domain.Candle{ + {InstrumentUID: "uid", TradeDate: friday, Open: decimal.NewFromInt(100), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(1)}, + {InstrumentUID: "uid", TradeDate: monday, Open: decimal.NewFromInt(101), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(1)}, + } + got, err := Compute(domain.Instrument{InstrumentUID: "uid", Lot: 1}, candles, monday.AddDate(0, 0, 1), SpreadResult{}, PipelineConfig{ + RollingShort: 1, + RollingLong: 1, + EWMALambda: 0.08, + }, decimal.Zero, decimal.Zero) + if err != nil { + t.Fatal(err) + } + want := decimal.RequireFromString("0.01") + if !got.ROn.Equal(want) { + t.Fatalf("ROn=%s, want %s across weekend", got.ROn, want) + } +} + func flatCandles(start time.Time, count int) []domain.Candle { candles := make([]domain.Candle, 0, count) for i := 0; i < count; i++ { @@ -128,6 +189,18 @@ func flatCandles(start time.Time, count int) []domain.Candle { return candles } +func addBusinessDays(start time.Time, days int) time.Time { + out := start + for added := 0; added < days; { + out = out.AddDate(0, 0, 1) + if out.Weekday() == time.Saturday || out.Weekday() == time.Sunday { + continue + } + added++ + } + return out +} + func TestIntervalVolume(t *testing.T) { got := IntervalVolume([]domain.Candle{ {Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, diff --git a/internal/repository/migrations/0009_feature_cost_breakdown.down.sql b/internal/repository/migrations/0009_feature_cost_breakdown.down.sql new file mode 100644 index 0000000..be49b4c --- /dev/null +++ b/internal/repository/migrations/0009_feature_cost_breakdown.down.sql @@ -0,0 +1,4 @@ +ALTER TABLE features + DROP COLUMN cost_breakdown_json; + +UPDATE schema_meta SET meta_value='0008' WHERE meta_key='schema_version'; diff --git a/internal/repository/migrations/0009_feature_cost_breakdown.up.sql b/internal/repository/migrations/0009_feature_cost_breakdown.up.sql new file mode 100644 index 0000000..9d5048e --- /dev/null +++ b/internal/repository/migrations/0009_feature_cost_breakdown.up.sql @@ -0,0 +1,4 @@ +ALTER TABLE features + ADD COLUMN cost_breakdown_json JSON AFTER expected_cost_bps; + +UPDATE schema_meta SET meta_value='0009' WHERE meta_key='schema_version'; diff --git a/internal/repository/migrations/0010_system_state_history.down.sql b/internal/repository/migrations/0010_system_state_history.down.sql new file mode 100644 index 0000000..d6b67ab --- /dev/null +++ b/internal/repository/migrations/0010_system_state_history.down.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS system_state_history; + +UPDATE schema_meta SET meta_value='0009' WHERE meta_key='schema_version'; diff --git a/internal/repository/migrations/0010_system_state_history.up.sql b/internal/repository/migrations/0010_system_state_history.up.sql new file mode 100644 index 0000000..4aac2a4 --- /dev/null +++ b/internal/repository/migrations/0010_system_state_history.up.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS system_state_history ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + ts DATETIME(3) NOT NULL, + state ENUM('INIT','SYNC_INSTRUMENTS','SYNC_MARKET_DATA','GENERATE_SIGNALS','WAIT_ENTRY_WINDOW','PLACE_ENTRY_ORDERS','MONITOR_ENTRY_ORDERS','HOLD_OVERNIGHT','WAIT_EXIT_WINDOW','PLACE_EXIT_ORDERS','MONITOR_EXIT_ORDERS','RECONCILE','REPORT','SLEEP','HALTED') NOT NULL, + mode ENUM('backtest','paper','sandbox','live_readonly','live_trade') NOT NULL, + halted TINYINT(1) NOT NULL DEFAULT 0, + halt_reason TEXT, + context_json JSON, + KEY ix_system_state_history_ts (ts), + KEY ix_system_state_history_mode_ts (mode, ts) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +INSERT INTO system_state_history (ts, state, mode, halted, halt_reason, context_json) +SELECT last_heartbeat, state, mode, halted, halt_reason, context_json +FROM system_state +WHERE id=1 + AND NOT EXISTS (SELECT 1 FROM system_state_history); + +UPDATE schema_meta SET meta_value='0010' WHERE meta_key='schema_version'; diff --git a/internal/repository/mysql/repository.go b/internal/repository/mysql/repository.go index d91ffb1..7955f3b 100644 --- a/internal/repository/mysql/repository.go +++ b/internal/repository/mysql/repository.go @@ -231,13 +231,13 @@ func (r *Repository) mergeFeatures(ctx context.Context, oldInstrumentUID, newIns INSERT INTO features ( instrument_uid, trade_date, r_on, r_day, mu_on_60, mu_on_252, sigma_on_60, q05_on_60_abs, tstat_on_60, win_on_60, ewma_on, spread_bps, half_spread_bps, tick_bps, - adv_20, expected_cost_bps, net_edge_bps, entry_interval_volume, + adv_20, expected_cost_bps, cost_breakdown_json, net_edge_bps, entry_interval_volume, exit_interval_volume, calculated_at ) SELECT ?, trade_date, r_on, r_day, mu_on_60, mu_on_252, sigma_on_60, q05_on_60_abs, tstat_on_60, win_on_60, ewma_on, spread_bps, half_spread_bps, tick_bps, - adv_20, expected_cost_bps, net_edge_bps, entry_interval_volume, + adv_20, expected_cost_bps, cost_breakdown_json, net_edge_bps, entry_interval_volume, exit_interval_volume, calculated_at FROM features WHERE instrument_uid=? ON DUPLICATE KEY UPDATE @@ -247,6 +247,7 @@ ON DUPLICATE KEY UPDATE ewma_on=VALUES(ewma_on), spread_bps=VALUES(spread_bps), half_spread_bps=VALUES(half_spread_bps), tick_bps=VALUES(tick_bps), adv_20=VALUES(adv_20), expected_cost_bps=VALUES(expected_cost_bps), + cost_breakdown_json=VALUES(cost_breakdown_json), net_edge_bps=VALUES(net_edge_bps), entry_interval_volume=VALUES(entry_interval_volume), exit_interval_volume=VALUES(exit_interval_volume), calculated_at=VALUES(calculated_at)`, newInstrumentUID, oldInstrumentUID) if err != nil { @@ -392,12 +393,12 @@ func (r *Repository) UpsertFeature(ctx context.Context, feature domain.FeatureSe INSERT INTO features ( instrument_uid, trade_date, r_on, r_day, mu_on_60, mu_on_252, sigma_on_60, q05_on_60_abs, tstat_on_60, win_on_60, ewma_on, spread_bps, half_spread_bps, tick_bps, - adv_20, expected_cost_bps, net_edge_bps, entry_interval_volume, + adv_20, expected_cost_bps, cost_breakdown_json, net_edge_bps, entry_interval_volume, exit_interval_volume, calculated_at ) VALUES ( :instrument_uid, :trade_date, :r_on, :r_day, :mu_on_60, :mu_on_252, :sigma_on_60, :q05_on_60_abs, :tstat_on_60, :win_on_60, :ewma_on, :spread_bps, :half_spread_bps, :tick_bps, - :adv_20, :expected_cost_bps, :net_edge_bps, :entry_interval_volume, + :adv_20, :expected_cost_bps, :cost_breakdown_json, :net_edge_bps, :entry_interval_volume, :exit_interval_volume, :calculated_at ) ON DUPLICATE KEY UPDATE r_on=VALUES(r_on), r_day=VALUES(r_day), mu_on_60=VALUES(mu_on_60), @@ -406,6 +407,7 @@ INSERT INTO features ( ewma_on=VALUES(ewma_on), spread_bps=VALUES(spread_bps), half_spread_bps=VALUES(half_spread_bps), tick_bps=VALUES(tick_bps), adv_20=VALUES(adv_20), expected_cost_bps=VALUES(expected_cost_bps), + cost_breakdown_json=VALUES(cost_breakdown_json), net_edge_bps=VALUES(net_edge_bps), entry_interval_volume=VALUES(entry_interval_volume), exit_interval_volume=VALUES(exit_interval_volume), calculated_at=VALUES(calculated_at)`, featureRowFromDomain(feature)) return err @@ -670,7 +672,10 @@ ON DUPLICATE KEY UPDATE halt_reason=IF(halted=1 AND VALUES(halted)=0, halt_reason, VALUES(halt_reason)), last_heartbeat=VALUES(last_heartbeat), context_json=VALUES(context_json)`, state, mode, halted, nullableString(reason), contextJSON) - return err + if err != nil { + return err + } + return r.insertSystemStateHistory(ctx, state, mode, halted, reason, contextJSON) } func (r *Repository) forceSaveSystemState(ctx context.Context, state domain.SystemState, mode domain.Mode, halted bool, reason string, contextJSON string) error { @@ -684,6 +689,16 @@ ON DUPLICATE KEY UPDATE state=VALUES(state), mode=VALUES(mode), halted=VALUES(halted), halt_reason=VALUES(halt_reason), last_heartbeat=VALUES(last_heartbeat), context_json=VALUES(context_json)`, state, mode, halted, nullableString(reason), contextJSON) + if err != nil { + return err + } + return r.insertSystemStateHistory(ctx, state, mode, halted, reason, contextJSON) +} + +func (r *Repository) insertSystemStateHistory(ctx context.Context, state domain.SystemState, mode domain.Mode, halted bool, reason string, contextJSON string) error { + _, err := r.execer().ExecContext(ctx, ` +INSERT INTO system_state_history (ts, state, mode, halted, halt_reason, context_json) +VALUES (UTC_TIMESTAMP(3), ?, ?, ?, ?, ?)`, state, mode, halted, nullableString(reason), contextJSON) return err } @@ -731,6 +746,10 @@ func (r *Repository) getSystemMode(ctx context.Context) (domain.Mode, error) { return mode, nil } +func (r *Repository) GetSystemMode(ctx context.Context) (domain.Mode, error) { + return r.getSystemMode(ctx) +} + func (r *Repository) WasDailyReportSent(ctx context.Context, reportDate time.Time, accountIDHash string) (bool, error) { var count int if err := r.getContext(ctx, &count, ` diff --git a/internal/repository/mysql/rows.go b/internal/repository/mysql/rows.go index 61745f4..b21a82a 100644 --- a/internal/repository/mysql/rows.go +++ b/internal/repository/mysql/rows.go @@ -80,6 +80,7 @@ type featureRow struct { TickBps decimal.Decimal `db:"tick_bps"` ADV20 decimal.Decimal `db:"adv_20"` ExpectedCostBps decimal.Decimal `db:"expected_cost_bps"` + CostBreakdownJSON sql.NullString `db:"cost_breakdown_json"` NetEdgeBps decimal.Decimal `db:"net_edge_bps"` EntryIntervalVolume decimal.Decimal `db:"entry_interval_volume"` ExitIntervalVolume decimal.Decimal `db:"exit_interval_volume"` @@ -104,6 +105,7 @@ func featureRowFromDomain(feature domain.FeatureSet) featureRow { TickBps: feature.TickBps, ADV20: feature.ADV20, ExpectedCostBps: feature.ExpectedCostBps, + CostBreakdownJSON: sql.NullString{String: feature.CostBreakdownJSON, Valid: feature.CostBreakdownJSON != ""}, NetEdgeBps: feature.NetEdgeBps, EntryIntervalVolume: feature.EntryIntervalVolume, ExitIntervalVolume: feature.ExitIntervalVolume, @@ -129,6 +131,7 @@ func (r featureRow) domain() domain.FeatureSet { TickBps: r.TickBps, ADV20: r.ADV20, ExpectedCostBps: r.ExpectedCostBps, + CostBreakdownJSON: r.CostBreakdownJSON.String, NetEdgeBps: r.NetEdgeBps, EntryIntervalVolume: r.EntryIntervalVolume, ExitIntervalVolume: r.ExitIntervalVolume, diff --git a/internal/risk/manager.go b/internal/risk/manager.go index 528290e..566c33b 100644 --- a/internal/risk/manager.go +++ b/internal/risk/manager.go @@ -31,20 +31,23 @@ type ManagerConfig struct { } type PreTradeInput struct { - Portfolio domain.Portfolio - OpenPositions int - ClosingPosition bool - DailyPnL decimal.Decimal - WeeklyPnL decimal.Decimal - MonthlyDrawdownPct decimal.Decimal - AvgSlippageBps10 decimal.Decimal - TradingStatus domain.TradingStatus - QuoteReceivedAt time.Time - Now time.Time - MarketClose time.Time - DatabaseUnavailable bool - UnknownBrokerOrder bool - UnknownBrokerHolding bool + Portfolio domain.Portfolio + OpenPositions int + ClosingPosition bool + DailyPnL decimal.Decimal + WeeklyPnL decimal.Decimal + MonthlyDrawdownPct decimal.Decimal + AvgSlippageBps10 decimal.Decimal + TradingStatus domain.TradingStatus + QuoteReceivedAt time.Time + Now time.Time + MarketClose time.Time + ServerTimeUnavailable bool + ServerClockDrift time.Duration + MaxClockDrift time.Duration + DatabaseUnavailable bool + UnknownBrokerOrder bool + UnknownBrokerHolding bool } type PreTradeResult struct { @@ -84,6 +87,10 @@ func (m Manager) PreTradeCheck(input PreTradeInput) PreTradeResult { switch { case input.DatabaseUnavailable: return reject("database_unavailable") + case input.ServerTimeUnavailable: + return reject("server_time_unavailable") + case input.MaxClockDrift > 0 && input.ServerClockDrift > input.MaxClockDrift: + return reject("server_clock_drift_too_high") case input.UnknownBrokerOrder: return reject("unknown_broker_order") case input.UnknownBrokerHolding: diff --git a/internal/risk/manager_test.go b/internal/risk/manager_test.go index abdd093..ef51411 100644 --- a/internal/risk/manager_test.go +++ b/internal/risk/manager_test.go @@ -2,6 +2,7 @@ package risk import ( "testing" + "time" "github.com/shopspring/decimal" @@ -26,3 +27,30 @@ func TestPreTradeClosingPositionBypassesOpenPositionLimit(t *testing.T) { t.Fatalf("entry result=%+v, want max_open_positions reject", result) } } + +func TestPreTradeRejectsServerClockDrift(t *testing.T) { + manager := NewManager(nil, ManagerConfig{}) + input := PreTradeInput{ + Portfolio: domain.Portfolio{Equity: decimal.NewFromInt(1000)}, + TradingStatus: domain.TradingStatusNormal, + ServerClockDrift: 3 * time.Second, + MaxClockDrift: 2 * time.Second, + } + result := manager.PreTradeCheck(input) + if result.Allowed || result.Reason != "server_clock_drift_too_high" { + t.Fatalf("result=%+v, want server_clock_drift_too_high reject", result) + } +} + +func TestPreTradeRejectsUnavailableServerTime(t *testing.T) { + manager := NewManager(nil, ManagerConfig{}) + input := PreTradeInput{ + Portfolio: domain.Portfolio{Equity: decimal.NewFromInt(1000)}, + TradingStatus: domain.TradingStatusNormal, + ServerTimeUnavailable: true, + } + result := manager.PreTradeCheck(input) + if result.Allowed || result.Reason != "server_time_unavailable" { + t.Fatalf("result=%+v, want server_time_unavailable reject", result) + } +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 5e00646..0320ed3 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -32,6 +32,7 @@ import ( const ( sizeReductionWindowTrades = 20 sizeReductionFactor = 0.5 + sizeReductionTriggerBps = -10 intervalVolumeLookbackDays = 20 ) @@ -899,30 +900,36 @@ func (s *Scheduler) applySizeReductionRule(ctx context.Context, tradeDate time.T if err != nil { return err } - if !ok || count < sizeReductionWindowTrades || averageError.GreaterThanOrEqual(decimal.NewFromInt(-10)) { + if !ok || count < sizeReductionWindowTrades || averageError.GreaterThanOrEqual(decimal.NewFromInt(sizeReductionTriggerBps)) { s.svc.Sizer = s.svc.Sizer.WithSizeFactor(decimal.NewFromInt(1)) return nil } factor := decimal.NewFromFloat(sizeReductionFactor) s.svc.Sizer = s.svc.Sizer.WithSizeFactor(factor) - if !emitEvent { - return nil + if emitEvent { + if err := s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ + Severity: domain.SeverityWarn, + EventType: "size_reduction_rule_triggered", + Message: fmt.Sprintf("average expected_error_bps over %d trades is %s; sizing factor set to %s", count, averageError.StringFixed(2), factor.String()), + ContextJSON: fmt.Sprintf(`{"average_expected_error_bps":%q,"trades":%d,"size_factor":%q}`, averageError.String(), count, factor.String()), + }); err != nil { + return err + } } - if err := s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ - Severity: domain.SeverityWarn, - EventType: "size_reduction_rule_triggered", - Message: fmt.Sprintf("average expected_error_bps over %d trades is %s; sizing factor set to %s", count, averageError.StringFixed(2), factor.String()), - ContextJSON: fmt.Sprintf(`{"average_expected_error_bps":%q,"trades":%d,"size_factor":%q}`, averageError.String(), count, factor.String()), - }); err != nil { - return err - } - return s.recommendLiveReadonlyAfterSizeReduction(ctx, averageError, count, factor) + return s.handleLiveReadonlyAfterSizeReduction(ctx, tradeDate, averageError, count, factor, emitEvent) } func (s Scheduler) averageExpectedErrorBps(ctx context.Context, tradeDate time.Time, limit int) (decimal.Decimal, int, bool, error) { + return s.averageExpectedErrorBpsWindow(ctx, tradeDate, 0, limit) +} + +func (s Scheduler) averageExpectedErrorBpsWindow(ctx context.Context, tradeDate time.Time, offset, limit int) (decimal.Decimal, int, bool, error) { if limit <= 0 { return decimal.Zero, 0, false, nil } + if offset < 0 { + offset = 0 + } positionsList, err := s.svc.Repo.ListPositions(ctx, s.svc.AccountIDHash, tradeDate.AddDate(0, 0, -120), tradeDate) if err != nil { return decimal.Zero, 0, false, err @@ -949,6 +956,10 @@ func (s Scheduler) averageExpectedErrorBps(ctx context.Context, tradeDate time.T if sig.InstrumentUID != pos.InstrumentUID || sig.Decision != domain.DecisionEnter { continue } + if offset > 0 { + offset-- + break + } errorsBps = append(errorsBps, pos.RealizedEdgeBps.Sub(sig.NetEdgeBps)) break } @@ -1229,6 +1240,10 @@ func (s Scheduler) checkEntryInstrumentBeforeOrder(instrument domain.Instrument, } func (s Scheduler) preTradeCheck(ctx context.Context, now time.Time, instrumentUID string, portfolio domain.Portfolio, openPositions int, closingPosition bool, tradingStatus domain.TradingStatus, quoteReceivedAt time.Time) (risk.PreTradeResult, error) { + serverClockDrift, serverTimeUnavailable, err := s.preTradeClockDrift(ctx, now) + if err != nil { + return risk.PreTradeResult{}, err + } metrics, err := s.riskMetrics(ctx, now, portfolio) if err != nil { if haltErr := s.halt(ctx, "database_unavailable", fmt.Sprintf("pre-trade risk metrics unavailable: %s", err), instrumentUID); haltErr != nil { @@ -1241,19 +1256,22 @@ func (s Scheduler) preTradeCheck(ctx context.Context, now time.Time, instrumentU return risk.PreTradeResult{}, err } result := s.svc.Risk.PreTradeCheck(risk.PreTradeInput{ - Portfolio: portfolio, - OpenPositions: openPositions, - ClosingPosition: closingPosition, - DailyPnL: metrics.dailyPnL, - WeeklyPnL: metrics.weeklyPnL, - MonthlyDrawdownPct: metrics.monthlyDrawdownPct, - AvgSlippageBps10: metrics.avgSlippageBps10, - TradingStatus: tradingStatus, - QuoteReceivedAt: quoteReceivedAt, - Now: now.UTC(), - MarketClose: s.preTradeDeadlineOn(now, closingPosition), - UnknownBrokerOrder: unknownOrder, - UnknownBrokerHolding: unknownHolding, + Portfolio: portfolio, + OpenPositions: openPositions, + ClosingPosition: closingPosition, + DailyPnL: metrics.dailyPnL, + WeeklyPnL: metrics.weeklyPnL, + MonthlyDrawdownPct: metrics.monthlyDrawdownPct, + AvgSlippageBps10: metrics.avgSlippageBps10, + TradingStatus: tradingStatus, + QuoteReceivedAt: quoteReceivedAt, + Now: now.UTC(), + MarketClose: s.preTradeDeadlineOn(now, closingPosition), + ServerTimeUnavailable: serverTimeUnavailable, + ServerClockDrift: serverClockDrift, + MaxClockDrift: s.cfg.MaxClockDrift, + UnknownBrokerOrder: unknownOrder, + UnknownBrokerHolding: unknownHolding, }) if !result.Allowed && isHardHaltPreTradeReason(result.Reason) { if err := s.halt(ctx, result.Reason, fmt.Sprintf("pre-trade hard limit breached: %s", result.Reason), instrumentUID); err != nil { @@ -1264,6 +1282,20 @@ func (s Scheduler) preTradeCheck(ctx context.Context, now time.Time, instrumentU return result, nil } +func (s Scheduler) preTradeClockDrift(ctx context.Context, now time.Time) (time.Duration, bool, error) { + if s.cfg.MaxClockDrift <= 0 || s.svc.Gateway == nil { + return 0, false, nil + } + serverTime, err := s.svc.Gateway.GetServerTime(ctx) + if err != nil { + if s.cfg.Mode == domain.ModePaper { + return 0, false, nil + } + return 0, true, nil + } + return timeutil.Drift(now.UTC(), serverTime), false, nil +} + func (s Scheduler) unknownBrokerState(ctx context.Context, portfolio domain.Portfolio) (bool, bool, error) { if !s.cfg.Mode.AllowsBrokerOrders() { return false, false, nil @@ -1309,6 +1341,8 @@ func (s Scheduler) unknownBrokerState(ctx context.Context, portfolio domain.Port func isHardHaltPreTradeReason(reason string) bool { switch reason { case "database_unavailable", + "server_time_unavailable", + "server_clock_drift_too_high", "unknown_broker_order", "unknown_broker_position", "trading_status_unknown_before_order", @@ -1392,6 +1426,70 @@ func (s Scheduler) preTradeDeadlineOn(now time.Time, closingPosition bool) time. return s.marketCloseOn(now) } +func (s *Scheduler) handleLiveReadonlyAfterSizeReduction(ctx context.Context, tradeDate time.Time, averageError decimal.Decimal, count int, factor decimal.Decimal, emitRecommendation bool) error { + if s.cfg.Mode != domain.ModeLiveTrade { + return nil + } + previousAverage, previousCount, previousOK, err := s.averageExpectedErrorBpsWindow(ctx, tradeDate, sizeReductionWindowTrades, sizeReductionWindowTrades) + if err != nil { + return err + } + if previousOK && previousCount == sizeReductionWindowTrades && previousAverage.LessThan(decimal.NewFromInt(sizeReductionTriggerBps)) { + return s.activateLiveReadonly(ctx, averageError, count, previousAverage, previousCount, factor) + } + if !emitRecommendation { + return nil + } + return s.recommendLiveReadonlyAfterSizeReduction(ctx, averageError, count, factor) +} + +func (s *Scheduler) activateLiveReadonly(ctx context.Context, averageError decimal.Decimal, count int, previousAverage decimal.Decimal, previousCount int, factor decimal.Decimal) error { + if s.cfg.Mode != domain.ModeLiveTrade { + return nil + } + if s.svc.Repo == nil { + return nil + } + state, halted, reason, err := s.svc.Repo.GetSystemState(ctx) + if err != nil { + return err + } + if halted || state == domain.StateHalted { + return nil + } + message := fmt.Sprintf( + "average expected_error_bps stayed below %d for two consecutive %d-trade windows; switching to live_readonly", + sizeReductionTriggerBps, + sizeReductionWindowTrades, + ) + s.cfg.Mode = domain.ModeLiveReadonly + if s.svc.Execution != nil { + s.svc.Execution.SetMode(domain.ModeLiveReadonly) + } + s.sm = statemachine.New(s.svc.Repo, domain.ModeLiveReadonly) + contextJSON := fmt.Sprintf( + `{"average_expected_error_bps":%q,"trades":%d,"previous_average_expected_error_bps":%q,"previous_trades":%d,"size_factor":%q,"mode":%q}`, + averageError.String(), + count, + previousAverage.String(), + previousCount, + factor.String(), + domain.ModeLiveReadonly, + ) + if err := s.svc.Repo.SaveSystemState(ctx, state, domain.ModeLiveReadonly, false, reason, contextJSON); err != nil { + return err + } + if s.svc.Notifier != nil { + _ = s.svc.Notifier.Alert(ctx, message) + } + return s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ + Severity: domain.SeverityAlert, + EventType: "live_readonly_activated", + Message: message, + ContextJSON: contextJSON, + }) +} + func (s Scheduler) recommendLiveReadonlyAfterSizeReduction(ctx context.Context, averageError decimal.Decimal, count int, factor decimal.Decimal) error { if s.cfg.Mode != domain.ModeLiveTrade { return nil diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 40221c9..4d459f9 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -564,6 +564,42 @@ func TestPreTradeDailyLossBreachHalts(t *testing.T) { } } +func TestPreTradeClockDriftBreachHalts(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + now := time.Date(2026, 6, 8, 18, 20, 0, 0, time.UTC) + gateway := tinvest.NewFakeGateway() + gateway.ServerTime = now.Add(3 * time.Second) + notifier := &countNotifier{} + s := Scheduler{ + cfg: Config{ + Mode: domain.ModePaper, + Location: time.UTC, + MaxClockDrift: 2 * time.Second, + }, + svc: Services{ + Repo: repo, + Gateway: gateway, + Risk: risk.NewManager(repo, risk.ManagerConfig{}), + Notifier: notifier, + AccountIDHash: "hash", + }, + } + _, err := s.preTradeCheck(ctx, now, "uid", domain.Portfolio{ + Equity: decimal.NewFromInt(10000), + Cash: decimal.NewFromInt(10000), + }, 0, false, domain.TradingStatusNormal, now) + if !errors.Is(err, statemachine.ErrSystemHalted) { + t.Fatalf("err=%v, want ErrSystemHalted", err) + } + if !repo.Halted || repo.HaltReason != "pre-trade hard limit breached: server_clock_drift_too_high" { + t.Fatalf("halted=%v reason=%q", repo.Halted, repo.HaltReason) + } + if notifier.alerts != 1 { + t.Fatalf("alerts=%d, want 1", notifier.alerts) + } +} + func TestPreTradeUsesPhaseDeadlineForMinTimeToClose(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() @@ -812,6 +848,78 @@ func TestSizeReductionRuleRecommendsLiveReadonlyInLiveTrade(t *testing.T) { } } +func TestRepeatedSizeReductionRuleActivatesLiveReadonly(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + notifier := &countNotifier{} + tradeDate := time.Date(2026, 6, 30, 0, 0, 0, 0, time.UTC) + for i := 0; i < sizeReductionWindowTrades*2; i++ { + date := tradeDate.AddDate(0, 0, -i) + if err := repo.UpsertSignal(ctx, domain.Signal{ + TradeDate: date, + InstrumentUID: "uid", + Decision: domain.DecisionEnter, + NetEdgeBps: decimal.NewFromInt(20), + }); err != nil { + t.Fatal(err) + } + if err := repo.UpsertPosition(ctx, domain.Position{ + AccountIDHash: "hash", + InstrumentUID: "uid", + OpenTradeDate: date, + Lot: 1, + Status: domain.PositionExitFilled, + RealizedEdgeBps: decimal.Zero, + UpdatedAt: date.Add(time.Hour), + }); err != nil { + t.Fatal(err) + } + } + execEngine := execution.NewEngine(domain.ModeLiveTrade, "account", nil, repo) + s := Scheduler{ + cfg: Config{Mode: domain.ModeLiveTrade}, + sm: statemachine.New(repo, domain.ModeLiveTrade), + svc: Services{ + Repo: repo, + AccountIDHash: "hash", + Notifier: notifier, + Execution: &execEngine, + Sizer: risk.NewSizer(risk.SizingConfig{ + MaxPositionPct: decimal.NewFromInt(1), + MaxTotalExposurePct: decimal.NewFromInt(1), + MaxParticipationRate: decimal.NewFromInt(1), + CashUsageBuffer: decimal.NewFromInt(1), + RiskBudgetPerInstrumentPct: decimal.NewFromInt(1), + MinOrderNotionalRUB: decimal.NewFromInt(1), + }), + }, + } + if err := s.applySizeReductionRule(ctx, tradeDate, true); err != nil { + t.Fatal(err) + } + if repo.Mode != domain.ModeLiveReadonly || s.cfg.Mode != domain.ModeLiveReadonly { + t.Fatalf("modes repo=%s scheduler=%s, want live_readonly", repo.Mode, s.cfg.Mode) + } + if len(repo.RiskEvents) != 2 || repo.RiskEvents[1].EventType != "live_readonly_activated" || repo.RiskEvents[1].Severity != domain.SeverityAlert { + t.Fatalf("risk events=%+v, want live_readonly activation alert", repo.RiskEvents) + } + if notifier.alerts != 1 { + t.Fatalf("alerts=%d, want 1", notifier.alerts) + } + _, err := execEngine.PlaceLimit(ctx, domain.Order{ + ClientOrderID: "order", + InstrumentUID: "uid", + TradeDate: tradeDate, + Side: domain.SideBuy, + OrderType: domain.OrderTypeLimit, + LimitPrice: decimal.NewFromInt(100), + QuantityLots: 1, + }) + if !errors.Is(err, execution.ErrBrokerOrdersDisabled) { + t.Fatalf("PlaceLimit err=%v, want ErrBrokerOrdersDisabled after live_readonly activation", err) + } +} + func TestBatchSignalLimitsCapSlotsAndExposure(t *testing.T) { s := Scheduler{ cfg: Config{MaxOpenPositions: 5},