diff --git a/internal/execution/engine.go b/internal/execution/engine.go index aeb100a..689aaf6 100644 --- a/internal/execution/engine.go +++ b/internal/execution/engine.go @@ -192,7 +192,14 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi if err != nil { draft.Status = domain.OrderStatusFailed if e.store != nil { - _ = e.store.UpsertOrder(ctx, draft) + if persistErr := e.store.RunInTx(ctx, func(ctx context.Context, repo repository.Repository) error { + if err := repo.UpsertOrder(ctx, draft); err != nil { + return fmt.Errorf("persist failed order: %w", err) + } + return repo.IncrementFreeOrders(ctx, order.TradeDate, order.InstrumentUID, -1) + }); persistErr != nil { + return domain.Order{}, errors.Join(err, fmt.Errorf("rollback failed order reservation: %w", persistErr)) + } } return domain.Order{}, err } diff --git a/internal/execution/state_test.go b/internal/execution/state_test.go index 8a969b3..e09f010 100644 --- a/internal/execution/state_test.go +++ b/internal/execution/state_test.go @@ -111,6 +111,59 @@ func TestPlaceEntryReservesFreeOrderBudgetAtomically(t *testing.T) { } } +func TestPlaceEntryReleasesFreeOrderBudgetWhenBrokerRejects(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + engine := NewEngine(domain.ModeSandbox, "account", postErrorGateway{err: errors.New("broker rejected")}, repo) + instrument := domain.Instrument{ + InstrumentUID: "uid", + Lot: 1, + MinPriceIncrement: decimal.NewFromInt(1), + FreeOrderLimitPerDay: 1, + } + book := domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(101), QuantityLots: 10}}, + ReceivedAt: time.Now().UTC(), + } + tradeDate := time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC) + _, err := engine.PlaceEntry(ctx, "hash", instrument, tradeDate, 1, book, 1, 1) + if err == nil { + t.Fatal("expected broker error") + } + sent, err := repo.GetFreeOrdersSent(ctx, tradeDate, "uid") + if err != nil { + t.Fatal(err) + } + if sent != 0 { + t.Fatalf("free order counter=%d, want rollback to 0", sent) + } + orders, err := repo.ListOrders(ctx, "hash", tradeDate, tradeDate) + if err != nil { + t.Fatal(err) + } + if len(orders) != 1 || orders[0].Status != domain.OrderStatusFailed { + t.Fatalf("orders=%+v, want one failed local order", orders) + } +} + +type postErrorGateway struct { + err error +} + +func (g postErrorGateway) PostLimitOrder(context.Context, string, string, domain.Side, int64, decimal.Decimal, string) (domain.Order, error) { + return domain.Order{}, g.err +} + +func (g postErrorGateway) CancelOrder(context.Context, string, string) error { + return nil +} + +func (g postErrorGateway) GetOrderState(context.Context, string, string) (domain.Order, error) { + return domain.Order{}, g.err +} + func TestRefreshPreservesLocalQuoteContext(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/instruments/registry.go b/internal/instruments/registry.go index d5e4415..0e6c6c0 100644 --- a/internal/instruments/registry.go +++ b/internal/instruments/registry.go @@ -29,7 +29,16 @@ func (r Registry) SyncMetadata(ctx context.Context) error { } remote, err := r.gateway.GetInstrument(ctx, instrument.Ticker, instrument.ClassCode) if err != nil { - return fmt.Errorf("sync instrument metadata %s: %w", instrument.Ticker, err) + if insertErr := r.repo.InsertRiskEvent(ctx, domain.RiskEvent{ + Severity: domain.SeverityWarn, + EventType: "instrument_metadata_sync_failed", + InstrumentUID: instrument.InstrumentUID, + Message: fmt.Sprintf("sync instrument metadata %s: %s", instrument.Ticker, err), + ContextJSON: fmt.Sprintf(`{"ticker":%q,"class_code":%q}`, instrument.Ticker, instrument.ClassCode), + }); insertErr != nil { + return fmt.Errorf("record metadata sync failure %s: %w", instrument.Ticker, insertErr) + } + continue } remote.Enabled = instrument.Enabled && remote.Enabled remote.FundType = instrument.FundType diff --git a/internal/instruments/registry_test.go b/internal/instruments/registry_test.go index bcca5a9..e81f70f 100644 --- a/internal/instruments/registry_test.go +++ b/internal/instruments/registry_test.go @@ -12,11 +12,11 @@ import ( "overnight-trading-bot/internal/tinvest" ) -func TestSyncMetadataFailsWhenEnabledInstrumentCannotBeLoaded(t *testing.T) { +func TestSyncMetadataSkipsInstrumentWhenRemoteMetadataFails(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() gateway := tinvest.NewFakeGateway() - instrument := domain.Instrument{ + bad := domain.Instrument{ InstrumentUID: "uid", Ticker: "TRUR", ClassCode: "TQTF", @@ -25,14 +25,46 @@ func TestSyncMetadataFailsWhenEnabledInstrumentCannotBeLoaded(t *testing.T) { Currency: "RUB", Enabled: true, } - if err := repo.UpsertInstrument(ctx, instrument); err != nil { + good := domain.Instrument{ + InstrumentUID: "good-local", + Ticker: "TGLD", + ClassCode: "TQTF", + Lot: 1, + MinPriceIncrement: decimal.NewFromInt(1), + Currency: "RUB", + Enabled: true, + } + remoteGood := good + remoteGood.InstrumentUID = "good-remote" + remoteGood.Name = "remote metadata" + if err := repo.UpsertInstrument(ctx, bad); err != nil { t.Fatal(err) } - gateway.Instruments["uid"] = instrument + if err := repo.UpsertInstrument(ctx, good); err != nil { + t.Fatal(err) + } + gateway.Instruments["uid"] = bad + gateway.Instruments["good-remote"] = remoteGood gateway.InstrumentErrors["uid"] = errors.New("metadata unavailable") err := NewRegistry(repo, gateway).SyncMetadata(ctx) - if err == nil { - t.Fatal("expected sync metadata error") + if err != nil { + t.Fatal(err) + } + if len(repo.RiskEvents) != 1 || repo.RiskEvents[0].EventType != "instrument_metadata_sync_failed" { + t.Fatalf("risk events=%+v, want one metadata sync failure", repo.RiskEvents) + } + instruments, err := repo.ListInstruments(ctx, true) + if err != nil { + t.Fatal(err) + } + foundRemote := false + for _, instrument := range instruments { + if instrument.InstrumentUID == "good-remote" && instrument.Name == "remote metadata" { + foundRemote = true + } + } + if !foundRemote { + t.Fatalf("instruments=%+v, want successful instruments to keep syncing", instruments) } } diff --git a/internal/repository/migrations/0011_whitelist_free_order_policy.down.sql b/internal/repository/migrations/0011_whitelist_free_order_policy.down.sql new file mode 100644 index 0000000..6cab1df --- /dev/null +++ b/internal/repository/migrations/0011_whitelist_free_order_policy.down.sql @@ -0,0 +1,6 @@ +UPDATE instruments +SET free_order_limit_per_day=0 +WHERE ticker IN ('TBRU', 'TDIV', 'TMON', 'TOFZ', 'TLCB', 'TITR', 'TRND') + AND free_order_limit_per_day=-1; + +UPDATE schema_meta SET meta_value='0010' WHERE meta_key='schema_version'; diff --git a/internal/repository/migrations/0011_whitelist_free_order_policy.up.sql b/internal/repository/migrations/0011_whitelist_free_order_policy.up.sql new file mode 100644 index 0000000..2a8b6d1 --- /dev/null +++ b/internal/repository/migrations/0011_whitelist_free_order_policy.up.sql @@ -0,0 +1,6 @@ +UPDATE instruments +SET free_order_limit_per_day=-1 +WHERE ticker IN ('TBRU', 'TDIV', 'TMON', 'TOFZ', 'TLCB', 'TITR', 'TRND') + AND free_order_limit_per_day=0; + +UPDATE schema_meta SET meta_value='0011' WHERE meta_key='schema_version'; diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 0320ed3..d77edbd 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -135,10 +135,7 @@ func (s Scheduler) GracefulShutdown(ctx context.Context) error { if s.svc.Repo == nil || s.svc.Execution == nil { return nil } - if err := s.cancelActiveOrders(ctx, domain.SideBuy, domain.OrderStatusCancelled, "shutdown_cancel_active_orders"); err != nil { - return err - } - return s.cancelActiveOrders(ctx, domain.SideSell, domain.OrderStatusCancelled, "shutdown_cancel_active_orders") + return s.cancelActiveOrders(ctx, domain.SideBuy, domain.OrderStatusCancelled, "shutdown_cancel_active_orders") } func (s *Scheduler) Step(ctx context.Context) error { @@ -655,7 +652,7 @@ func (s *Scheduler) placeExitOrders(ctx context.Context, now time.Time) error { return err } for _, pos := range positionsList { - if pos.Lots <= 0 || hasOrder(existing, pos.InstrumentUID, domain.SideSell) { + if pos.Lots <= 0 || hasActiveBrokerOrder(existing, pos.InstrumentUID, domain.SideSell) { continue } instrument, ok := instrumentByUID[pos.InstrumentUID] @@ -739,6 +736,7 @@ func (s *Scheduler) monitorExitOrders(ctx context.Context, now time.Time) error } deadline := s.cfg.HardExitDeadline.On(now, s.cfg.Location).UTC() exitTradeDate := tradingDate(now) + activeExitOrders := make([]domain.Order, 0, len(orders)) for _, order := range orders { if order.Side != domain.SideSell || order.BrokerOrderID == "" || !sameTradingDate(order.TradeDate, exitTradeDate) { continue @@ -764,6 +762,9 @@ func (s *Scheduler) monitorExitOrders(ctx context.Context, now time.Time) error if err != nil { return err } + if isActiveOrder(monitored.Status) && monitored.BrokerOrderID != "" { + activeExitOrders = append(activeExitOrders, monitored) + } previousFill := execution.AggregatedOrderFill(order) if monitored.FilledLots > previousFill.FilledLots || monitored.Commission.GreaterThan(previousFill.Commission) { fill := exitFillDelta(order, monitored) @@ -781,9 +782,16 @@ func (s *Scheduler) monitorExitOrders(ctx context.Context, now time.Time) error positionByInstrument[monitored.InstrumentUID] = updated } } - if sinceMidnight(s.nowUTC().In(s.cfg.Location)) >= s.cfg.HardExitDeadline.Duration { + if sinceMidnight(now.In(s.cfg.Location)) >= s.cfg.HardExitDeadline.Duration { return s.failOpenPositionsAtHardDeadline(ctx) } + openPositions, err = s.svc.Repo.ListOpenPositions(ctx, s.svc.AccountIDHash) + if err != nil { + return err + } + if hasOpenPositionWithoutActiveExitOrder(openPositions, activeExitOrders, exitTradeDate) { + return s.placeExitOrders(ctx, now) + } return nil } @@ -1719,6 +1727,38 @@ func hasOrder(orders []domain.Order, instrumentUID string, side domain.Side) boo return false } +func hasActiveBrokerOrder(orders []domain.Order, instrumentUID string, side domain.Side) bool { + for _, order := range orders { + if order.InstrumentUID == instrumentUID && order.Side == side && order.BrokerOrderID != "" && isActiveOrder(order.Status) { + return true + } + } + return false +} + +func hasOpenPositionWithoutActiveExitOrder(positions []domain.Position, orders []domain.Order, tradeDate time.Time) bool { + for _, pos := range positions { + if pos.Lots <= 0 { + continue + } + hasActiveSell := false + for _, order := range orders { + if order.InstrumentUID == pos.InstrumentUID && + order.Side == domain.SideSell && + order.BrokerOrderID != "" && + sameTradingDate(order.TradeDate, tradeDate) && + isActiveOrder(order.Status) { + hasActiveSell = true + break + } + } + if !hasActiveSell { + return true + } + } + return false +} + func sortSignalsForEntry(signals []domain.Signal) { sort.SliceStable(signals, func(i, j int) bool { if signals[i].Decision != signals[j].Decision { diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 4d459f9..79597d4 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -1200,7 +1200,105 @@ func TestPlaceExitUsesCurrentTradeDateForOrderAndFreeCounter(t *testing.T) { } } -func TestGracefulShutdownCancelsActiveOrders(t *testing.T) { +func TestMonitorExitOrdersReinitializesMissingActiveSell(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + openDate := time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC) + exitDate := openDate.AddDate(0, 0, 1) + instrument := domain.Instrument{ + InstrumentUID: "uid", + Ticker: "TRUR", + ClassCode: "TQTF", + Enabled: true, + Lot: 1, + MinPriceIncrement: decimal.RequireFromString("0.01"), + Currency: "RUB", + FreeOrderLimitPerDay: -1, + } + if err := repo.UpsertInstrument(ctx, instrument); err != nil { + t.Fatal(err) + } + if err := repo.UpsertPosition(ctx, domain.Position{ + AccountIDHash: "hash", + InstrumentUID: "uid", + OpenTradeDate: openDate, + Lots: 2, + Lot: 1, + AvgBuyPrice: decimal.NewFromInt(100), + Status: domain.PositionExitOrderSent, + }); err != nil { + t.Fatal(err) + } + cancelledSell := domain.Order{ + ClientOrderID: "old-sell", + BrokerOrderID: "broker-old-sell", + AccountIDHash: "hash", + InstrumentUID: "uid", + TradeDate: exitDate, + Side: domain.SideSell, + OrderType: domain.OrderTypeLimit, + LimitPrice: decimal.NewFromInt(100), + QuantityLots: 2, + Status: domain.OrderStatusCancelled, + RawStateJSON: "{}", + } + if err := repo.UpsertOrder(ctx, cancelledSell); err != nil { + t.Fatal(err) + } + gateway := tinvest.NewFakeGateway() + gateway.OrderBooks["uid"] = domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(100), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.RequireFromString("100.10"), QuantityLots: 10}}, + ReceivedAt: time.Now().UTC(), + } + execEngine := execution.NewEngine(domain.ModePaper, "account", gateway, repo) + s := Scheduler{ + cfg: Config{ + Mode: domain.ModePaper, + Location: time.UTC, + HardExitDeadline: mustTOD("23:00:00"), + }, + sm: statemachine.New(repo, domain.ModePaper), + svc: Services{ + Repo: repo, + Gateway: gateway, + MarketData: marketdata.NewLoader(repo, gateway), + Signals: signalengine.New(signalengine.Config{}), + FreeOrders: risk.NewFreeOrderBudget(repo), + Risk: risk.NewManager(repo, risk.ManagerConfig{}), + Execution: &execEngine, + Positions: position.NewManager(repo), + Notifier: &countNotifier{}, + AccountID: "account", + AccountIDHash: "hash", + }, + } + if err := repo.SaveSystemState(ctx, domain.StateMonitorExitOrders, domain.ModePaper, false, "", "{}"); err != nil { + t.Fatal(err) + } + if err := s.monitorExitOrders(ctx, exitDate.Add(10*time.Hour)); err != nil { + t.Fatal(err) + } + orders, err := repo.ListOrders(ctx, "hash", exitDate, exitDate) + if err != nil { + t.Fatal(err) + } + activeSellCount := 0 + for _, order := range orders { + if order.Side == domain.SideSell && order.Status == domain.OrderStatusSent && order.BrokerOrderID != "" { + activeSellCount++ + } + } + if activeSellCount != 1 { + t.Fatalf("orders=%+v, want one newly active sell order", orders) + } + if repo.State != domain.StateMonitorExitOrders { + t.Fatalf("state=%s, want MONITOR_EXIT_ORDERS after reinit", repo.State) + } +} + +func TestGracefulShutdownCancelsActiveBuyOrdersOnly(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() gateway := tinvest.NewFakeGateway() @@ -1222,6 +1320,14 @@ func TestGracefulShutdownCancelsActiveOrders(t *testing.T) { t.Fatal(err) } gateway.Orders[order.BrokerOrderID] = order + sell := order + sell.ClientOrderID = "shutdown-sell-order" + sell.BrokerOrderID = "broker-shutdown-sell-order" + sell.Side = domain.SideSell + if err := repo.UpsertOrder(ctx, sell); err != nil { + t.Fatal(err) + } + gateway.Orders[sell.BrokerOrderID] = sell execEngine := execution.NewEngine(domain.ModeSandbox, "account", gateway, repo) s := Scheduler{ cfg: Config{Mode: domain.ModeSandbox}, @@ -1238,8 +1344,15 @@ func TestGracefulShutdownCancelsActiveOrders(t *testing.T) { if err != nil { t.Fatal(err) } - if len(orders) != 1 || orders[0].Status != domain.OrderStatusCancelled { - t.Fatalf("orders=%+v, want cancelled", orders) + byClientID := make(map[string]domain.Order, len(orders)) + for _, order := range orders { + byClientID[order.ClientOrderID] = order + } + if byClientID["shutdown-order"].Status != domain.OrderStatusCancelled { + t.Fatalf("orders=%+v, want buy cancelled", orders) + } + if byClientID["shutdown-sell-order"].Status != domain.OrderStatusSent { + t.Fatalf("orders=%+v, want sell left active", orders) } } diff --git a/internal/statemachine/system.go b/internal/statemachine/system.go index 9e34a48..31f0bae 100644 --- a/internal/statemachine/system.go +++ b/internal/statemachine/system.go @@ -109,7 +109,7 @@ func legalTransition(from, to domain.SystemState) bool { domain.StateHoldOvernight: {domain.StateWaitExitWindow, domain.StatePlaceExitOrders, domain.StateMonitorExitOrders, domain.StateReconcile}, domain.StateWaitExitWindow: {domain.StatePlaceExitOrders}, domain.StatePlaceExitOrders: {domain.StateMonitorExitOrders, domain.StateReconcile}, - domain.StateMonitorExitOrders: {domain.StateReconcile}, + domain.StateMonitorExitOrders: {domain.StatePlaceExitOrders, domain.StateReconcile}, domain.StateReconcile: {domain.StateReport, domain.StateHalted, domain.StateGenerateSignals, domain.StateSleep}, domain.StateReport: {domain.StateSleep}, domain.StateSleep: {domain.StateInit, domain.StateWaitExitWindow, domain.StatePlaceExitOrders, domain.StateMonitorExitOrders, domain.StateGenerateSignals, domain.StatePlaceEntryOrders, domain.StateHoldOvernight, domain.StateReconcile},