diff --git a/.env.example b/.env.example index a6f0de0..dfe8a51 100644 --- a/.env.example +++ b/.env.example @@ -66,6 +66,7 @@ RISK_API_OUTAGE_HALT_SEC=180 RISK_MAX_CLOCK_DRIFT_SEC=2 RISK_RECONCILIATION_WINDOW_HOURS=72 RISK_RECONCILIATION_SKEW_SEC=10 +RISK_COMMISSION_TOLERANCE_RUB=0.01 RISK_CASH_USAGE_BUFFER=0.95 RISK_RISK_BUDGET_PER_INSTRUMENT_PCT=0.005 RISK_MIN_ORDER_NOTIONAL_RUB=1000 diff --git a/README.md b/README.md index 089b5cf..8da3bda 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ APP_MODE=backtest go run ./cmd/bot | Переменная | Что указывать | Дефолт | Границы/валидация | За что отвечает и что меняется | | --- | --- | --- | --- | --- | | `TINVEST_TOKEN` | токен T-Invest API | пусто | обязателен для `sandbox`, `live_readonly`, `live_trade` | Доступ к реальному или sandbox API. В `paper` и `backtest` не нужен. | -| `TINVEST_ACCOUNT_ID` | идентификатор брокерского счёта | пусто | строка; в коде непустота не проверяется | Счёт для портфеля, заявок и сверки. Для API-режимов нужно указать реальный account id, иначе операции у брокера могут падать. | +| `TINVEST_ACCOUNT_ID` | идентификатор брокерского счёта | пусто | обязателен для `sandbox`, `live_readonly`, `live_trade` | Счёт для портфеля, заявок и сверки. Для API-режимов бот падает на старте, если account id не указан. | | `TINVEST_ENDPOINT` | gRPC endpoint T-Invest, обычно `host:port` | `invest-public-api.tinkoff.ru:443` | строка; валидации формата нет | Endpoint для API. В `sandbox` код принудительно использует sandbox endpoint. | | `TINVEST_APP_NAME` | имя приложения | `overnight-trading-bot` | строка | Передаётся в SDK как имя клиента. Меняет идентификацию приложения на стороне API/логов. | | `TINVEST_REQUEST_TIMEOUT_SEC` | целое число секунд | `10` | рекомендуется `> 0`; сейчас не применяется | Зарезервировано под таймаут API-запросов. На текущий код не влияет. | @@ -121,6 +121,7 @@ APP_MODE=backtest go run ./cmd/bot | `RISK_MAX_CLOCK_DRIFT_SEC` | целое число секунд | `2` | `> 0` включает проверку drift; `<= 0` отключает | Максимальный рассинхрон локального времени и серверного времени API в `/ready`. | | `RISK_RECONCILIATION_WINDOW_HOURS` | целое число часов | `72` | должно быть `> 0` | Глубина сверки последних заявок и операций брокера. Больше - больше история сверки, но тяжелее запросы. | | `RISK_RECONCILIATION_SKEW_SEC` | целое число секунд | `10` | `>= 0` | Grace-window для только что отправленных локальных заявок: свежие in-flight orders не считаются diff, пока брокерский active-list догоняет запись. | +| `RISK_COMMISSION_TOLERANCE_RUB` | сумма в рублях | `0.01` | `>= 0` | Допуск для reconciliation по расхождению локальной и брокерской комиссии. Ненулевая брокерская комиссия всё равно считается нарушением при `COMM_REQUIRE_ZERO_COMMISSION=true`. | | `RISK_CASH_USAGE_BUFFER` | доля cash | `0.95` | рекомендуется `0..1`; `0` запрещает использование cash | Какая часть свободных денег может идти в sizing. Меньше - больше денежный буфер. | | `RISK_RISK_BUDGET_PER_INSTRUMENT_PCT` | доля equity | `0.005` | рекомендуется `> 0` | Риск-бюджет на инструмент, используется вместе с оценкой неблагоприятного overnight-движения. Больше - крупнее позиции при прочих равных. | | `RISK_MIN_ORDER_NOTIONAL_RUB` | сумма в рублях | `1000` | `> 0` включает минимум; `<= 0` фактически отключает | Минимальный notional заявки. Если рассчитанная позиция меньше, сигнал отклоняется по sizing. | @@ -144,7 +145,7 @@ APP_MODE=backtest go run ./cmd/bot | Переменная | Что указывать | Дефолт | Границы/валидация | За что отвечает и что меняется | | --- | --- | --- | --- | --- | | `COMM_REQUIRE_ZERO_COMMISSION` | `true` или `false` | `true` | boolean | При `true` сигналы по инструментам с ожидаемой комиссией `> 0` отклоняются. | -| `COMM_QUARANTINE_ON_NONZERO` | `true` или `false` | `true` | boolean; сейчас не применяется | Зарезервировано под автоматический quarantine при ненулевой комиссии. На текущий код не влияет. | +| `COMM_QUARANTINE_ON_NONZERO` | `true` или `false` | `true` | boolean | При фактической брокерской комиссии `> 0` инструмент переводится в quarantine, а система останавливается через HALT по zero-commission policy. | | `COMM_FREE_ORDER_COUNT_POLICY` | `submitted` | `submitted` | жёстко только `submitted` | Политика учёта бесплатных заявок: счётчик увеличивается при отправке заявки. Другие значения запрещены валидацией. | ### BT @@ -192,7 +193,7 @@ TRUR,2024-01-09,100,101,99,100.5,10000 Для minute-модели используется тот же формат, но `trade_date` может быть timestamp (`2024-01-09T18:25:00Z` или `2024-01-09 18:25:00`). -`ClientOrderID` детерминирован по `(date, instrument_uid, side, attempt)` и содержит 8 hex символов SHA-256. Для дневного числа retry этого достаточно; при ручных массовых перезапусках с теми же параметрами id остаётся тем же, что намеренно подавляет дубли. +`ClientOrderID` детерминирован по `(date, instrument_uid, side, attempt)`, укладывается в лимит T-Invest `order_id <= 36` и содержит SHA-256 suffix. При ручных массовых перезапусках с теми же параметрами id остаётся тем же, что намеренно подавляет дубли. ## Deploy diff --git a/internal/app/app.go b/internal/app/app.go index 826f55b..b28b379 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -114,8 +114,9 @@ func Run(ctx context.Context, opts Options) error { } accountIDHash := accountHash(cfg.TInvest.AccountID) recon := reconciliation.New(repo, gateway, cfg.TInvest.AccountID, accountIDHash). - WithWindow(time.Duration(cfg.Risk.ReconciliationWindowHours) * time.Hour). - WithInFlightGrace(time.Duration(cfg.Risk.ReconciliationSkewSec) * time.Second) + WithWindow(time.Duration(cfg.Risk.ReconciliationWindowHours)*time.Hour). + WithInFlightGrace(time.Duration(cfg.Risk.ReconciliationSkewSec)*time.Second). + WithCommissionPolicy(cfg.Commission.RequireZeroCommission, cfg.Commission.QuarantineOnNonZero, cfg.Risk.CommissionToleranceRUB) diffs, err := recon.Run(ctx) if err != nil { return fmt.Errorf("pre-unhalt reconciliation: %w", err) @@ -155,8 +156,9 @@ func Run(ctx context.Context, opts Options) error { accountIDHash := accountHash(cfg.TInvest.AccountID) recon := reconciliation.New(repo, gateway, cfg.TInvest.AccountID, accountIDHash). - WithWindow(time.Duration(cfg.Risk.ReconciliationWindowHours) * time.Hour). - WithInFlightGrace(time.Duration(cfg.Risk.ReconciliationSkewSec) * time.Second) + WithWindow(time.Duration(cfg.Risk.ReconciliationWindowHours)*time.Hour). + WithInFlightGrace(time.Duration(cfg.Risk.ReconciliationSkewSec)*time.Second). + WithCommissionPolicy(cfg.Commission.RequireZeroCommission, cfg.Commission.QuarantineOnNonZero, cfg.Risk.CommissionToleranceRUB) sm := statemachine.New(repo, cfg.App.Mode) if _, err := sm.Recover(ctx, recon); err != nil { log.Warn("state recovery did not resume trading", "err", err) @@ -201,7 +203,8 @@ func buildScheduler(clock timeutil.Clock, sm statemachine.System, cfg config.Con Start: cfg.Execution.ExitWindowStart, End: cfg.Execution.ExitWindowEnd, }, - Location: cfg.Location, + IntervalVolumeLookback: 20, + Location: cfg.Location, }) signalEngine := signalengine.New(signalengine.Config{ MinTStat60: cfg.Strategy.MinTStat60, @@ -255,27 +258,30 @@ func buildScheduler(clock timeutil.Clock, sm statemachine.System, cfg config.Con Log: log, } return scheduler.New(clock, sm, scheduler.Config{ - Mode: cfg.App.Mode, - Location: cfg.Location, - RollingLong: cfg.Strategy.RollingLong, - TickInterval: 30 * time.Second, - EntrySignalTime: cfg.Execution.EntrySignalTime, - EntryWindowStart: cfg.Execution.EntryWindowStart, - EntryWindowEnd: cfg.Execution.EntryWindowEnd, - NoNewEntryAfter: cfg.Execution.NoNewEntryAfter, - ExitWatchStart: cfg.Execution.ExitWatchStart, - ExitWindowStart: cfg.Execution.ExitWindowStart, - ExitWindowEnd: cfg.Execution.ExitWindowEnd, - HardExitDeadline: cfg.Execution.HardExitDeadline, - QuoteDepth: cfg.Execution.QuoteDepth, - MaxQuoteAge: time.Duration(cfg.Execution.MaxQuoteAgeSec) * time.Second, - OrderPollInterval: time.Duration(cfg.Execution.OrderPollIntervalMS) * time.Millisecond, - PassiveImproveTicks: cfg.Execution.PassiveImproveTicks, - MaxEntryOrderAttempts: cfg.Execution.MaxEntryOrderAttempts, - MaxExitOrderAttempts: cfg.Execution.MaxExitOrderAttempts, - MinTimeToClose: time.Duration(cfg.Execution.MinTimeToCloseSec) * time.Second, - MaxClockDrift: time.Duration(cfg.Risk.MaxClockDriftSec) * time.Second, - APIOutageHalt: time.Duration(cfg.Risk.APIOutageHaltSec) * time.Second, + Mode: cfg.App.Mode, + Location: cfg.Location, + RollingLong: cfg.Strategy.RollingLong, + TickInterval: 30 * time.Second, + EntrySignalTime: cfg.Execution.EntrySignalTime, + EntryWindowStart: cfg.Execution.EntryWindowStart, + EntryWindowEnd: cfg.Execution.EntryWindowEnd, + NoNewEntryAfter: cfg.Execution.NoNewEntryAfter, + ExitWatchStart: cfg.Execution.ExitWatchStart, + ExitWindowStart: cfg.Execution.ExitWindowStart, + ExitWindowEnd: cfg.Execution.ExitWindowEnd, + HardExitDeadline: cfg.Execution.HardExitDeadline, + QuoteDepth: cfg.Execution.QuoteDepth, + MaxQuoteAge: time.Duration(cfg.Execution.MaxQuoteAgeSec) * time.Second, + OrderPollInterval: time.Duration(cfg.Execution.OrderPollIntervalMS) * time.Millisecond, + PassiveImproveTicks: cfg.Execution.PassiveImproveTicks, + MaxEntryOrderAttempts: cfg.Execution.MaxEntryOrderAttempts, + MaxExitOrderAttempts: cfg.Execution.MaxExitOrderAttempts, + MinTimeToClose: time.Duration(cfg.Execution.MinTimeToCloseSec) * time.Second, + MaxClockDrift: time.Duration(cfg.Risk.MaxClockDriftSec) * time.Second, + APIOutageHalt: time.Duration(cfg.Risk.APIOutageHaltSec) * time.Second, + RequireZeroCommission: cfg.Commission.RequireZeroCommission, + QuarantineOnNonZero: cfg.Commission.QuarantineOnNonZero, + ReconciliationInterval: 5 * time.Minute, }, services) } diff --git a/internal/backtest/engine.go b/internal/backtest/engine.go index 58f6574..5798584 100644 --- a/internal/backtest/engine.go +++ b/internal/backtest/engine.go @@ -45,6 +45,13 @@ type Config struct { AssumedTickBps decimal.Decimal Lot int64 UseMinuteModel bool + EntryWindow TimeWindow + ExitWindow TimeWindow +} + +type TimeWindow struct { + Start time.Duration + End time.Duration } type Trade struct { @@ -142,6 +149,12 @@ func (cfg Config) withDefaults() Config { if cfg.Lot == 0 { cfg.Lot = 1 } + if cfg.EntryWindow.Start == 0 && cfg.EntryWindow.End == 0 { + cfg.EntryWindow = TimeWindow{Start: durationOfDay(18, 20, 0), End: durationOfDay(18, 38, 30)} + } + if cfg.ExitWindow.Start == 0 && cfg.ExitWindow.End == 0 { + cfg.ExitWindow = TimeWindow{Start: durationOfDay(10, 5, 0), End: durationOfDay(10, 25, 0)} + } return cfg } @@ -153,8 +166,12 @@ func (e Engine) RunWithMinuteCandles(candlesByInstrument map[string][]domain.Can prepared := prepareCandles(candlesByInstrument) preparedMinutes := prepareCandles(minuteCandlesByInstrument) candidatesByExitDate := make(map[string][]candidate) + tradingDateSet := make(map[string]struct{}) for instrumentUID, candles := range prepared { for i := 1; i < len(candles); i++ { + if i >= e.cfg.RollingShort { + tradingDateSet[candles[i].TradeDate.Format("2006-01-02")] = struct{}{} + } candidate, ok, err := e.evaluateCandidate(instrumentUID, candles, i) if err != nil { return Result{}, err @@ -164,8 +181,8 @@ func (e Engine) RunWithMinuteCandles(candlesByInstrument map[string][]domain.Can } } } - dates := make([]string, 0, len(candidatesByExitDate)) - for date := range candidatesByExitDate { + dates := make([]string, 0, len(tradingDateSet)) + for date := range tradingDateSet { dates = append(dates, date) } sort.Strings(dates) @@ -239,15 +256,17 @@ func (e Engine) RunWithMinuteCandles(candlesByInstrument map[string][]domain.Can CapacityRUB: capacity, }) } - if !dayPnL.IsZero() { - equity = equity.Add(dayPnL) - cash = equity - points = append(points, Point{ - Date: date, - Equity: equity, - Return: dayPnL.Div(dayStartEquity), - }) + equity = equity.Add(dayPnL) + cash = equity + dayReturn := decimal.Zero + if dayStartEquity.IsPositive() { + dayReturn = dayPnL.Div(dayStartEquity) } + points = append(points, Point{ + Date: date, + Equity: equity, + Return: dayReturn, + }) } sort.Slice(trades, func(i, j int) bool { if trades[i].ExitDate == trades[j].ExitDate { @@ -266,8 +285,8 @@ func (e Engine) minuteExecution(c candidate, minutes []domain.Candle, requestedL if requestedLots <= 0 || len(minutes) == 0 { return 0, decimal.Zero, false } - entryLots, entryCapacity := e.fillableMinuteLots(minutes, c.entry.TradeDate, c.buy, domain.SideBuy) - exitLots, exitCapacity := e.fillableMinuteLots(minutes, c.exit.TradeDate, c.sell, domain.SideSell) + entryLots, entryCapacity := e.fillableMinuteLots(minutes, c.entry.TradeDate, c.buy, domain.SideBuy, e.cfg.EntryWindow) + exitLots, exitCapacity := e.fillableMinuteLots(minutes, c.exit.TradeDate, c.sell, domain.SideSell, e.cfg.ExitWindow) lots := min(requestedLots, entryLots) lots = min(lots, exitLots) if lots <= 0 { @@ -276,7 +295,7 @@ func (e Engine) minuteExecution(c candidate, minutes []domain.Candle, requestedL return lots, money.Min(entryCapacity, exitCapacity), true } -func (e Engine) fillableMinuteLots(minutes []domain.Candle, date time.Time, limitPrice decimal.Decimal, side domain.Side) (int64, decimal.Decimal) { +func (e Engine) fillableMinuteLots(minutes []domain.Candle, date time.Time, limitPrice decimal.Decimal, side domain.Side, window TimeWindow) (int64, decimal.Decimal) { if !limitPrice.IsPositive() || e.cfg.Lot <= 0 { return 0, decimal.Zero } @@ -289,6 +308,9 @@ func (e Engine) fillableMinuteLots(minutes []domain.Candle, date time.Time, limi if !sameDate(candle.TradeDate, date) { continue } + if !window.Contains(candle.TradeDate) { + continue + } reachable := side == domain.SideBuy && candle.Low.LessThanOrEqual(limitPrice) reachable = reachable || side == domain.SideSell && candle.High.GreaterThanOrEqual(limitPrice) if !reachable { @@ -300,6 +322,22 @@ func (e Engine) fillableMinuteLots(minutes []domain.Candle, date time.Time, limi return capacity.Div(lotNotional).Floor().IntPart(), capacity } +func (w TimeWindow) Contains(ts time.Time) bool { + if w.Start == 0 && w.End == 0 { + return true + } + tod := time.Duration(ts.Hour())*time.Hour + + time.Duration(ts.Minute())*time.Minute + + time.Duration(ts.Second())*time.Second + return tod >= w.Start && tod <= w.End +} + +func durationOfDay(hour, minute, second int) time.Duration { + return time.Duration(hour)*time.Hour + + time.Duration(minute)*time.Minute + + time.Duration(second)*time.Second +} + type candidate struct { instrumentUID string entry domain.Candle diff --git a/internal/backtest/lookahead_test.go b/internal/backtest/lookahead_test.go index 806f143..e50543b 100644 --- a/internal/backtest/lookahead_test.go +++ b/internal/backtest/lookahead_test.go @@ -51,6 +51,7 @@ func TestMinuteExecutionRequiresReachableLimitAndParticipation(t *testing.T) { } minutes := []domain.Candle{ {TradeDate: entryDate, Low: decimal.NewFromInt(99), High: decimal.NewFromInt(101), VolumeLots: decimal.NewFromInt(20)}, + {TradeDate: time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC), Low: decimal.NewFromInt(1), High: decimal.NewFromInt(200), VolumeLots: decimal.NewFromInt(1_000_000)}, {TradeDate: exitDate, Low: decimal.NewFromInt(104), High: decimal.NewFromInt(106), VolumeLots: decimal.NewFromInt(20)}, } lots, capacity, ok := engine.minuteExecution(c, minutes, 5) diff --git a/internal/config/config.go b/internal/config/config.go index 3e0748c..cdc2d3d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -112,6 +112,7 @@ type RiskConfig struct { MaxClockDriftSec int `env:"MAX_CLOCK_DRIFT_SEC" envDefault:"2"` ReconciliationWindowHours int `env:"RECONCILIATION_WINDOW_HOURS" envDefault:"72"` ReconciliationSkewSec int `env:"RECONCILIATION_SKEW_SEC" envDefault:"10"` + CommissionToleranceRUB decimal.Decimal `env:"COMMISSION_TOLERANCE_RUB" envDefault:"0.01"` CashUsageBuffer decimal.Decimal `env:"CASH_USAGE_BUFFER" envDefault:"0.95"` RiskBudgetPerInstrumentPct decimal.Decimal `env:"RISK_BUDGET_PER_INSTRUMENT_PCT" envDefault:"0.005"` MinOrderNotionalRUB decimal.Decimal `env:"MIN_ORDER_NOTIONAL_RUB" envDefault:"1000"` @@ -198,6 +199,9 @@ func (c *Config) Validate() error { if c.Risk.ReconciliationSkewSec < 0 { return errors.New("RISK_RECONCILIATION_SKEW_SEC must be non-negative") } + if c.Risk.CommissionToleranceRUB.IsNegative() { + return errors.New("RISK_COMMISSION_TOLERANCE_RUB must be non-negative") + } if c.Commission.FreeOrderCountPolicy != "submitted" { return fmt.Errorf("COMM_FREE_ORDER_COUNT_POLICY must be submitted, got %q", c.Commission.FreeOrderCountPolicy) } @@ -210,6 +214,9 @@ func (c *Config) Validate() error { if (c.App.Mode == domain.ModeSandbox || c.App.Mode == domain.ModeLiveReadonly || c.App.Mode == domain.ModeLiveTrade) && c.TInvest.Token == "" { return fmt.Errorf("TINVEST_TOKEN is required for APP_MODE=%s", c.App.Mode) } + if (c.App.Mode == domain.ModeSandbox || c.App.Mode == domain.ModeLiveReadonly || c.App.Mode == domain.ModeLiveTrade) && c.TInvest.AccountID == "" { + return fmt.Errorf("TINVEST_ACCOUNT_ID is required for APP_MODE=%s", c.App.Mode) + } if c.TInvest.UseSandbox && c.App.Mode != domain.ModeSandbox { return errors.New("TINVEST_USE_SANDBOX=true is only valid with APP_MODE=sandbox") } diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..455d635 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,63 @@ +package config + +import ( + "strings" + "testing" + + "github.com/shopspring/decimal" + + "overnight-trading-bot/internal/domain" + "overnight-trading-bot/internal/timeutil" +) + +func TestValidateRequiresAccountIDForBrokerModes(t *testing.T) { + cfg := minimalBrokerConfig(domain.ModeSandbox) + cfg.TInvest.AccountID = "" + err := cfg.Validate() + if err == nil || !strings.Contains(err.Error(), "TINVEST_ACCOUNT_ID") { + t.Fatalf("Validate err=%v, want TINVEST_ACCOUNT_ID requirement", err) + } +} + +func minimalBrokerConfig(mode domain.Mode) Config { + return Config{ + App: AppConfig{ + Mode: mode, + Timezone: "Europe/Moscow", + ShutdownTimeoutSec: 30, + }, + TInvest: TInvestConfig{ + Token: "token", + AccountID: "account", + }, + DB: DBConfig{DSN: "user:pass@tcp(localhost:3306)/bot"}, + Execution: ExecutionConfig{ + EntrySignalTime: mustTOD("18:10:00"), + EntryWindowStart: mustTOD("18:20:00"), + EntryWindowEnd: mustTOD("18:38:30"), + NoNewEntryAfter: mustTOD("18:38:30"), + ExitWatchStart: mustTOD("09:50:00"), + ExitNotBefore: mustTOD("10:03:00"), + ExitWindowStart: mustTOD("10:05:00"), + ExitWindowEnd: mustTOD("10:25:00"), + HardExitDeadline: mustTOD("10:45:00"), + QuoteDepth: 20, + OrderPollIntervalMS: 500, + }, + Risk: RiskConfig{ + APIOutageHaltSec: 180, + ReconciliationWindowHours: 72, + ReconciliationSkewSec: 10, + CommissionToleranceRUB: decimal.NewFromFloat(0.01), + }, + Commission: CommissionConfig{FreeOrderCountPolicy: "submitted"}, + } +} + +func mustTOD(raw string) timeutil.TimeOfDay { + tod, err := timeutil.ParseTimeOfDay(raw) + if err != nil { + panic(err) + } + return tod +} diff --git a/internal/execution/engine.go b/internal/execution/engine.go index f84ea47..84d588e 100644 --- a/internal/execution/engine.go +++ b/internal/execution/engine.go @@ -11,6 +11,7 @@ import ( "overnight-trading-bot/internal/domain" "overnight-trading-bot/internal/repository" + "overnight-trading-bot/internal/risk" ) var ErrBrokerOrdersDisabled = errors.New("broker orders are disabled for current mode") @@ -113,6 +114,9 @@ func (e *Engine) PlaceLimit(ctx context.Context, order domain.Order) (domain.Ord return existing, nil } } + if e.mode == domain.ModePaper { + return e.placePaperLimit(ctx, order) + } if !e.mode.AllowsBrokerOrders() { order.Status = domain.OrderStatusNew if e.store != nil { @@ -159,6 +163,28 @@ func (e *Engine) PlaceLimit(ctx context.Context, order domain.Order) (domain.Ord return posted, nil } +func (e *Engine) placePaperLimit(ctx context.Context, order domain.Order) (domain.Order, error) { + now := time.Now().UTC() + order.BrokerOrderID = "paper-" + order.ClientOrderID + order.FilledLots = order.QuantityLots + order.AvgFillPrice = order.LimitPrice + order.Status = domain.OrderStatusFilled + order.RawStateJSON = `{"paper_fill":true}` + order.CreatedAt = now + order.UpdatedAt = now + if e.store != nil { + if err := e.store.RunInTx(ctx, func(ctx context.Context, repo repository.Repository) error { + if err := repo.UpsertOrder(ctx, order); err != nil { + return fmt.Errorf("persist paper order: %w", err) + } + return repo.IncrementFreeOrders(ctx, order.TradeDate, order.InstrumentUID, 1) + }); err != nil { + return domain.Order{}, err + } + } + return order, nil +} + func (e *Engine) findExisting(ctx context.Context, order domain.Order) (domain.Order, error) { orders, err := e.store.ListOrders(ctx, order.AccountIDHash, order.TradeDate, order.TradeDate) if err != nil { @@ -286,6 +312,9 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit } func (e *Engine) repost(ctx context.Context, order domain.Order, cfg MonitorConfig, remaining int64) (domain.Order, error) { + if err := e.ensureRepostBudget(ctx, order, cfg.Instrument); err != nil { + return domain.Order{}, err + } if err := e.Cancel(ctx, order); err != nil { return domain.Order{}, err } @@ -308,18 +337,28 @@ func (e *Engine) repost(ctx context.Context, order domain.Order, cfg MonitorConf } } +func (e *Engine) ensureRepostBudget(ctx context.Context, order domain.Order, instrument domain.Instrument) error { + if e.store == nil || instrument.FreeOrderLimitPerDay <= 0 { + return nil + } + sent, err := e.store.GetFreeOrdersSent(ctx, order.TradeDate, instrument.InstrumentUID) + if err != nil { + return err + } + if instrument.FreeOrderLimitPerDay-sent < 1 { + return fmt.Errorf("%w: %s remaining=0", risk.ErrFreeOrderBudget, instrument.InstrumentUID) + } + return nil +} + func (e *Engine) checkQuoteFresh(book domain.OrderBook) error { if e.maxQuoteAge <= 0 { return nil } - receivedAt := book.ReceivedAt - if receivedAt.IsZero() { - receivedAt = book.Time + if book.ReceivedAt.IsZero() { + return fmt.Errorf("quote received timestamp is missing") } - if receivedAt.IsZero() { - return fmt.Errorf("quote timestamp is missing") - } - age := time.Since(receivedAt) + age := time.Since(book.ReceivedAt) if age > e.maxQuoteAge { return fmt.Errorf("quote age %s exceeds %s", age, e.maxQuoteAge) } diff --git a/internal/execution/pricing.go b/internal/execution/pricing.go index 0d3b66f..e912282 100644 --- a/internal/execution/pricing.go +++ b/internal/execution/pricing.go @@ -4,7 +4,7 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "regexp" + "strconv" "strings" "time" @@ -14,7 +14,7 @@ import ( "overnight-trading-bot/internal/money" ) -var nonIDChar = regexp.MustCompile(`[^A-Za-z0-9_-]+`) +const maxClientOrderIDLen = 36 func LimitBuyPrice(bestBid, bestAsk, tick decimal.Decimal, improveTicks int) (decimal.Decimal, error) { if improveTicks < 0 { @@ -49,10 +49,25 @@ func LimitSellPrice(bestBid, bestAsk, tick decimal.Decimal, improveTicks int) (d func ClientOrderID(tradeDate time.Time, instrumentUID string, side domain.Side, attempt int) string { base := fmt.Sprintf("%s|%s|%s|%d", tradeDate.Format("20060102"), instrumentUID, side, attempt) sum := sha256.Sum256([]byte(base)) - suffix := hex.EncodeToString(sum[:])[:8] - cleanUID := nonIDChar.ReplaceAllString(instrumentUID, "_") - if len(cleanUID) > 24 { - cleanUID = cleanUID[:24] + suffix := hex.EncodeToString(sum[:]) + sideToken := "b" + if side == domain.SideSell { + sideToken = "s" } - return strings.ToLower(fmt.Sprintf("otb-%s-%s-%s-%02d-%s", tradeDate.Format("20060102"), cleanUID, side, attempt, suffix)) + prefix := fmt.Sprintf("otb-%s-%s-%s-", tradeDate.Format("20060102"), sideToken, attemptToken(attempt)) + return strings.ToLower(prefix + suffix[:maxClientOrderIDLen-len(prefix)]) +} + +func attemptToken(attempt int) string { + if attempt < 0 { + attempt = 0 + } + token := strings.ToLower(strconv.FormatInt(int64(attempt), 36)) + if len(token) > 2 { + token = token[len(token)-2:] + } + for len(token) < 2 { + token = "0" + token + } + return token } diff --git a/internal/execution/pricing_test.go b/internal/execution/pricing_test.go index fd3f916..a8ce7bd 100644 --- a/internal/execution/pricing_test.go +++ b/internal/execution/pricing_test.go @@ -40,10 +40,14 @@ func TestLimitPricesDoNotCross(t *testing.T) { func TestClientOrderIDDeterministic(t *testing.T) { date := time.Date(2024, 1, 2, 0, 0, 0, 0, time.UTC) - a := ClientOrderID(date, "uid", domain.SideBuy, 1) - b := ClientOrderID(date, "uid", domain.SideBuy, 1) - c := ClientOrderID(date, "uid", domain.SideBuy, 2) + longUID := "a-realistic-instrument-uid-that-is-much-longer-than-the-order-id-limit" + a := ClientOrderID(date, longUID, domain.SideBuy, 1) + b := ClientOrderID(date, longUID, domain.SideBuy, 1) + c := ClientOrderID(date, longUID, domain.SideBuy, 2) if a != b || a == c { t.Fatalf("unexpected ids: %s %s %s", a, b, c) } + if len(a) > maxClientOrderIDLen { + t.Fatalf("client order id len=%d, want <=%d: %s", len(a), maxClientOrderIDLen, a) + } } diff --git a/internal/execution/state_test.go b/internal/execution/state_test.go index 9c55a58..e478471 100644 --- a/internal/execution/state_test.go +++ b/internal/execution/state_test.go @@ -66,6 +66,36 @@ func TestPlaceLimitSuppressesDuplicateSubmit(t *testing.T) { } } +func TestPaperPlaceEntryFillsAndCountsSubmittedOrder(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + engine := NewEngine(domain.ModePaper, "account", tinvest.NewFakeGateway(), repo) + tradeDate := time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC) + order, err := engine.PlaceEntry(ctx, "hash", domain.Instrument{ + InstrumentUID: "uid", + Lot: 1, + MinPriceIncrement: decimal.NewFromInt(1), + }, tradeDate, 2, domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(101), QuantityLots: 10}}, + ReceivedAt: time.Now().UTC(), + }, 1, 1) + if err != nil { + t.Fatal(err) + } + if order.Status != domain.OrderStatusFilled || order.FilledLots != 2 || order.BrokerOrderID == "" { + t.Fatalf("paper order=%+v, want filled broker-like order", order) + } + sent, err := repo.GetFreeOrdersSent(ctx, tradeDate, "uid") + if err != nil { + t.Fatal(err) + } + if sent != 1 { + t.Fatalf("free order counter=%d, want 1", sent) + } +} + func TestPlaceEntryRejectsStaleQuote(t *testing.T) { ctx := context.Background() engine := NewEngine(domain.ModeSandbox, "account", tinvest.NewFakeGateway(), testutil.NewMemoryRepository()) diff --git a/internal/features/pipeline.go b/internal/features/pipeline.go index 70944f4..fc38d3e 100644 --- a/internal/features/pipeline.go +++ b/internal/features/pipeline.go @@ -3,6 +3,7 @@ package features import ( "context" "fmt" + "sort" "time" "github.com/shopspring/decimal" @@ -12,6 +13,8 @@ import ( "overnight-trading-bot/internal/timeutil" ) +const defaultIntervalVolumeLookback = 20 + type PipelineConfig struct { RollingShort int RollingLong int @@ -22,6 +25,7 @@ type PipelineConfig struct { CommissionRoundtripBps decimal.Decimal EntryWindow timeutil.Window ExitWindow timeutil.Window + IntervalVolumeLookback int Location *time.Location } @@ -44,7 +48,7 @@ func (p Pipeline) Recompute(ctx context.Context, instrument domain.Instrument, t if err != nil { return domain.FeatureSet{}, err } - exitVolume, err := p.intervalVolume(ctx, instrument, tradeDate.AddDate(0, 0, 1), p.cfg.ExitWindow) + exitVolume, err := p.intervalVolume(ctx, instrument, tradeDate, p.cfg.ExitWindow) if err != nil { return domain.FeatureSet{}, err } @@ -66,13 +70,17 @@ func (p Pipeline) intervalVolume(ctx context.Context, instrument domain.Instrume if loc == nil { loc = time.UTC } - from := window.Start.On(date, loc).UTC() + lookback := p.cfg.IntervalVolumeLookback + if lookback <= 0 { + lookback = defaultIntervalVolumeLookback + } + from := window.Start.On(date.AddDate(0, 0, -lookback), loc).UTC() to := window.End.On(date, loc).UTC() candles, err := p.repo.ListMinuteCandles(ctx, instrument.InstrumentUID, from, to) if err != nil { return decimal.Zero, err } - return IntervalVolume(candles, instrument.Lot), nil + return AverageIntervalVolume(candles, instrument.Lot, window, loc), nil } func Compute(instrument domain.Instrument, candles []domain.Candle, tradeDate time.Time, spread SpreadResult, cfg PipelineConfig, entryVolume, exitVolume decimal.Decimal) (domain.FeatureSet, error) { @@ -146,3 +154,37 @@ func IntervalVolume(candles []domain.Candle, lot int64) decimal.Decimal { } return total } + +func AverageIntervalVolume(candles []domain.Candle, lot int64, window timeutil.Window, loc *time.Location) decimal.Decimal { + if lot <= 0 || len(candles) == 0 { + return decimal.Zero + } + if loc == nil { + loc = time.UTC + } + byDate := make(map[string][]domain.Candle) + for _, candle := range candles { + local := candle.TradeDate.In(loc) + tod := time.Duration(local.Hour())*time.Hour + + time.Duration(local.Minute())*time.Minute + + time.Duration(local.Second())*time.Second + if tod < window.Start.Duration || tod > window.End.Duration { + continue + } + key := local.Format("2006-01-02") + byDate[key] = append(byDate[key], candle) + } + if len(byDate) == 0 { + return decimal.Zero + } + keys := make([]string, 0, len(byDate)) + for key := range byDate { + keys = append(keys, key) + } + sort.Strings(keys) + sum := decimal.Zero + for _, key := range keys { + sum = sum.Add(IntervalVolume(byDate[key], lot)) + } + return sum.Div(decimal.NewFromInt(int64(len(keys)))) +} diff --git a/internal/features/pipeline_test.go b/internal/features/pipeline_test.go index 472efb5..ad365bb 100644 --- a/internal/features/pipeline_test.go +++ b/internal/features/pipeline_test.go @@ -7,6 +7,7 @@ import ( "github.com/shopspring/decimal" "overnight-trading-bot/internal/domain" + "overnight-trading-bot/internal/timeutil" ) func TestComputeExpectedCostIncludesCommissionAndSlippage(t *testing.T) { @@ -55,3 +56,28 @@ func TestIntervalVolume(t *testing.T) { t.Fatalf("interval volume=%s, want 6040", got) } } + +func TestAverageIntervalVolumeUsesExecutionWindowsAcrossDays(t *testing.T) { + loc := time.FixedZone("MSK", 3*60*60) + window := timeutil.Window{ + Start: mustTOD("18:20:00"), + End: mustTOD("18:40:00"), + } + candles := []domain.Candle{ + {TradeDate: time.Date(2026, 6, 1, 15, 20, 0, 0, time.UTC), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, + {TradeDate: time.Date(2026, 6, 1, 15, 50, 0, 0, time.UTC), Close: decimal.NewFromInt(999), VolumeLots: decimal.NewFromInt(999)}, + {TradeDate: time.Date(2026, 6, 2, 15, 25, 0, 0, time.UTC), Close: decimal.NewFromInt(200), VolumeLots: decimal.NewFromInt(10)}, + } + got := AverageIntervalVolume(candles, 1, window, loc) + if !got.Equal(decimal.NewFromInt(1500)) { + t.Fatalf("average interval volume=%s, want 1500", got) + } +} + +func mustTOD(raw string) timeutil.TimeOfDay { + tod, err := timeutil.ParseTimeOfDay(raw) + if err != nil { + panic(err) + } + return tod +} diff --git a/internal/healthcheck/healthcheck.go b/internal/healthcheck/healthcheck.go index 49c31bc..43078da 100644 --- a/internal/healthcheck/healthcheck.go +++ b/internal/healthcheck/healthcheck.go @@ -89,7 +89,8 @@ func CheckEndpoint(ctx context.Context, url string) error { if err != nil { return err } - resp, err := http.DefaultClient.Do(req) + client := http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) if err != nil { return err } diff --git a/internal/instruments/registry.go b/internal/instruments/registry.go index 40bd04f..7b4f1e4 100644 --- a/internal/instruments/registry.go +++ b/internal/instruments/registry.go @@ -3,7 +3,6 @@ package instruments import ( "context" "fmt" - "strings" "overnight-trading-bot/internal/domain" "overnight-trading-bot/internal/repository" @@ -25,21 +24,19 @@ func (r Registry) SyncMetadata(ctx context.Context) error { return err } for _, instrument := range instruments { - if strings.HasPrefix(instrument.InstrumentUID, "PENDING:") || !instrument.MetadataValid() { - remote, err := r.gateway.GetInstrument(ctx, instrument.Ticker, instrument.ClassCode) - if err != nil { - return fmt.Errorf("sync %s: %w", instrument.Ticker, err) - } - remote.Enabled = instrument.Enabled && remote.Enabled - remote.FundType = instrument.FundType - remote.ExpectedCommissionBpsPerSide = instrument.ExpectedCommissionBpsPerSide - remote.FreeOrderLimitPerDay = instrument.FreeOrderLimitPerDay - remote.Quarantine = instrument.Quarantine - remote.QuarantineReason = instrument.QuarantineReason - remote.ExcludeReason = instrument.ExcludeReason - if err := r.repo.ReplaceInstrument(ctx, instrument.InstrumentUID, remote); err != nil { - return fmt.Errorf("replace synced instrument %s: %w", instrument.Ticker, err) - } + remote, err := r.gateway.GetInstrument(ctx, instrument.Ticker, instrument.ClassCode) + if err != nil { + return fmt.Errorf("sync %s: %w", instrument.Ticker, err) + } + remote.Enabled = instrument.Enabled && remote.Enabled + remote.FundType = instrument.FundType + remote.ExpectedCommissionBpsPerSide = instrument.ExpectedCommissionBpsPerSide + remote.FreeOrderLimitPerDay = instrument.FreeOrderLimitPerDay + remote.Quarantine = instrument.Quarantine + remote.QuarantineReason = instrument.QuarantineReason + remote.ExcludeReason = instrument.ExcludeReason + if err := r.repo.ReplaceInstrument(ctx, instrument.InstrumentUID, remote); err != nil { + return fmt.Errorf("replace synced instrument %s: %w", instrument.Ticker, err) } } return nil diff --git a/internal/marketdata/loader.go b/internal/marketdata/loader.go index b6d9b6c..26f2936 100644 --- a/internal/marketdata/loader.go +++ b/internal/marketdata/loader.go @@ -56,10 +56,10 @@ func (l Loader) LatestQuote(ctx context.Context, instrumentUID string, depth int if err != nil { return domain.OrderBook{}, err } - age := time.Since(book.ReceivedAt) if book.ReceivedAt.IsZero() { - age = time.Since(book.Time) + return domain.OrderBook{}, fmt.Errorf("quote received timestamp is missing") } + age := time.Since(book.ReceivedAt) if maxAge > 0 && age > maxAge { return domain.OrderBook{}, fmt.Errorf("quote age %s exceeds %s", age, maxAge) } diff --git a/internal/notify/notify.go b/internal/notify/notify.go index 75e9ec1..e1e397e 100644 --- a/internal/notify/notify.go +++ b/internal/notify/notify.go @@ -59,6 +59,9 @@ type outbound struct { func NewTelegram(cfg TelegramConfig, log *slog.Logger) (Notifier, error) { if cfg.BotToken == "" || cfg.ChatID == 0 { + if log != nil { + log.Warn("telegram notifier disabled; TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID is empty") + } return Noop{}, nil } bot, err := tgbotapi.NewBotAPI(cfg.BotToken) diff --git a/internal/reconciliation/engine.go b/internal/reconciliation/engine.go index d8cd29f..310d91e 100644 --- a/internal/reconciliation/engine.go +++ b/internal/reconciliation/engine.go @@ -16,16 +16,26 @@ import ( ) type Engine struct { - repo repository.Repository - gateway tinvest.Gateway - accountID string - accountIDHash string - window time.Duration - inFlightGrace time.Duration + repo repository.Repository + gateway tinvest.Gateway + accountID string + accountIDHash string + window time.Duration + inFlightGrace time.Duration + commissionTolerance decimal.Decimal + requireZeroCommission bool + quarantineOnNonZero bool } func New(repo repository.Repository, gateway tinvest.Gateway, accountID, accountIDHash string) Engine { - return Engine{repo: repo, gateway: gateway, accountID: accountID, accountIDHash: accountIDHash, window: 72 * time.Hour} + return Engine{ + repo: repo, + gateway: gateway, + accountID: accountID, + accountIDHash: accountIDHash, + window: 72 * time.Hour, + commissionTolerance: decimal.NewFromFloat(0.01), + } } func (e Engine) WithWindow(window time.Duration) Engine { @@ -42,6 +52,15 @@ func (e Engine) WithInFlightGrace(grace time.Duration) Engine { return e } +func (e Engine) WithCommissionPolicy(requireZero, quarantineOnNonZero bool, tolerance decimal.Decimal) Engine { + e.requireZeroCommission = requireZero + e.quarantineOnNonZero = quarantineOnNonZero + if !tolerance.IsNegative() { + e.commissionTolerance = tolerance + } + return e +} + func (e Engine) Run(ctx context.Context) ([]domain.ReconciliationDiff, error) { localOrders, err := e.repo.ListActiveOrders(ctx, e.accountIDHash) if err != nil { @@ -138,7 +157,17 @@ func (e Engine) Run(ctx context.Context) ([]domain.ReconciliationDiff, error) { if err != nil { return nil, err } - diffs = append(diffs, compareOperations(recentOrders, operations)...) + diffs = append(diffs, compareOperationsWithPolicy(recentOrders, operations, e.requireZeroCommission, e.commissionTolerance)...) + if e.requireZeroCommission && e.quarantineOnNonZero { + for _, diff := range diffs { + if diff.Kind != "actual_commission_nonzero" || diff.InstrumentUID == "" { + continue + } + if err := e.repo.QuarantineInstrument(ctx, diff.InstrumentUID, diff.Message); err != nil { + return nil, err + } + } + } raw, _ := json.Marshal(diffs) if err := e.repo.InsertReconciliation(ctx, now, string(raw), len(diffs) > 0); err != nil { return nil, err @@ -163,7 +192,14 @@ func HasCritical(diffs []domain.ReconciliationDiff) bool { } func compareOperations(orders []domain.Order, operations []domain.Operation) []domain.ReconciliationDiff { + return compareOperationsWithPolicy(orders, operations, false, decimal.NewFromFloat(0.01)) +} + +func compareOperationsWithPolicy(orders []domain.Order, operations []domain.Operation, requireZeroCommission bool, commissionTolerance decimal.Decimal) []domain.ReconciliationDiff { var diffs []domain.ReconciliationDiff + if commissionTolerance.IsNegative() { + commissionTolerance = decimal.Zero + } localCommissionByInstrument := make(map[string]decimal.Decimal) localTraded := make(map[string]bool) for _, order := range orders { @@ -192,7 +228,15 @@ func compareOperations(orders []domain.Order, operations []domain.Operation) []d for instrumentUID := range instruments { localCommission := localCommissionByInstrument[instrumentUID] brokerCommission := brokerCommissionByInstrument[instrumentUID] - if diff := money.Abs(localCommission.Sub(brokerCommission)); diff.GreaterThan(decimal.NewFromFloat(0.01)) { + if requireZeroCommission && brokerCommission.IsPositive() { + diffs = append(diffs, domain.ReconciliationDiff{ + Kind: "actual_commission_nonzero", + InstrumentUID: instrumentUID, + Message: fmt.Sprintf("broker commission=%s", brokerCommission.StringFixed(2)), + Critical: true, + }) + } + if diff := money.Abs(localCommission.Sub(brokerCommission)); diff.GreaterThan(commissionTolerance) { diffs = append(diffs, domain.ReconciliationDiff{ Kind: "commission_mismatch", InstrumentUID: instrumentUID, diff --git a/internal/reconciliation/engine_test.go b/internal/reconciliation/engine_test.go index 92febb0..4154284 100644 --- a/internal/reconciliation/engine_test.go +++ b/internal/reconciliation/engine_test.go @@ -101,6 +101,47 @@ func TestCompareOperationsCommissionPerInstrument(t *testing.T) { } } +func TestReconciliationQuarantinesOnNonZeroBrokerCommission(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + gateway := tinvest.NewFakeGateway() + if err := repo.UpsertInstrument(ctx, domain.Instrument{ + InstrumentUID: "uid", + Ticker: "TRUR", + Enabled: true, + }); err != nil { + t.Fatal(err) + } + gateway.Operations = []domain.Operation{{ + InstrumentUID: "uid", + Type: "OPERATION_TYPE_BROKER_FEE", + Commission: decimal.NewFromFloat(0.01), + ExecutedAt: time.Now().UTC(), + }} + diffs, err := New(repo, gateway, "account", "hash"). + WithCommissionPolicy(true, true, decimal.NewFromFloat(0.01)). + Run(ctx) + if err != nil { + t.Fatal(err) + } + found := false + for _, diff := range diffs { + if diff.Kind == "actual_commission_nonzero" && diff.Critical { + found = true + } + } + if !found { + t.Fatalf("expected actual_commission_nonzero diff, got %+v", diffs) + } + instruments, err := repo.ListInstruments(ctx, true) + if err != nil { + t.Fatal(err) + } + if len(instruments) != 1 || !instruments[0].Quarantine { + t.Fatalf("instrument not quarantined: %+v", instruments) + } +} + func TestReconciliationSkipsFreshInFlightLocalOrders(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/report/daily.go b/internal/report/daily.go index 3a965a7..d12a67c 100644 --- a/internal/report/daily.go +++ b/internal/report/daily.go @@ -1,7 +1,9 @@ package report import ( + "encoding/json" "fmt" + "sort" "strings" "time" @@ -15,6 +17,7 @@ type DailyInput struct { Mode domain.Mode Signals []domain.Signal Positions []domain.Position + Orders []domain.Order AverageSpreadBps decimal.Decimal AverageSlipBps decimal.Decimal RiskStatus string @@ -28,19 +31,134 @@ func ComposeDaily(input DailyInput) string { for _, signal := range input.Signals { fmt.Fprintf(&b, "- %s %s edge=%s reason=%s\n", signal.InstrumentUID, signal.Decision, signal.NetEdgeBps.StringFixed(2), signal.RejectReason) } + reasons := groupedReasons(input.Signals) + if len(reasons) > 0 { + fmt.Fprintf(&b, "Причины skip/reject:\n") + for _, reason := range sortedKeys(reasons) { + count := reasons[reason] + fmt.Fprintf(&b, "- %s: %d\n", reason, count) + } + } gross := decimal.Zero net := decimal.Zero commission := decimal.Zero + expectedByInstrument := expectedEdgeByInstrument(input.Signals) for _, pos := range input.Positions { gross = gross.Add(pos.GrossPnL) net = net.Add(pos.NetPnL) commission = commission.Add(pos.CommissionTotal) } + if len(input.Positions) > 0 { + fmt.Fprintf(&b, "Позиции:\n") + for _, pos := range input.Positions { + expected := expectedByInstrument[pos.InstrumentUID] + expectedError := pos.RealizedEdgeBps.Sub(expected) + fmt.Fprintf(&b, "- %s status=%s net=%s commission=%s realized_edge_bps=%s expected_error_bps=%s\n", + pos.InstrumentUID, + pos.Status, + pos.NetPnL.StringFixed(2), + pos.CommissionTotal.StringFixed(2), + pos.RealizedEdgeBps.StringFixed(2), + expectedError.StringFixed(2), + ) + } + } fmt.Fprintf(&b, "Gross PnL: %s\n", gross.StringFixed(2)) fmt.Fprintf(&b, "Net PnL: %s\n", net.StringFixed(2)) fmt.Fprintf(&b, "Комиссии: %s\n", commission.StringFixed(2)) - fmt.Fprintf(&b, "Средний spread: %s bps\n", input.AverageSpreadBps.StringFixed(2)) + averageSpread := input.AverageSpreadBps + if averageSpread.IsZero() { + averageSpread = averageContextDecimal(input.Signals, "spread_bps") + } + fmt.Fprintf(&b, "Средний spread: %s bps\n", averageSpread.StringFixed(2)) fmt.Fprintf(&b, "Среднее проскальзывание: %s bps\n", input.AverageSlipBps.StringFixed(2)) + writeExecutionErrors(&b, input.Orders) fmt.Fprintf(&b, "Risk: %s", input.RiskStatus) return b.String() } + +func groupedReasons(signals []domain.Signal) map[string]int { + out := make(map[string]int) + for _, sig := range signals { + if sig.Decision == domain.DecisionEnter || sig.RejectReason == "" { + continue + } + out[sig.RejectReason]++ + } + return out +} + +func sortedKeys(values map[string]int) []string { + keys := make([]string, 0, len(values)) + for key := range values { + keys = append(keys, key) + } + sort.Strings(keys) + return keys +} + +func expectedEdgeByInstrument(signals []domain.Signal) map[string]decimal.Decimal { + out := make(map[string]decimal.Decimal) + for _, sig := range signals { + if sig.Decision == domain.DecisionEnter { + out[sig.InstrumentUID] = sig.NetEdgeBps + } + } + return out +} + +func averageContextDecimal(signals []domain.Signal, key string) decimal.Decimal { + sum := decimal.Zero + count := int64(0) + for _, sig := range signals { + var context map[string]any + if err := json.Unmarshal([]byte(sig.ContextJSON), &context); err != nil { + continue + } + value, ok := decimalFromAny(context[key]) + if !ok { + continue + } + sum = sum.Add(value) + count++ + } + if count == 0 { + return decimal.Zero + } + return sum.Div(decimal.NewFromInt(count)) +} + +func decimalFromAny(value any) (decimal.Decimal, bool) { + switch typed := value.(type) { + case string: + parsed, err := decimal.NewFromString(typed) + return parsed, err == nil + case float64: + return decimal.NewFromFloat(typed), true + default: + return decimal.Zero, false + } +} + +func writeExecutionErrors(b *strings.Builder, orders []domain.Order) { + wroteHeader := false + for _, order := range orders { + if !isExecutionError(order.Status) { + continue + } + if !wroteHeader { + fmt.Fprintf(b, "Ошибки исполнения:\n") + wroteHeader = true + } + fmt.Fprintf(b, "- %s %s status=%s filled=%d/%d\n", order.InstrumentUID, order.Side, order.Status, order.FilledLots, order.QuantityLots) + } +} + +func isExecutionError(status domain.OrderStatus) bool { + switch status { + case domain.OrderStatusFailed, domain.OrderStatusRejected, domain.OrderStatusExpired: + return true + default: + return false + } +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 0acee3f..5960b6e 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -30,32 +30,36 @@ import ( ) const ( - sizeReductionWindowTrades = 20 - sizeReductionFactor = 0.5 + sizeReductionWindowTrades = 20 + sizeReductionFactor = 0.5 + intervalVolumeLookbackDays = 20 ) type Config struct { - Mode domain.Mode - Location *time.Location - RollingLong int - TickInterval time.Duration - EntrySignalTime timeutil.TimeOfDay - EntryWindowStart timeutil.TimeOfDay - EntryWindowEnd timeutil.TimeOfDay - NoNewEntryAfter timeutil.TimeOfDay - ExitWatchStart timeutil.TimeOfDay - ExitWindowStart timeutil.TimeOfDay - ExitWindowEnd timeutil.TimeOfDay - HardExitDeadline timeutil.TimeOfDay - QuoteDepth int32 - MaxQuoteAge time.Duration - OrderPollInterval time.Duration - PassiveImproveTicks int - MaxEntryOrderAttempts int - MaxExitOrderAttempts int - MinTimeToClose time.Duration - MaxClockDrift time.Duration - APIOutageHalt time.Duration + Mode domain.Mode + Location *time.Location + RollingLong int + TickInterval time.Duration + EntrySignalTime timeutil.TimeOfDay + EntryWindowStart timeutil.TimeOfDay + EntryWindowEnd timeutil.TimeOfDay + NoNewEntryAfter timeutil.TimeOfDay + ExitWatchStart timeutil.TimeOfDay + ExitWindowStart timeutil.TimeOfDay + ExitWindowEnd timeutil.TimeOfDay + HardExitDeadline timeutil.TimeOfDay + QuoteDepth int32 + MaxQuoteAge time.Duration + OrderPollInterval time.Duration + PassiveImproveTicks int + MaxEntryOrderAttempts int + MaxExitOrderAttempts int + MinTimeToClose time.Duration + MaxClockDrift time.Duration + APIOutageHalt time.Duration + RequireZeroCommission bool + QuarantineOnNonZero bool + ReconciliationInterval time.Duration } type Services struct { @@ -84,6 +88,7 @@ type Scheduler struct { svc Services infraFailedSince time.Time + lastReconciledAt time.Time } func New(clock timeutil.Clock, sm statemachine.System, cfg Config, svc Services) Scheduler { @@ -93,6 +98,9 @@ func New(clock timeutil.Clock, sm statemachine.System, cfg Config, svc Services) if cfg.Location == nil { cfg.Location = time.UTC } + if cfg.ReconciliationInterval <= 0 { + cfg.ReconciliationInterval = 5 * time.Minute + } return Scheduler{clock: clock, sm: sm, cfg: cfg, svc: svc} } @@ -181,8 +189,8 @@ func (s *Scheduler) prepareSignals(ctx context.Context, now time.Time) error { if err := s.svc.MarketData.BackfillDaily(ctx, instrumentsList, tradeDate.AddDate(0, 0, -s.cfg.RollingLong-10), tradeDate); err != nil { return err } - minuteFrom := s.cfg.EntryWindowStart.On(tradeDate, s.cfg.Location) - minuteTo := s.cfg.ExitWindowEnd.On(tradeDate.AddDate(0, 0, 1), s.cfg.Location) + minuteFrom := s.cfg.EntryWindowStart.On(tradeDate.AddDate(0, 0, -intervalVolumeLookbackDays), s.cfg.Location) + minuteTo := s.cfg.ExitWindowEnd.On(tradeDate, s.cfg.Location) if err := s.svc.MarketData.BackfillMinute(ctx, instrumentsList, minuteFrom, minuteTo); err != nil { s.logWarn("minute backfill failed; liquidity will fall back to ADV", "err", err) } @@ -222,7 +230,7 @@ func (s Scheduler) generateInstrumentSignal(ctx context.Context, now, tradeDate if err != nil { return s.saveRejectedSignal(ctx, tradeDate, instrument, "features_unavailable", err) } - remaining, err := s.svc.FreeOrders.Check(ctx, tradeDate, instrument, 1) + remaining, err := s.svc.FreeOrders.Check(ctx, tradeDate, instrument, s.maxOrderAttemptsPerTrade()) freeOrderOK := err == nil sig := s.svc.Signals.Evaluate(signal.Candidate{ Instrument: instrument, @@ -234,6 +242,7 @@ func (s Scheduler) generateInstrumentSignal(ctx context.Context, now, tradeDate ExtraContext: map[string]any{ "free_orders_remaining": remaining, "quote_time": book.Time.Format(time.RFC3339), + "spread_bps": spread.SpreadBps.String(), }, }) if sig.Decision == domain.DecisionEnter { @@ -244,6 +253,9 @@ func (s Scheduler) generateInstrumentSignal(ctx context.Context, now, tradeDate sig.RejectReason = sizingErr.Error() case sized.Lots <= 0: sig.Decision = domain.DecisionReject + if isSizingSkipReason(sized.Reason) { + sig.Decision = domain.DecisionSkip + } sig.RejectReason = sized.Reason default: sig.TargetLots = sized.Lots @@ -288,11 +300,15 @@ func (s Scheduler) sizeSignal(_ context.Context, portfolio domain.Portfolio, ins }), nil } -func (s Scheduler) placeEntryOrders(ctx context.Context, now time.Time) error { +func (s *Scheduler) placeEntryOrders(ctx context.Context, now time.Time) error { if err := s.transitionTo(ctx, domain.StatePlaceEntryOrders); err != nil { return err } tradeDate := tradingDate(now) + entryDeadline := s.cfg.NoNewEntryAfter.On(now, s.cfg.Location).UTC() + if !s.nowUTC().Before(entryDeadline) { + return s.closeEntryWindow(ctx) + } signals, err := s.svc.Repo.ListSignals(ctx, tradeDate) if err != nil { return err @@ -317,6 +333,21 @@ func (s Scheduler) placeEntryOrders(ctx context.Context, now time.Time) error { if !ok { return fmt.Errorf("instrument %s is not in registry", sig.InstrumentUID) } + if !s.nowUTC().Before(entryDeadline) { + return s.closeEntryWindow(ctx) + } + if _, err := s.svc.FreeOrders.Check(ctx, tradeDate, instrument, s.maxOrderAttemptsPerTrade()); err != nil { + if insertErr := s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ + Severity: domain.SeverityWarn, + EventType: "pre_trade_reject", + InstrumentUID: sig.InstrumentUID, + Message: err.Error(), + ContextJSON: `{"reason":"free_order_budget_insufficient"}`, + }); insertErr != nil { + return insertErr + } + continue + } book, err := s.svc.MarketData.LatestQuote(ctx, sig.InstrumentUID, s.cfg.QuoteDepth, s.cfg.MaxQuoteAge) if err != nil { return err @@ -354,12 +385,17 @@ func (s Scheduler) placeEntryOrders(ctx context.Context, now time.Time) error { return err } _ = s.svc.Notifier.Info(ctx, fmt.Sprintf("entry order %s %s lots=%d status=%s", instrument.Ticker, placed.Side, placed.QuantityLots, placed.Status)) + if placed.FilledLots > 0 { + if err := s.recordEntryFill(ctx, instrument, placed); err != nil { + return err + } + } existing = append(existing, placed) } return s.transitionTo(ctx, domain.StateMonitorEntryOrders) } -func (s Scheduler) monitorEntryOrders(ctx context.Context, now time.Time) error { +func (s *Scheduler) monitorEntryOrders(ctx context.Context, now time.Time) error { if err := s.transitionTo(ctx, domain.StateMonitorEntryOrders); err != nil { return err } @@ -372,6 +408,9 @@ func (s Scheduler) monitorEntryOrders(ctx context.Context, now time.Time) error return err } deadline := s.cfg.NoNewEntryAfter.On(now, s.cfg.Location).UTC() + if !s.nowUTC().Before(deadline) { + return s.closeEntryWindow(ctx) + } for _, order := range orders { if order.Side != domain.SideBuy || order.BrokerOrderID == "" { continue @@ -395,18 +434,13 @@ func (s Scheduler) monitorEntryOrders(ctx context.Context, now time.Time) error return err } if monitored.FilledLots > order.FilledLots || monitored.Commission.GreaterThan(order.Commission) { - pos, err := s.svc.Positions.OnEntryFill(ctx, s.svc.AccountIDHash, instrument, monitored) - if err != nil { + if err := s.recordEntryFill(ctx, instrument, monitored); err != nil { return err } - _ = s.svc.Notifier.Info(ctx, fmt.Sprintf("entry fill %s lots=%d status=%s", monitored.InstrumentUID, monitored.FilledLots, pos.Status)) } } if sinceMidnight(s.nowUTC().In(s.cfg.Location)) >= s.cfg.NoNewEntryAfter.Duration { - if err := s.cancelActiveOrders(ctx, domain.SideBuy, domain.OrderStatusCancelled, "entry_window_closed"); err != nil { - return err - } - return s.transitionTo(ctx, domain.StateHoldOvernight) + return s.closeEntryWindow(ctx) } return nil } @@ -415,14 +449,14 @@ func (s Scheduler) waitExit(ctx context.Context, _ time.Time) error { return s.transitionTo(ctx, domain.StateWaitExitWindow) } -func (s Scheduler) holdOvernight(ctx context.Context) error { - if err := s.cancelActiveOrders(ctx, domain.SideBuy, domain.OrderStatusCancelled, "entry_window_closed"); err != nil { +func (s *Scheduler) holdOvernight(ctx context.Context) error { + if err := s.closeEntryWindow(ctx); err != nil { return err } - return s.transitionTo(ctx, domain.StateHoldOvernight) + return s.periodicReconcile(ctx) } -func (s Scheduler) placeExitOrders(ctx context.Context, now time.Time) error { +func (s *Scheduler) placeExitOrders(ctx context.Context, now time.Time) error { if err := s.transitionTo(ctx, domain.StatePlaceExitOrders); err != nil { return err } @@ -473,6 +507,13 @@ func (s Scheduler) placeExitOrders(ctx context.Context, now time.Time) error { if err != nil && !errors.Is(err, execution.ErrBrokerOrdersDisabled) { return err } + if placed.FilledLots > 0 || placed.Commission.IsPositive() { + if err := s.recordExitFill(ctx, pos, placed); err != nil { + return err + } + existing = append(existing, placed) + continue + } pos.Status = domain.PositionExitOrderSent if err := s.svc.Repo.UpsertPosition(ctx, pos); err != nil { return err @@ -483,7 +524,7 @@ func (s Scheduler) placeExitOrders(ctx context.Context, now time.Time) error { return s.transitionTo(ctx, domain.StateMonitorExitOrders) } -func (s Scheduler) monitorExitOrders(ctx context.Context, now time.Time) error { +func (s *Scheduler) monitorExitOrders(ctx context.Context, now time.Time) error { if err := s.transitionTo(ctx, domain.StateMonitorExitOrders); err != nil { return err } @@ -535,12 +576,11 @@ func (s Scheduler) monitorExitOrders(ctx context.Context, now time.Time) error { if !ok { return fmt.Errorf("exit fill for unknown local position %s", monitored.InstrumentUID) } - updated, err := s.svc.Positions.OnExitFill(ctx, pos, fill) + updated, err := s.recordExitFillWithPosition(ctx, pos, fill) if err != nil { return err } positionByInstrument[monitored.InstrumentUID] = updated - _ = s.svc.Notifier.Info(ctx, fmt.Sprintf("exit fill %s lots=%d status=%s pnl=%s", monitored.InstrumentUID, monitored.FilledLots, updated.Status, updated.NetPnL.StringFixed(2))) } } if sinceMidnight(s.nowUTC().In(s.cfg.Location)) >= s.cfg.HardExitDeadline.Duration { @@ -550,16 +590,6 @@ func (s Scheduler) monitorExitOrders(ctx context.Context, now time.Time) error { } func (s *Scheduler) reconcileAndReport(ctx context.Context, now time.Time) error { - if err := s.transitionTo(ctx, domain.StateReconcile); err != nil { - return err - } - diffs, err := s.svc.Reconcile.Run(ctx) - if err != nil { - return err - } - if reconciliation.HasCritical(diffs) { - return s.halt(ctx, "reconciliation_critical", "critical reconciliation diff", "") - } tradeDate := tradingDate(now) sent, err := s.svc.Repo.WasDailyReportSent(ctx, tradeDate, s.svc.AccountIDHash) if err != nil { @@ -569,6 +599,28 @@ func (s *Scheduler) reconcileAndReport(ctx context.Context, now time.Time) error s.logWarn("daily report already sent; skipping duplicate", "date", tradeDate.Format("2006-01-02")) return s.transitionTo(ctx, domain.StateSleep) } + if err := s.transitionTo(ctx, domain.StateReconcile); err != nil { + return err + } + if err := s.reconcileCritical(ctx, "reconciliation_critical"); err != nil { + return err + } + return s.sendDailyReport(ctx, now, "ok") +} + +func (s *Scheduler) sendDailyReport(ctx context.Context, now time.Time, riskStatus string) error { + tradeDate := tradingDate(now) + sent, err := s.svc.Repo.WasDailyReportSent(ctx, tradeDate, s.svc.AccountIDHash) + if err != nil { + return err + } + if sent { + s.logWarn("daily report already sent; skipping duplicate", "date", tradeDate.Format("2006-01-02")) + if !s.hasStateMachine() { + return nil + } + return s.transitionTo(ctx, domain.StateSleep) + } signals, err := s.svc.Repo.ListSignals(ctx, tradeDate) if err != nil && !errors.Is(err, sql.ErrNoRows) { return err @@ -577,18 +629,25 @@ func (s *Scheduler) reconcileAndReport(ctx context.Context, now time.Time) error if err != nil { return err } + orders, err := s.svc.Repo.ListOrders(ctx, s.svc.AccountIDHash, tradeDate.AddDate(0, 0, -1), tradeDate) + if err != nil { + return err + } if err := s.applySizeReductionRule(ctx, tradeDate, true); err != nil { return err } - if err := s.transitionTo(ctx, domain.StateReport); err != nil { - return err + if s.hasStateMachine() { + if err := s.transitionTo(ctx, domain.StateReport); err != nil { + return err + } } msg := report.ComposeDaily(report.DailyInput{ Date: tradeDate, Mode: s.cfg.Mode, Signals: signals, Positions: positionsList, - RiskStatus: "ok", + Orders: orders, + RiskStatus: riskStatus, }) if err := s.svc.Notifier.Report(ctx, msg); err != nil { return err @@ -596,6 +655,9 @@ func (s *Scheduler) reconcileAndReport(ctx context.Context, now time.Time) error if err := s.svc.Repo.MarkDailyReportSent(ctx, tradeDate, s.svc.AccountIDHash); err != nil { return err } + if !s.hasStateMachine() { + return nil + } return s.transitionTo(ctx, domain.StateSleep) } @@ -675,6 +737,7 @@ func (s *Scheduler) checkInfrastructure(ctx context.Context) error { serverTime, err := s.svc.Gateway.GetServerTime(ctx) if err != nil { if s.cfg.Mode == domain.ModePaper { + s.infraFailedSince = time.Time{} return nil } return s.recordInfrastructureFailure(fmt.Errorf("server_time_unavailable: %w", err)) @@ -737,7 +800,96 @@ func (s Scheduler) cancelActiveOrders(ctx context.Context, side domain.Side, fal return nil } -func (s Scheduler) failOpenPositionsAtHardDeadline(ctx context.Context) error { +func (s Scheduler) closeEntryWindow(ctx context.Context) error { + if err := s.cancelActiveOrders(ctx, domain.SideBuy, domain.OrderStatusCancelled, "entry_window_closed"); err != nil { + return err + } + return s.transitionTo(ctx, domain.StateHoldOvernight) +} + +func (s *Scheduler) recordEntryFill(ctx context.Context, instrument domain.Instrument, order domain.Order) error { + pos, err := s.svc.Positions.OnEntryFill(ctx, s.svc.AccountIDHash, instrument, order) + if err != nil { + return err + } + _ = s.svc.Notifier.Info(ctx, fmt.Sprintf("entry fill %s lots=%d status=%s", order.InstrumentUID, order.FilledLots, pos.Status)) + if err := s.handleCommission(ctx, order.InstrumentUID, order.Commission); err != nil { + return err + } + return s.reconcileAfterFill(ctx) +} + +func (s *Scheduler) recordExitFill(ctx context.Context, pos domain.Position, order domain.Order) error { + _, err := s.recordExitFillWithPosition(ctx, pos, order) + return err +} + +func (s *Scheduler) recordExitFillWithPosition(ctx context.Context, pos domain.Position, fill domain.Order) (domain.Position, error) { + updated, err := s.svc.Positions.OnExitFill(ctx, pos, fill) + if err != nil { + return domain.Position{}, err + } + _ = s.svc.Notifier.Info(ctx, fmt.Sprintf("exit fill %s lots=%d status=%s pnl=%s", fill.InstrumentUID, fill.FilledLots, updated.Status, updated.NetPnL.StringFixed(2))) + if err := s.handleCommission(ctx, fill.InstrumentUID, fill.Commission); err != nil { + return domain.Position{}, err + } + if err := s.reconcileAfterFill(ctx); err != nil { + return domain.Position{}, err + } + return updated, nil +} + +func (s *Scheduler) handleCommission(ctx context.Context, instrumentUID string, commission decimal.Decimal) error { + if !risk.CommissionBreached(commission, s.cfg.RequireZeroCommission) { + return nil + } + reason := fmt.Sprintf("actual commission %s > 0", commission.StringFixed(2)) + if s.cfg.QuarantineOnNonZero { + if err := s.svc.Repo.QuarantineInstrument(ctx, instrumentUID, reason); err != nil { + return err + } + } + return s.halt(ctx, "actual_commission_nonzero", reason, instrumentUID) +} + +func (s *Scheduler) reconcileAfterFill(ctx context.Context) error { + if !s.cfg.Mode.AllowsBrokerOrders() { + return nil + } + return s.reconcileCritical(ctx, "reconciliation_after_fill_critical") +} + +func (s *Scheduler) periodicReconcile(ctx context.Context) error { + if !s.cfg.Mode.AllowsBrokerOrders() { + return nil + } + now := s.nowUTC() + if !s.lastReconciledAt.IsZero() && now.Sub(s.lastReconciledAt) < s.cfg.ReconciliationInterval { + return nil + } + return s.reconcileCritical(ctx, "periodic_reconciliation_critical") +} + +func (s *Scheduler) reconcileCritical(ctx context.Context, eventType string) error { + diffs, err := s.svc.Reconcile.Run(ctx) + if err != nil { + return err + } + s.lastReconciledAt = s.nowUTC() + for _, diff := range diffs { + if diff.Kind == "actual_commission_nonzero" && diff.InstrumentUID != "" && s.cfg.QuarantineOnNonZero { + if err := s.svc.Repo.QuarantineInstrument(ctx, diff.InstrumentUID, diff.Message); err != nil { + return err + } + } + } + if reconciliation.HasCritical(diffs) { + return s.halt(ctx, eventType, "critical reconciliation diff", "") + } + return nil +} + +func (s *Scheduler) failOpenPositionsAtHardDeadline(ctx context.Context) error { if err := s.cancelActiveOrders(ctx, domain.SideSell, domain.OrderStatusExpired, "hard_exit_deadline_cancel"); err != nil { return err } @@ -763,6 +915,9 @@ func (s Scheduler) failOpenPositionsAtHardDeadline(ctx context.Context) error { if len(failed) == 0 { return s.reconcileAndReport(ctx, s.nowUTC().In(s.cfg.Location)) } + if err := s.sendDailyReport(ctx, s.nowUTC().In(s.cfg.Location), "hard_exit_deadline_missed"); err != nil { + s.logWarn("daily report failed after hard deadline", "err", err) + } return s.svc.Risk.Halt(ctx, s.cfg.Mode, "hard_exit_deadline_missed", fmt.Sprintf("%d positions remain open after hard deadline", len(failed)), "") } @@ -791,6 +946,22 @@ func repostAfter(now, deadline time.Time, attempts int, poll time.Duration) time return after } +func (s Scheduler) maxOrderAttemptsPerTrade() int { + needed := s.cfg.MaxEntryOrderAttempts + s.cfg.MaxExitOrderAttempts + if needed <= 0 { + return 1 + } + return needed +} + +func isSizingSkipReason(reason string) bool { + return reason == "lots_below_one" || reason == "min_order_notional" +} + +func (s Scheduler) hasStateMachine() bool { + return s.sm != (statemachine.System{}) +} + func (s Scheduler) transitionSequence(ctx context.Context, states ...domain.SystemState) error { for _, state := range states { if err := s.transitionTo(ctx, state); err != nil { @@ -812,9 +983,6 @@ func (s Scheduler) transitionTo(ctx context.Context, to domain.SystemState) erro return s.sm.Heartbeat(ctx, to) } if err := s.sm.Transition(ctx, from, to); err != nil { - if errors.Is(err, statemachine.ErrIllegalTransition) { - return s.sm.Heartbeat(ctx, to) - } return err } return nil diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index d3cfaf8..286947a 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -80,6 +80,9 @@ func TestReconcileAndReportIsIdempotentPerDate(t *testing.T) { gateway := tinvest.NewFakeGateway() notifier := &countNotifier{} recon := reconciliation.New(repo, gateway, "account", "hash") + if err := repo.SaveSystemState(ctx, domain.StateMonitorExitOrders, domain.ModePaper, false, "", "{}"); err != nil { + t.Fatal(err) + } s := Scheduler{ cfg: Config{Mode: domain.ModePaper, Location: time.UTC}, sm: statemachine.New(repo, domain.ModePaper), @@ -168,6 +171,9 @@ func TestHardDeadlineMarksOpenPositionFailedAndHalts(t *testing.T) { if notifier.alerts != 1 { t.Fatalf("alerts=%d, want 1", notifier.alerts) } + if notifier.reports != 1 { + t.Fatalf("reports=%d, want daily report before HALT", notifier.reports) + } } func TestHoldOvernightCancelsActiveBuyOrders(t *testing.T) { @@ -195,6 +201,9 @@ func TestHoldOvernightCancelsActiveBuyOrders(t *testing.T) { AccountIDHash: "hash", }, } + if err := repo.SaveSystemState(ctx, domain.StateMonitorEntryOrders, domain.ModePaper, false, "", "{}"); err != nil { + t.Fatal(err) + } if err := s.holdOvernight(ctx); err != nil { t.Fatal(err) } @@ -207,6 +216,47 @@ func TestHoldOvernightCancelsActiveBuyOrders(t *testing.T) { } } +func TestNonZeroCommissionQuarantinesInstrumentAndHalts(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + if err := repo.UpsertInstrument(ctx, domain.Instrument{ + InstrumentUID: "uid", + Ticker: "TRUR", + Enabled: true, + }); err != nil { + t.Fatal(err) + } + notifier := &countNotifier{} + s := Scheduler{ + cfg: Config{ + Mode: domain.ModePaper, + RequireZeroCommission: true, + QuarantineOnNonZero: true, + }, + svc: Services{ + Repo: repo, + Risk: risk.NewManager(repo, risk.ManagerConfig{}), + Notifier: notifier, + }, + } + if err := s.handleCommission(ctx, "uid", decimal.NewFromFloat(0.01)); err != nil { + t.Fatal(err) + } + if !repo.Halted || repo.State != domain.StateHalted { + t.Fatalf("system not halted: state=%s halted=%v", repo.State, repo.Halted) + } + instruments, err := repo.ListInstruments(ctx, true) + if err != nil { + t.Fatal(err) + } + if len(instruments) != 1 || !instruments[0].Quarantine { + t.Fatalf("instrument not quarantined: %+v", instruments) + } + if notifier.alerts != 1 { + t.Fatalf("alerts=%d, want 1", notifier.alerts) + } +} + func TestSizeReductionRuleCutsSizerAfterBadExpectedErrors(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/tinvest/real.go b/internal/tinvest/real.go index 157ff36..58fc8d0 100644 --- a/internal/tinvest/real.go +++ b/internal/tinvest/real.go @@ -251,7 +251,7 @@ func (g *RealGateway) GetPortfolio(ctx context.Context, accountID string) (domai for _, position := range positions { holdings = append(holdings, domain.Holding{ InstrumentUID: position.GetInstrumentUid(), - QuantityLots: money.QuotationToDecimal(position.GetQuantity()).IntPart(), + QuantityLots: portfolioQuantityLots(position), AveragePrice: money.MoneyValueToDecimal(position.GetAveragePositionPrice()), MarketValue: money.MoneyValueToDecimal(position.GetCurrentPrice()).Mul(money.QuotationToDecimal(position.GetQuantity())), }) @@ -337,6 +337,29 @@ func rubMoneyValueToDecimal(value *pb.MoneyValue) (decimal.Decimal, error) { return money.MoneyValueToDecimal(value), nil } +func portfolioQuantityLots(position *pb.PortfolioPosition) int64 { + if position == nil { + return 0 + } + if lots, ok := portfolioDeprecatedQuantityLots(position); ok { + return lots.IntPart() + } + return money.QuotationToDecimal(position.GetQuantity()).IntPart() +} + +func portfolioDeprecatedQuantityLots(position *pb.PortfolioPosition) (decimal.Decimal, bool) { + message := position.ProtoReflect() + field := message.Descriptor().Fields().ByName("quantity_lots") + if field == nil || !message.Has(field) { + return decimal.Zero, false + } + quotation, ok := message.Get(field).Message().Interface().(*pb.Quotation) + if !ok || quotation == nil { + return decimal.Zero, false + } + return money.QuotationToDecimal(quotation), true +} + func serverTimeFromHeader(header map[string][]string) (time.Time, bool) { for _, key := range []string{"date", "Date"} { values := header[key]