diff --git a/.env.example b/.env.example index dfe8a51..b23fc56 100644 --- a/.env.example +++ b/.env.example @@ -92,3 +92,11 @@ BT_USE_MINUTE_MODEL=false BT_OUTPUT_DIR=./backtest_out LIVE_TRADE_ACK= +LIVE_READONLY_DAYS=0 +LIVE_PAPER_DAYS=0 +LIVE_SANDBOX_DAYS=0 +LIVE_COMMISSION_WHITELIST_CHECKED=false +LIVE_TELEGRAM_TESTED=false +LIVE_KILL_SWITCH_TESTED=false +LIVE_SERVER_TIME_CHECKED=false +LIVE_SMALL_CAPITAL=false diff --git a/README.md b/README.md index ef65b2b..008fdc0 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ make test APP_MODE=backtest go run ./cmd/bot ``` -Для daemon-режимов (`paper`, `sandbox`, `live_readonly`, `live_trade`) нужен `DB_DSN` MariaDB/MySQL. `live_trade` дополнительно требует `LIVE_TRADE_ACK=I_ACCEPT_RISK`. +Для daemon-режимов (`paper`, `sandbox`, `live_readonly`, `live_trade`) нужен `DB_DSN` MariaDB/MySQL. `live_trade` дополнительно требует `LIVE_TRADE_ACK=I_ACCEPT_RISK` и выполненные pre-flight условия из секции `LIVE`. ## Environment Variables @@ -168,6 +168,14 @@ APP_MODE=backtest go run ./cmd/bot | Переменная | Что указывать | Дефолт | Границы/валидация | За что отвечает и что меняется | | --- | --- | --- | --- | --- | | `LIVE_TRADE_ACK` | ровно `I_ACCEPT_RISK` | пусто | обязателен только для `APP_MODE=live_trade` | Ручное подтверждение риска для режима реальной торговли. Без него `live_trade` не стартует. | +| `LIVE_READONLY_DAYS` | целое число торговых дней | `0` | для `live_trade` должно быть `>= 20` | Подтверждает накопленный период работы в `live_readonly` перед реальной торговлей. | +| `LIVE_PAPER_DAYS` | целое число торговых дней | `0` | для `live_trade` должно быть `>= 20` | Подтверждает период `paper`-прогона с bid/ask моделью. | +| `LIVE_SANDBOX_DAYS` | целое число торговых дней | `0` | для `live_trade` должно быть `>= 10` | Подтверждает период sandbox без критических ошибок. | +| `LIVE_COMMISSION_WHITELIST_CHECKED` | `true` или `false` | `false` | для `live_trade` должно быть `true` | Ручное подтверждение актуальных комиссий и whitelist инструментов. | +| `LIVE_TELEGRAM_TESTED` | `true` или `false` | `false` | для `live_trade` должно быть `true` | Подтверждает тест доставки Telegram-уведомлений. | +| `LIVE_KILL_SWITCH_TESTED` | `true` или `false` | `false` | для `live_trade` должно быть `true` | Подтверждает тест ручного halt/unhalt сценария. | +| `LIVE_SERVER_TIME_CHECKED` | `true` или `false` | `false` | для `live_trade` должно быть `true` | Подтверждает проверку server-time/drift в sandbox. | +| `LIVE_SMALL_CAPITAL` | `true` или `false` | `false` | для `live_trade` должно быть `true` | Подтверждает запуск реальной торговли с малым стартовым капиталом. | ## Commands diff --git a/internal/backtest/engine.go b/internal/backtest/engine.go index e757e41..0b87ff3 100644 --- a/internal/backtest/engine.go +++ b/internal/backtest/engine.go @@ -212,9 +212,6 @@ func (e Engine) RunWithMinuteCandles(candlesByInstrument map[string][]domain.Can return Result{}, err } if ok { - if capacity := e.windowCapacity(candidate, preparedMinutes[instrumentUID]); capacity.IsPositive() { - candidate.capacity = capacity - } candidatesByExitDate[candidate.exit.TradeDate.Format("2006-01-02")] = append(candidatesByExitDate[candidate.exit.TradeDate.Format("2006-01-02")], candidate) } } @@ -251,20 +248,29 @@ func (e Engine) RunWithMinuteCandles(candlesByInstrument map[string][]domain.Can dayStartEquity := equity dayPnL := decimal.Zero for _, c := range dayCandidates { + entryIntervalVolume, exitIntervalVolume := e.windowVolumes(c, preparedMinutes[c.instrumentUID]) + capacity := decimal.Zero + if entryIntervalVolume.IsPositive() && exitIntervalVolume.IsPositive() { + capacity = money.Min(entryIntervalVolume, exitIntervalVolume).Mul(e.cfg.MaxParticipationRate) + } else if e.cfg.UseMinuteModel { + continue + } else { + entryIntervalVolume = e.unconstrainedIntervalVolume(equity) + exitIntervalVolume = entryIntervalVolume + } sized := sizer.Size(risk.SizingInput{ Portfolio: domain.Portfolio{Equity: equity, Cash: cash}, SelectedInstruments: len(dayCandidates), LimitPrice: c.buy, Lot: c.lot, - EntryIntervalVolume: c.adv, - ExitIntervalVolume: c.adv, + EntryIntervalVolume: entryIntervalVolume, + ExitIntervalVolume: exitIntervalVolume, Q05OvernightAbs: c.q05Abs, }) if sized.Lots <= 0 { continue } lots := sized.Lots - capacity := c.capacity if e.cfg.UseMinuteModel { executedLots, minuteCapacity, ok := e.minuteExecution(c, preparedMinutes[c.instrumentUID], sized.Lots) if !ok { @@ -365,22 +371,34 @@ func (e Engine) fillableMinuteLots(minutes []domain.Candle, date time.Time, limi } func (e Engine) windowCapacity(c candidate, minutes []domain.Candle) decimal.Decimal { - if len(minutes) == 0 { + entryVolume, exitVolume := e.windowVolumes(c, minutes) + if !entryVolume.IsPositive() || !exitVolume.IsPositive() { return decimal.Zero } + return money.Min(entryVolume, exitVolume).Mul(e.cfg.MaxParticipationRate) +} + +func (e Engine) windowVolumes(c candidate, minutes []domain.Candle) (decimal.Decimal, decimal.Decimal) { + if len(minutes) == 0 { + return decimal.Zero, decimal.Zero + } lot := c.lot if lot <= 0 { lot = e.lotFor(c.instrumentUID) } if lot <= 0 { - return decimal.Zero + return decimal.Zero, decimal.Zero } entryVolume := e.windowNotional(minutes, c.entry.TradeDate, e.cfg.EntryWindow, lot) exitVolume := e.windowNotional(minutes, c.exit.TradeDate, e.cfg.ExitWindow, lot) - if !entryVolume.IsPositive() || !exitVolume.IsPositive() { + return entryVolume, exitVolume +} + +func (e Engine) unconstrainedIntervalVolume(equity decimal.Decimal) decimal.Decimal { + if !equity.IsPositive() || !e.cfg.MaxParticipationRate.IsPositive() { return decimal.Zero } - return money.Min(entryVolume, exitVolume).Mul(e.cfg.MaxParticipationRate) + return equity.Div(e.cfg.MaxParticipationRate).Mul(decimal.NewFromInt(10)) } func (e Engine) windowNotional(minutes []domain.Candle, date time.Time, window TimeWindow, lot int64) decimal.Decimal { @@ -421,7 +439,6 @@ type candidate struct { adv decimal.Decimal q05Abs decimal.Decimal overnightGap decimal.Decimal - capacity decimal.Decimal lot int64 } @@ -507,7 +524,6 @@ func (e Engine) evaluateCandidate(instrumentUID string, candles []domain.Candle, adv: adv, q05Abs: q05Abs, overnightGap: gap, - capacity: adv.Mul(e.cfg.MaxParticipationRate), lot: lot, }, true, nil } diff --git a/internal/backtest/lookahead_test.go b/internal/backtest/lookahead_test.go index d4b7371..e15dc51 100644 --- a/internal/backtest/lookahead_test.go +++ b/internal/backtest/lookahead_test.go @@ -128,6 +128,27 @@ func TestWindowCapacityUsesMinuteEntryAndExitWindows(t *testing.T) { } } +func TestBacktestWithoutMinuteDataDoesNotReportADVAsCapacity(t *testing.T) { + engine := New(Config{ + RollingShort: 2, + RollingLong: 2, + MinTStat60: decimal.NewFromInt(-1), + MinWinRate60: decimal.NewFromFloat(0.1), + MinNetEdgeBps: decimal.NewFromInt(-1000), + MinADVRUB: decimal.NewFromInt(1), + }) + result, err := engine.Run(map[string][]domain.Candle{"uid": candidateCandles("uid")}) + if err != nil { + t.Fatal(err) + } + if len(result.Trades) == 0 { + t.Fatal("expected daily-only minimal backtest trade") + } + if !result.Trades[0].CapacityRUB.IsZero() { + t.Fatalf("capacity=%s, want zero when minute windows are unavailable", result.Trades[0].CapacityRUB) + } +} + func TestLoadCandlesCSVWithMetadata(t *testing.T) { raw := strings.NewReader(`instrument_uid,trade_date,open,high,low,close,volume_lots,lot,min_price_increment uid,2024-01-02,100,101,99,100,10,10,0.05 diff --git a/internal/config/config.go b/internal/config/config.go index 3a5dd56..4ef3fc6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,12 @@ import ( const liveTradeAck = "I_ACCEPT_RISK" const maxQuoteDepth = 50 +const ( + minLiveReadonlyDays = 20 + minPaperDays = 20 + minSandboxDays = 10 +) + type Config struct { App AppConfig `envPrefix:"APP_"` TInvest TInvestConfig `envPrefix:"TINVEST_"` @@ -147,7 +153,15 @@ type BacktestConfig struct { } type LiveConfig struct { - TradeAck string `env:"TRADE_ACK"` + TradeAck string `env:"TRADE_ACK"` + ReadonlyDays int `env:"READONLY_DAYS" envDefault:"0"` + PaperDays int `env:"PAPER_DAYS" envDefault:"0"` + SandboxDays int `env:"SANDBOX_DAYS" envDefault:"0"` + CommissionWhitelistChecked bool `env:"COMMISSION_WHITELIST_CHECKED" envDefault:"false"` + TelegramTested bool `env:"TELEGRAM_TESTED" envDefault:"false"` + KillSwitchTested bool `env:"KILL_SWITCH_TESTED" envDefault:"false"` + ServerTimeChecked bool `env:"SERVER_TIME_CHECKED" envDefault:"false"` + SmallCapital bool `env:"SMALL_CAPITAL" envDefault:"false"` } func Load() (Config, error) { @@ -236,8 +250,41 @@ func (c *Config) Validate() error { if c.TInvest.UseSandbox && c.App.Mode != domain.ModeSandbox { return errors.New("TINVEST_USE_SANDBOX=true is only valid with APP_MODE=sandbox") } - if c.App.Mode == domain.ModeLiveTrade && c.Live.TradeAck != liveTradeAck { - return fmt.Errorf("LIVE_TRADE_ACK=%s is required for APP_MODE=live_trade", liveTradeAck) + if c.App.Mode == domain.ModeLiveTrade { + if c.Live.TradeAck != liveTradeAck { + return fmt.Errorf("LIVE_TRADE_ACK=%s is required for APP_MODE=live_trade", liveTradeAck) + } + if err := c.validateLiveTradePreconditions(); err != nil { + return err + } + } + return nil +} + +func (c Config) validateLiveTradePreconditions() error { + if c.Live.ReadonlyDays < minLiveReadonlyDays { + return fmt.Errorf("LIVE_READONLY_DAYS must be >= %d for APP_MODE=live_trade", minLiveReadonlyDays) + } + if c.Live.PaperDays < minPaperDays { + return fmt.Errorf("LIVE_PAPER_DAYS must be >= %d for APP_MODE=live_trade", minPaperDays) + } + if c.Live.SandboxDays < minSandboxDays { + return fmt.Errorf("LIVE_SANDBOX_DAYS must be >= %d for APP_MODE=live_trade", minSandboxDays) + } + if !c.Live.CommissionWhitelistChecked { + return errors.New("LIVE_COMMISSION_WHITELIST_CHECKED=true is required for APP_MODE=live_trade") + } + if !c.Live.TelegramTested { + return errors.New("LIVE_TELEGRAM_TESTED=true is required for APP_MODE=live_trade") + } + if !c.Live.KillSwitchTested { + return errors.New("LIVE_KILL_SWITCH_TESTED=true is required for APP_MODE=live_trade") + } + if !c.Live.ServerTimeChecked { + return errors.New("LIVE_SERVER_TIME_CHECKED=true is required for APP_MODE=live_trade") + } + if !c.Live.SmallCapital { + return errors.New("LIVE_SMALL_CAPITAL=true is required for APP_MODE=live_trade") } return nil } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 043a9d8..f623928 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -27,6 +27,23 @@ func TestValidateAllowsCancelCountsFreeOrderPolicy(t *testing.T) { } } +func TestValidateLiveTradeRequiresPreconditions(t *testing.T) { + cfg := minimalBrokerConfig(domain.ModeLiveTrade) + cfg.Live.TradeAck = liveTradeAck + err := cfg.Validate() + if err == nil || !strings.Contains(err.Error(), "LIVE_READONLY_DAYS") { + t.Fatalf("Validate err=%v, want live_trade readonly precondition", err) + } +} + +func TestValidateLiveTradeAcceptsAllPreconditions(t *testing.T) { + cfg := minimalBrokerConfig(domain.ModeLiveTrade) + cfg.Live = validLiveTradeConfig() + if err := cfg.Validate(); err != nil { + t.Fatalf("Validate live_trade preconditions: %v", err) + } +} + func minimalBrokerConfig(mode domain.Mode) Config { return Config{ App: AppConfig{ @@ -64,6 +81,20 @@ func minimalBrokerConfig(mode domain.Mode) Config { } } +func validLiveTradeConfig() LiveConfig { + return LiveConfig{ + TradeAck: liveTradeAck, + ReadonlyDays: minLiveReadonlyDays, + PaperDays: minPaperDays, + SandboxDays: minSandboxDays, + CommissionWhitelistChecked: true, + TelegramTested: true, + KillSwitchTested: true, + ServerTimeChecked: true, + SmallCapital: true, + } +} + func mustTOD(raw string) timeutil.TimeOfDay { tod, err := timeutil.ParseTimeOfDay(raw) if err != nil { diff --git a/internal/execution/engine.go b/internal/execution/engine.go index 04a89cd..6fe2e3a 100644 --- a/internal/execution/engine.go +++ b/internal/execution/engine.go @@ -149,6 +149,9 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi lock := e.lockFor(order.InstrumentUID) lock.Lock() defer lock.Unlock() + if e.mode != domain.ModePaper && !e.mode.AllowsBrokerOrders() { + return order, ErrBrokerOrdersDisabled + } if e.store != nil { existing, err := e.findExisting(ctx, order) if err != nil { @@ -161,13 +164,6 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi if e.mode == domain.ModePaper { return e.placePaperLimit(ctx, order, freeOrderLimit) } - if !e.mode.AllowsBrokerOrders() { - order.Status = domain.OrderStatusNew - if e.store != nil { - return order, e.store.UpsertOrder(ctx, order) - } - return order, ErrBrokerOrdersDisabled - } if e.gateway == nil { return domain.Order{}, errors.New("gateway is nil") } @@ -569,16 +565,24 @@ func (e *Engine) checkQuoteFresh(book domain.OrderBook) error { if e.maxQuoteAge <= 0 { return nil } - if book.ReceivedAt.IsZero() { - return fmt.Errorf("quote received timestamp is missing") + quoteTs := quoteTimestamp(book) + if quoteTs.IsZero() { + return fmt.Errorf("quote timestamp is missing") } - age := e.nowUTC().Sub(book.ReceivedAt) + age := e.nowUTC().Sub(quoteTs) if age > e.maxQuoteAge { return fmt.Errorf("quote age %s exceeds %s", age, e.maxQuoteAge) } return nil } +func quoteTimestamp(book domain.OrderBook) time.Time { + if !book.Time.IsZero() { + return book.Time.UTC() + } + return book.ReceivedAt.UTC() +} + func (e *Engine) lockFor(instrumentUID string) *sync.Mutex { value, _ := e.mu.LoadOrStore(instrumentUID, &sync.Mutex{}) lock, ok := value.(*sync.Mutex) diff --git a/internal/execution/state_test.go b/internal/execution/state_test.go index b898476..2158131 100644 --- a/internal/execution/state_test.go +++ b/internal/execution/state_test.go @@ -243,6 +243,52 @@ func TestPlaceEntryRejectsStaleQuote(t *testing.T) { } } +func TestPlaceEntryRejectsStaleExchangeQuoteTime(t *testing.T) { + ctx := context.Background() + now := time.Date(2026, 6, 8, 18, 20, 0, 0, time.UTC) + engine := NewEngine(domain.ModeSandbox, "account", tinvest.NewFakeGateway(), testutil.NewMemoryRepository()) + engine.SetClock(&fixedClock{now: now}) + engine.SetMaxQuoteAge(time.Second) + _, err := engine.PlaceEntry(ctx, "hash", domain.Instrument{ + InstrumentUID: "uid", + Lot: 1, + MinPriceIncrement: decimal.NewFromInt(1), + }, now, 1, domain.OrderBook{ + InstrumentUID: "uid", + Time: now.Add(-2 * time.Second), + ReceivedAt: now, + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(101), QuantityLots: 10}}, + }, 1, 1) + if err == nil { + t.Fatal("expected stale exchange quote timestamp error") + } +} + +func TestLiveReadonlyDoesNotPersistLocalOrder(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + engine := NewEngine(domain.ModeLiveReadonly, "account", tinvest.NewFakeGateway(), repo) + _, err := engine.PlaceLimit(ctx, domain.Order{ + ClientOrderID: "readonly-order", + AccountIDHash: "hash", + InstrumentUID: "uid", + TradeDate: time.Date(2026, 6, 8, 0, 0, 0, 0, time.UTC), + Side: domain.SideBuy, + OrderType: domain.OrderTypeLimit, + LimitPrice: decimal.NewFromInt(100), + QuantityLots: 1, + Status: domain.OrderStatusNew, + AttemptNo: 1, + }) + if !errors.Is(err, ErrBrokerOrdersDisabled) { + t.Fatalf("PlaceLimit err=%v, want ErrBrokerOrdersDisabled", err) + } + if len(repo.Orders) != 0 { + t.Fatalf("readonly mode persisted orders: %+v", repo.Orders) + } +} + func TestMonitorUntilRepostsAndExpiresAtDeadline(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/marketdata/loader.go b/internal/marketdata/loader.go index f98fa48..dc8912d 100644 --- a/internal/marketdata/loader.go +++ b/internal/marketdata/loader.go @@ -86,16 +86,24 @@ func (l Loader) LatestQuote(ctx context.Context, instrumentUID string, depth int if err != nil { return domain.OrderBook{}, err } - if book.ReceivedAt.IsZero() { - return domain.OrderBook{}, fmt.Errorf("quote received timestamp is missing") + quoteTs := quoteTimestamp(book) + if quoteTs.IsZero() { + return domain.OrderBook{}, fmt.Errorf("quote timestamp is missing") } - age := l.nowUTC().Sub(book.ReceivedAt) + age := l.nowUTC().Sub(quoteTs) if maxAge > 0 && age > maxAge { return domain.OrderBook{}, fmt.Errorf("quote age %s exceeds %s", age, maxAge) } return book, nil } +func quoteTimestamp(book domain.OrderBook) time.Time { + if !book.Time.IsZero() { + return book.Time.UTC() + } + return book.ReceivedAt.UTC() +} + func (l Loader) nowUTC() time.Time { if l.clock == nil { return time.Now().UTC() diff --git a/internal/marketdata/loader_test.go b/internal/marketdata/loader_test.go new file mode 100644 index 0000000..a46360e --- /dev/null +++ b/internal/marketdata/loader_test.go @@ -0,0 +1,43 @@ +package marketdata + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/shopspring/decimal" + + "overnight-trading-bot/internal/domain" + "overnight-trading-bot/internal/tinvest" +) + +type fixedClock struct { + now time.Time +} + +func (c fixedClock) Now() time.Time { + return c.now +} + +func (c fixedClock) Sleep(<-chan struct{}, time.Duration) bool { + return true +} + +func TestLatestQuoteUsesExchangeTimestampForFreshness(t *testing.T) { + now := time.Date(2026, 6, 8, 18, 20, 0, 0, time.UTC) + gateway := tinvest.NewFakeGateway() + gateway.OrderBooks["uid"] = domain.OrderBook{ + InstrumentUID: "uid", + Time: now.Add(-2 * time.Second), + ReceivedAt: now, + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(101), QuantityLots: 10}}, + } + loader := NewLoader(nil, gateway) + loader.SetClock(fixedClock{now: now}) + _, err := loader.LatestQuote(context.Background(), "uid", 20, time.Second) + if err == nil || !strings.Contains(err.Error(), "quote age") { + t.Fatalf("LatestQuote err=%v, want stale exchange timestamp rejection", err) + } +} diff --git a/internal/repository/mysql/repository.go b/internal/repository/mysql/repository.go index 3d3b7e5..a9e9aa3 100644 --- a/internal/repository/mysql/repository.go +++ b/internal/repository/mysql/repository.go @@ -356,7 +356,7 @@ INSERT INTO candles_minute ( :instrument_uid, :trade_date, :open, :high, :low, :close, :volume_lots, :source, :loaded_at ) ON DUPLICATE KEY UPDATE open=VALUES(open), high=VALUES(high), low=VALUES(low), close=VALUES(close), - volume_lots=VALUES(volume_lots), source=VALUES(source), loaded_at=VALUES(loaded_at)`, candleRowFromDomain(candle)) + volume_lots=VALUES(volume_lots), source=VALUES(source), loaded_at=VALUES(loaded_at)`, minuteCandleRowFromDomain(candle)) if err != nil { return err } diff --git a/internal/repository/mysql/rows.go b/internal/repository/mysql/rows.go index 528b450..61745f4 100644 --- a/internal/repository/mysql/rows.go +++ b/internal/repository/mysql/rows.go @@ -35,6 +35,20 @@ func candleRowFromDomain(candle domain.Candle) candleRow { } } +func minuteCandleRowFromDomain(candle domain.Candle) candleRow { + return candleRow{ + InstrumentUID: candle.InstrumentUID, + TradeDate: candle.TradeDate.UTC(), + Open: candle.Open, + High: candle.High, + Low: candle.Low, + Close: candle.Close, + VolumeLots: candle.VolumeLots, + Source: candle.Source, + LoadedAt: candle.LoadedAt, + } +} + func (r candleRow) domain() domain.Candle { return domain.Candle{ InstrumentUID: r.InstrumentUID, diff --git a/internal/repository/mysql/rows_test.go b/internal/repository/mysql/rows_test.go new file mode 100644 index 0000000..7106ec4 --- /dev/null +++ b/internal/repository/mysql/rows_test.go @@ -0,0 +1,27 @@ +package mysql + +import ( + "testing" + "time" + + "github.com/shopspring/decimal" + + "overnight-trading-bot/internal/domain" +) + +func TestMinuteCandleRowPreservesTimestamp(t *testing.T) { + ts := time.Date(2026, 6, 8, 15, 25, 30, 123000000, time.UTC) + row := minuteCandleRowFromDomain(domain.Candle{ + InstrumentUID: "uid", + TradeDate: ts, + Open: decimal.NewFromInt(1), + }) + if !row.TradeDate.Equal(ts) { + t.Fatalf("minute timestamp=%s, want %s", row.TradeDate, ts) + } + + daily := candleRowFromDomain(domain.Candle{InstrumentUID: "uid", TradeDate: ts}) + if daily.TradeDate.Equal(ts) || daily.TradeDate.Hour() != 0 || daily.TradeDate.Minute() != 0 { + t.Fatalf("daily timestamp was not truncated to date: %s", daily.TradeDate) + } +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 5afb4e8..663e2f8 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -153,18 +153,34 @@ func (s *Scheduler) Step(ctx context.Context) error { return nil } phase := s.phase(now) + current, halted, reason, err := s.svc.Repo.GetSystemState(ctx) + if err != nil { + return err + } + if halted || current == domain.StateHalted { + return fmt.Errorf("%w: %s", statemachine.ErrSystemHalted, reason) + } switch phase { case domain.StateWaitExitWindow: return s.waitExit(ctx, now) case domain.StatePlaceExitOrders: + if current == domain.StateMonitorExitOrders { + return s.monitorExitOrders(ctx, now) + } return s.placeExitOrders(ctx, now) case domain.StateMonitorExitOrders: return s.monitorExitOrders(ctx, now) case domain.StateReconcile: return s.failOpenPositionsAtHardDeadline(ctx) case domain.StateGenerateSignals: + if signalPhaseAlreadyPrepared(current) { + return s.sm.Heartbeat(ctx, current) + } return s.prepareSignals(ctx, now) case domain.StatePlaceEntryOrders: + if current == domain.StateMonitorEntryOrders { + return s.monitorEntryOrders(ctx, now) + } return s.placeEntryOrders(ctx, now) case domain.StateMonitorEntryOrders: return s.monitorEntryOrders(ctx, now) @@ -492,7 +508,7 @@ func (s *Scheduler) placeEntryOrders(ctx context.Context, now time.Time) error { } continue } - pre, err := s.preTradeCheck(ctx, now, sig.InstrumentUID, portfolio, projectedOpenPositions, false, tradingStatus, book.ReceivedAt) + pre, err := s.preTradeCheck(ctx, now, sig.InstrumentUID, portfolio, projectedOpenPositions, false, tradingStatus, quoteTimestamp(book)) if err != nil { return err } @@ -664,7 +680,7 @@ func (s *Scheduler) placeExitOrders(ctx context.Context, now time.Time) error { if err != nil { return err } - pre, err := s.preTradeCheck(ctx, now, pos.InstrumentUID, portfolio, len(positionsList), true, tradingStatus, book.ReceivedAt) + pre, err := s.preTradeCheck(ctx, now, pos.InstrumentUID, portfolio, len(positionsList), true, tradingStatus, quoteTimestamp(book)) if err != nil { return err } @@ -954,7 +970,11 @@ func (s *Scheduler) checkInfrastructure(ctx context.Context) error { } drift := timeutil.Drift(s.nowUTC(), serverTime) if drift > s.cfg.MaxClockDrift { - return s.recordInfrastructureFailure(ctx, fmt.Errorf("server_clock_drift_too_high: %s > %s", drift, s.cfg.MaxClockDrift)) + reason := fmt.Sprintf("server_clock_drift_too_high: %s > %s", drift, s.cfg.MaxClockDrift) + if err := s.halt(ctx, "server_clock_drift_too_high", reason, ""); err != nil { + return err + } + return fmt.Errorf("%w: %s", statemachine.ErrSystemHalted, reason) } s.infraFailedSince = time.Time{} return nil @@ -1177,7 +1197,7 @@ func (s Scheduler) repostPreTradeCheck(ctx context.Context, now time.Time, order if err != nil { return err } - pre, err := s.preTradeCheck(ctx, now, order.InstrumentUID, portfolio, len(openPositions), order.Side == domain.SideSell, tradingStatus, book.ReceivedAt) + pre, err := s.preTradeCheck(ctx, now, order.InstrumentUID, portfolio, len(openPositions), order.Side == domain.SideSell, tradingStatus, quoteTimestamp(book)) if err != nil { return err } @@ -1457,6 +1477,25 @@ func isSizingSkipReason(reason string) bool { return reason == "lots_below_one" || reason == "min_order_notional" } +func signalPhaseAlreadyPrepared(state domain.SystemState) bool { + switch state { + case domain.StateWaitEntryWindow, + domain.StatePlaceEntryOrders, + domain.StateMonitorEntryOrders, + domain.StateHoldOvernight: + return true + default: + return false + } +} + +func quoteTimestamp(book domain.OrderBook) time.Time { + if !book.Time.IsZero() { + return book.Time.UTC() + } + return book.ReceivedAt.UTC() +} + func (s Scheduler) hasStateMachine() bool { return s.sm != (statemachine.System{}) } @@ -1488,7 +1527,9 @@ func (s Scheduler) transitionTo(ctx context.Context, to domain.SystemState) erro } func (s Scheduler) halt(ctx context.Context, eventType, reason, instrumentUID string) error { - _ = s.svc.Notifier.Alert(ctx, fmt.Sprintf("%s: %s", eventType, reason)) + if s.svc.Notifier != nil { + _ = s.svc.Notifier.Alert(ctx, fmt.Sprintf("%s: %s", eventType, reason)) + } return s.svc.Risk.Halt(ctx, s.cfg.Mode, eventType, reason, instrumentUID) } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 4653036..7b53971 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -85,23 +85,98 @@ func TestPhaseHonorsExitNotBeforeWhenWindowStartsEarlier(t *testing.T) { } } -func TestInfrastructureOutageRequiresThreshold(t *testing.T) { +func TestClockDriftHardLimitHaltsImmediately(t *testing.T) { gateway := tinvest.NewFakeGateway() gateway.ServerTime = time.Now().UTC().Add(-10 * time.Second) + repo := testutil.NewMemoryRepository() + notifier := &countNotifier{} s := &Scheduler{ cfg: Config{ Mode: domain.ModeSandbox, MaxClockDrift: 2 * time.Second, APIOutageHalt: 180 * time.Second, }, - svc: Services{Gateway: gateway}, + svc: Services{ + Gateway: gateway, + Risk: risk.NewManager(repo, risk.ManagerConfig{}), + Notifier: notifier, + }, } - if err := s.checkInfrastructure(context.Background()); err != nil { - t.Fatalf("first infrastructure failure should be tolerated: %v", err) + if err := s.checkInfrastructure(context.Background()); !errors.Is(err, statemachine.ErrSystemHalted) { + t.Fatalf("err=%v, want immediate halt on clock drift", err) } - s.infraFailedSince = time.Now().UTC().Add(-181 * time.Second) - if err := s.checkInfrastructure(context.Background()); err == nil { - t.Fatalf("expected outage after threshold") + if !repo.Halted || repo.HaltReason == "" { + t.Fatalf("system was not halted: state=%s halted=%v reason=%q", repo.State, repo.Halted, repo.HaltReason) + } + if notifier.alerts != 1 { + t.Fatalf("alerts=%d, want 1", notifier.alerts) + } +} + +func TestStepIsIdempotentAfterSignalPreparation(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + now := time.Date(2026, 6, 8, 18, 15, 0, 0, time.UTC) + if err := repo.SaveSystemState(ctx, domain.StateWaitEntryWindow, domain.ModePaper, false, "", "{}"); err != nil { + t.Fatal(err) + } + s := Scheduler{ + clock: fixedClock{now: now}, + cfg: Config{ + Mode: domain.ModePaper, + Location: time.UTC, + EntrySignalTime: mustTOD("18:10:00"), + EntryWindowStart: mustTOD("18:20:00"), + }, + sm: statemachine.New(repo, domain.ModePaper), + svc: Services{ + Repo: repo, + Notifier: &countNotifier{}, + AccountIDHash: "hash", + }, + } + if err := s.Step(ctx); err != nil { + t.Fatal(err) + } + state, halted, _, err := repo.GetSystemState(ctx) + if err != nil { + t.Fatal(err) + } + if halted || state != domain.StateWaitEntryWindow { + t.Fatalf("state=%s halted=%v, want WAIT_ENTRY_WINDOW without rollback", state, halted) + } +} + +func TestStepMonitorsEntryOrdersOnRepeatedEntryWindowTick(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + now := time.Date(2026, 6, 8, 18, 25, 0, 0, time.UTC) + if err := repo.SaveSystemState(ctx, domain.StateMonitorEntryOrders, domain.ModePaper, false, "", "{}"); err != nil { + t.Fatal(err) + } + s := Scheduler{ + clock: fixedClock{now: now}, + cfg: Config{ + Mode: domain.ModePaper, + Location: time.UTC, + EntryWindowStart: mustTOD("18:20:00"), + NoNewEntryAfter: mustTOD("18:38:30"), + }, + sm: statemachine.New(repo, domain.ModePaper), + svc: Services{ + Repo: repo, + AccountIDHash: "hash", + }, + } + if err := s.Step(ctx); err != nil { + t.Fatal(err) + } + state, halted, _, err := repo.GetSystemState(ctx) + if err != nil { + t.Fatal(err) + } + if halted || state != domain.StateMonitorEntryOrders { + t.Fatalf("state=%s halted=%v, want MONITOR_ENTRY_ORDERS", state, halted) } }