From 7626c1b83167f919a9fafe3d596e5f5f84015b5f Mon Sep 17 00:00:00 2001 From: Valentin Popov Date: Mon, 8 Jun 2026 14:58:56 +0000 Subject: [PATCH] tenth version --- internal/backtest/engine.go | 8 +- internal/backtest/lookahead_test.go | 28 +++ internal/execution/engine.go | 241 +++++++++++++++++- internal/execution/state_test.go | 119 +++++++++ internal/features/pipeline.go | 1 - internal/features/pipeline_test.go | 8 +- internal/logging/logging.go | 8 +- internal/logging/logging_test.go | 36 +++ internal/position/manager.go | 20 +- internal/position/manager_test.go | 32 +++ internal/report/daily.go | 35 +++ internal/report/daily_test.go | 17 ++ internal/repository/mysql/repository.go | 12 +- internal/scheduler/scheduler.go | 55 ++++- internal/scheduler/scheduler_test.go | 308 ++++++++++++++++++++++++ internal/statemachine/system.go | 4 +- internal/statemachine/system_test.go | 12 + 17 files changed, 917 insertions(+), 27 deletions(-) create mode 100644 internal/logging/logging_test.go diff --git a/internal/backtest/engine.go b/internal/backtest/engine.go index 029441e..4f8fb89 100644 --- a/internal/backtest/engine.go +++ b/internal/backtest/engine.go @@ -250,11 +250,12 @@ func (e Engine) RunWithMinuteCandles(candlesByInstrument map[string][]domain.Can for _, c := range dayCandidates { entryIntervalVolume, exitIntervalVolume := e.windowVolumes(c, preparedMinutes[c.instrumentUID]) capacity := decimal.Zero - if entryIntervalVolume.IsPositive() && exitIntervalVolume.IsPositive() { + switch { + case entryIntervalVolume.IsPositive() && exitIntervalVolume.IsPositive(): capacity = money.Min(entryIntervalVolume, exitIntervalVolume).Mul(e.cfg.MaxParticipationRate) - } else if e.cfg.UseMinuteModel { + case e.cfg.UseMinuteModel: continue - } else { + default: entryIntervalVolume = e.unconstrainedIntervalVolume(equity) exitIntervalVolume = entryIntervalVolume } @@ -469,7 +470,6 @@ func (e Engine) evaluateCandidate(instrumentUID string, candles []domain.Candle, rawEdge := decimal.NewFromFloat(short.Mean).Mul(decimal.NewFromInt(10_000)) spreadBps := e.assumedSpreadBps(instrumentUID) cost := spreadBps. - Add(spreadBps). Add(e.cfg.EntrySlippageBps). Add(e.cfg.ExitSlippageBps). Add(e.cfg.CommissionRoundtripBps). diff --git a/internal/backtest/lookahead_test.go b/internal/backtest/lookahead_test.go index 6c47789..dd543e9 100644 --- a/internal/backtest/lookahead_test.go +++ b/internal/backtest/lookahead_test.go @@ -107,6 +107,34 @@ func TestEvaluateCandidateUsesInstrumentLotAndTick(t *testing.T) { } } +func TestEvaluateCandidateChargesOneFullSpreadRoundTrip(t *testing.T) { + requireZero := false + engine := New(Config{ + RollingShort: 2, + RollingLong: 2, + MinTStat60: decimal.NewFromInt(-1), + MinWinRate60: decimal.NewFromFloat(0.1), + MinNetEdgeBps: decimal.NewFromInt(-1000), + MinADVRUB: decimal.NewFromInt(1), + AssumedSpreadBps: decimal.NewFromInt(10), + EntrySlippageBps: decimal.NewFromInt(2), + ExitSlippageBps: decimal.NewFromInt(3), + CommissionRoundtripBps: decimal.NewFromInt(4), + RiskBufferBps: decimal.NewFromInt(5), + RequireZeroCommission: &requireZero, + }) + got, ok, err := engine.evaluateCandidate("uid", candidateCandles("uid"), 4) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("expected candidate") + } + if !got.netEdge.Equal(decimal.NewFromInt(126)) { + t.Fatalf("net edge=%s, want raw 150 - cost 24", got.netEdge) + } +} + func TestWindowCapacityUsesMinuteEntryAndExitWindows(t *testing.T) { engine := New(Config{ Lot: 10, diff --git a/internal/execution/engine.go b/internal/execution/engine.go index 264a7e4..09998b4 100644 --- a/internal/execution/engine.go +++ b/internal/execution/engine.go @@ -150,7 +150,7 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi lock := e.lockFor(order.InstrumentUID) lock.Lock() defer lock.Unlock() - if e.mode != domain.ModePaper && !e.mode.AllowsBrokerOrders() { + if !e.mode.AllowsBrokerOrders() && e.mode != domain.ModePaper { return order, ErrBrokerOrdersDisabled } if e.gateway == nil { @@ -287,7 +287,7 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit } lastPost := e.nowUTC() current := order - aggregate := order + aggregate := AggregatedOrderFill(order) seen := map[string]domain.Order{order.ClientOrderID: order} for { previous := seen[current.ClientOrderID] @@ -295,12 +295,19 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit if err != nil { return aggregate, err } + if delta := fillDeltaLots(previous, refreshed); delta > 0 { + refreshed.RawStateJSON = e.captureFillQuote(ctx, refreshed.RawStateJSON, refreshed, cfg, delta) + } aggregate = mergeAggregateFill(aggregate, previous, refreshed) seen[current.ClientOrderID] = refreshed current = mergeOrderState(current, refreshed) aggregate.Status = current.Status aggregate.UpdatedAt = current.UpdatedAt + current.RawStateJSON = withMonitorAggregate(current.RawStateJSON, aggregate) aggregate.RawStateJSON = current.RawStateJSON + if err := e.persistOrderMonitorState(ctx, current); err != nil { + return aggregate, err + } if aggregate.FilledLots >= aggregate.QuantityLots { aggregate.Status = domain.OrderStatusFilled return aggregate, nil @@ -329,7 +336,15 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit result, err := e.repost(ctx, current, cfg, aggregate.QuantityLots-aggregate.FilledLots) if result.Cancelled.ClientOrderID != "" { previous := seen[result.Cancelled.ClientOrderID] + if delta := fillDeltaLots(previous, result.Cancelled); delta > 0 { + result.Cancelled.RawStateJSON = e.captureFillQuote(ctx, result.Cancelled.RawStateJSON, result.Cancelled, cfg, delta) + } aggregate = mergeAggregateFill(aggregate, previous, result.Cancelled) + result.Cancelled.RawStateJSON = withMonitorAggregate(result.Cancelled.RawStateJSON, aggregate) + aggregate.RawStateJSON = result.Cancelled.RawStateJSON + if persistErr := e.persistOrderMonitorState(ctx, result.Cancelled); persistErr != nil { + return aggregate, persistErr + } seen[result.Cancelled.ClientOrderID] = result.Cancelled if aggregate.FilledLots >= aggregate.QuantityLots { aggregate.Status = domain.OrderStatusFilled @@ -341,6 +356,11 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit } if result.Changed { current = result.Current + current.RawStateJSON = carryFillQuotes(current.RawStateJSON, aggregate.RawStateJSON) + current.RawStateJSON = withMonitorAggregate(current.RawStateJSON, aggregate) + if persistErr := e.persistOrderMonitorState(ctx, current); persistErr != nil { + return aggregate, persistErr + } seen[current.ClientOrderID] = current aggregate.Status = current.Status aggregate.UpdatedAt = current.UpdatedAt @@ -362,16 +382,24 @@ func (e *Engine) MonitorOnce(ctx context.Context, order domain.Order, cfg Monito if cfg.MaxAttempts <= 0 { cfg.MaxAttempts = 1 } + aggregate := AggregatedOrderFill(order) previous := order refreshed, err := e.Refresh(ctx, order) if err != nil { return order, err } - aggregate := mergeAggregateFill(order, previous, refreshed) + if delta := fillDeltaLots(previous, refreshed); delta > 0 { + refreshed.RawStateJSON = e.captureFillQuote(ctx, refreshed.RawStateJSON, refreshed, cfg, delta) + } + aggregate = mergeAggregateFill(aggregate, previous, refreshed) current := mergeOrderState(order, refreshed) aggregate.Status = current.Status aggregate.UpdatedAt = current.UpdatedAt + current.RawStateJSON = withMonitorAggregate(current.RawStateJSON, aggregate) aggregate.RawStateJSON = current.RawStateJSON + if err := e.persistOrderMonitorState(ctx, current); err != nil { + return aggregate, err + } if aggregate.FilledLots >= aggregate.QuantityLots { aggregate.Status = domain.OrderStatusFilled return aggregate, nil @@ -399,7 +427,15 @@ func (e *Engine) MonitorOnce(ctx context.Context, order domain.Order, cfg Monito if shouldRepost { result, err := e.repost(ctx, current, cfg, aggregate.QuantityLots-aggregate.FilledLots) if result.Cancelled.ClientOrderID != "" { + if delta := fillDeltaLots(current, result.Cancelled); delta > 0 { + result.Cancelled.RawStateJSON = e.captureFillQuote(ctx, result.Cancelled.RawStateJSON, result.Cancelled, cfg, delta) + } aggregate = mergeAggregateFill(aggregate, current, result.Cancelled) + result.Cancelled.RawStateJSON = withMonitorAggregate(result.Cancelled.RawStateJSON, aggregate) + aggregate.RawStateJSON = result.Cancelled.RawStateJSON + if persistErr := e.persistOrderMonitorState(ctx, result.Cancelled); persistErr != nil { + return aggregate, persistErr + } if aggregate.FilledLots >= aggregate.QuantityLots { aggregate.Status = domain.OrderStatusFilled return aggregate, nil @@ -412,8 +448,13 @@ func (e *Engine) MonitorOnce(ctx context.Context, order domain.Order, cfg Monito aggregate.BrokerOrderID = result.Current.BrokerOrderID aggregate.ClientOrderID = result.Current.ClientOrderID aggregate.Status = result.Current.Status - aggregate.RawStateJSON = result.Current.RawStateJSON aggregate.UpdatedAt = result.Current.UpdatedAt + result.Current.RawStateJSON = carryFillQuotes(result.Current.RawStateJSON, aggregate.RawStateJSON) + result.Current.RawStateJSON = withMonitorAggregate(result.Current.RawStateJSON, aggregate) + aggregate.RawStateJSON = result.Current.RawStateJSON + if persistErr := e.persistOrderMonitorState(ctx, result.Current); persistErr != nil { + return aggregate, persistErr + } } } return aggregate, nil @@ -628,6 +669,198 @@ func localRawStateJSON(raw string) string { return raw } +func AggregatedOrderFill(order domain.Order) domain.Order { + aggregate := order + state, ok := monitorAggregateFromRaw(order.RawStateJSON) + if !ok { + return aggregate + } + if state.QuantityLots > 0 { + aggregate.QuantityLots = state.QuantityLots + } + aggregate.FilledLots = state.FilledLots + aggregate.AvgFillPrice = state.AvgFillPrice + aggregate.Commission = state.Commission + return aggregate +} + +type monitorAggregateState struct { + QuantityLots int64 + FilledLots int64 + AvgFillPrice decimal.Decimal + Commission decimal.Decimal +} + +func monitorAggregateFromRaw(raw string) (monitorAggregateState, bool) { + var root map[string]any + if err := json.Unmarshal([]byte(raw), &root); err != nil { + return monitorAggregateState{}, false + } + if local, ok := root["local"].(map[string]any); ok { + if state, ok := monitorAggregateFromContainer(local); ok { + return state, true + } + } + return monitorAggregateFromContainer(root) +} + +func monitorAggregateFromContainer(container map[string]any) (monitorAggregateState, bool) { + raw, ok := container["monitor_aggregate"].(map[string]any) + if !ok { + return monitorAggregateState{}, false + } + quantityLots, quantityOK := int64FromAny(raw["quantity_lots"]) + filledLots, filledOK := int64FromAny(raw["filled_lots"]) + avgFillPrice, avgOK := decimalFromAny(raw["avg_fill_price"]) + commission, commissionOK := decimalFromAny(raw["commission"]) + if !quantityOK || !filledOK { + return monitorAggregateState{}, false + } + if !avgOK { + avgFillPrice = decimal.Zero + } + if !commissionOK { + commission = decimal.Zero + } + return monitorAggregateState{ + QuantityLots: quantityLots, + FilledLots: filledLots, + AvgFillPrice: avgFillPrice, + Commission: commission, + }, true +} + +func withMonitorAggregate(raw string, aggregate domain.Order) string { + root := rawStateObject(raw) + local := localObjectForMutation(root) + local["monitor_aggregate"] = map[string]any{ + "quantity_lots": aggregate.QuantityLots, + "filled_lots": aggregate.FilledLots, + "avg_fill_price": aggregate.AvgFillPrice.String(), + "commission": aggregate.Commission.String(), + } + encoded, err := json.Marshal(root) + if err != nil { + return raw + } + return string(encoded) +} + +func carryFillQuotes(raw, sourceRaw string) string { + source := rawStateObject(sourceRaw) + sourceLocal := source + if local, ok := source["local"].(map[string]any); ok { + sourceLocal = local + } + quotes, ok := sourceLocal["fill_quotes"].([]any) + if !ok || len(quotes) == 0 { + return raw + } + root := rawStateObject(raw) + local := localObjectForMutation(root) + local["fill_quotes"] = quotes + encoded, err := json.Marshal(root) + if err != nil { + return raw + } + return string(encoded) +} + +func (e *Engine) captureFillQuote(ctx context.Context, raw string, order domain.Order, cfg MonitorConfig, deltaLots int64) string { + if deltaLots <= 0 || cfg.Quote == nil { + return raw + } + book, err := cfg.Quote(ctx, order.InstrumentUID) + if err != nil { + return raw + } + bid, ask, err := bestBidAsk(book) + if err != nil { + return raw + } + root := rawStateObject(raw) + local := localObjectForMutation(root) + quotes, _ := local["fill_quotes"].([]any) + fillQuote := map[string]any{ + "best_bid": bid.String(), + "best_ask": ask.String(), + "mid": bid.Add(ask).Div(decimal.NewFromInt(2)).String(), + "recorded_at": e.nowUTC().Format(time.RFC3339Nano), + "filled_lots_delta": deltaLots, + } + if ts := quoteTimestamp(book); !ts.IsZero() { + fillQuote["quote_ts"] = ts.UTC().Format(time.RFC3339Nano) + } + local["fill_quotes"] = append(quotes, fillQuote) + encoded, err := json.Marshal(root) + if err != nil { + return raw + } + return string(encoded) +} + +func rawStateObject(raw string) map[string]any { + var root map[string]any + if err := json.Unmarshal([]byte(raw), &root); err != nil || root == nil { + return map[string]any{} + } + return root +} + +func localObjectForMutation(root map[string]any) map[string]any { + if local, ok := root["local"].(map[string]any); ok { + return local + } + if _, hasBroker := root["broker"]; hasBroker { + local := map[string]any{} + root["local"] = local + return local + } + return root +} + +func decimalFromAny(value any) (decimal.Decimal, bool) { + switch typed := value.(type) { + case string: + parsed, err := decimal.NewFromString(typed) + return parsed, err == nil + case float64: + return decimal.NewFromFloat(typed), true + default: + return decimal.Zero, false + } +} + +func int64FromAny(value any) (int64, bool) { + switch typed := value.(type) { + case float64: + return int64(typed), true + case string: + parsed, err := decimal.NewFromString(typed) + if err != nil { + return 0, false + } + return parsed.IntPart(), true + default: + return 0, false + } +} + +func fillDeltaLots(previous, current domain.Order) int64 { + delta := current.FilledLots - previous.FilledLots + if delta < 0 { + return 0 + } + return delta +} + +func (e *Engine) persistOrderMonitorState(ctx context.Context, order domain.Order) error { + if e.store == nil { + return nil + } + return e.store.UpdateOrderStatus(ctx, order.ClientOrderID, order.Status, order.FilledLots, order.RawStateJSON) +} + func (e *Engine) lockFor(instrumentUID string) *sync.Mutex { value, _ := e.mu.LoadOrStore(instrumentUID, &sync.Mutex{}) lock, ok := value.(*sync.Mutex) diff --git a/internal/execution/state_test.go b/internal/execution/state_test.go index 2a68b42..8a969b3 100644 --- a/internal/execution/state_test.go +++ b/internal/execution/state_test.go @@ -352,6 +352,50 @@ func TestLiveReadonlyDoesNotPersistLocalOrder(t *testing.T) { } } +func TestPlaceLimitModePolicy(t *testing.T) { + tests := []struct { + mode domain.Mode + allowed bool + }{ + {mode: domain.ModeBacktest, allowed: false}, + {mode: domain.ModePaper, allowed: true}, + {mode: domain.ModeSandbox, allowed: true}, + {mode: domain.ModeLiveReadonly, allowed: false}, + {mode: domain.ModeLiveTrade, allowed: true}, + } + for _, tt := range tests { + t.Run(string(tt.mode), func(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + engine := NewEngine(tt.mode, "account", tinvest.NewFakeGateway(), repo) + _, err := engine.PlaceLimit(ctx, domain.Order{ + ClientOrderID: "order-" + string(tt.mode), + AccountIDHash: "hash", + InstrumentUID: "uid", + TradeDate: time.Date(2026, 6, 8, 0, 0, 0, 0, time.UTC), + Side: domain.SideBuy, + OrderType: domain.OrderTypeLimit, + LimitPrice: decimal.NewFromInt(100), + QuantityLots: 1, + Status: domain.OrderStatusNew, + AttemptNo: 1, + }) + if tt.allowed && err != nil { + t.Fatalf("PlaceLimit err=%v, want allowed", err) + } + if !tt.allowed && !errors.Is(err, ErrBrokerOrdersDisabled) { + t.Fatalf("PlaceLimit err=%v, want ErrBrokerOrdersDisabled", err) + } + if tt.allowed && len(repo.Orders) != 1 { + t.Fatalf("orders=%+v, want one persisted order", repo.Orders) + } + if !tt.allowed && len(repo.Orders) != 0 { + t.Fatalf("orders=%+v, want no persisted order", repo.Orders) + } + }) + } +} + func TestMonitorUntilRepostsAndExpiresAtDeadline(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() @@ -496,6 +540,9 @@ func TestMonitorOnceRepostAccountsForFillsDuringCancel(t *testing.T) { if monitored.FilledLots != 2 { t.Fatalf("aggregate filled lots=%d, want cancel fill 2", monitored.FilledLots) } + if !strings.Contains(monitored.RawStateJSON, "fill_quotes") { + t.Fatalf("fill quote snapshot was not recorded: %s", monitored.RawStateJSON) + } if got := len(gateway.posted); got != 2 { t.Fatalf("broker orders=%d, want initial+repost", got) } @@ -504,6 +551,78 @@ func TestMonitorOnceRepostAccountsForFillsDuringCancel(t *testing.T) { } } +func TestMonitorOnceAggregatesRepostsAcrossTicks(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + gateway := newCancelFillGateway(2) + engine := NewEngine(domain.ModeSandbox, "account", gateway, repo) + instrument := domain.Instrument{ + InstrumentUID: "uid", + Lot: 1, + MinPriceIncrement: decimal.NewFromInt(1), + FreeOrderLimitPerDay: -1, + } + book := domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(101), QuantityLots: 10}}, + ReceivedAt: time.Now().UTC(), + } + tradeDate := time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC) + order, err := engine.PlaceEntry(ctx, "hash", instrument, tradeDate, 5, book, 1, 1) + if err != nil { + t.Fatal(err) + } + order.CreatedAt = time.Now().UTC().Add(-time.Minute) + if err := repo.UpsertOrder(ctx, order); err != nil { + t.Fatal(err) + } + cfg := MonitorConfig{ + Deadline: time.Now().Add(time.Minute), + PollInterval: time.Millisecond, + MaxAttempts: 3, + RepostAfter: time.Second, + Instrument: instrument, + ImproveTicks: 1, + Quote: func(context.Context, string) (domain.OrderBook, error) { + book.ReceivedAt = time.Now().UTC() + return book, nil + }, + } + first, err := engine.MonitorOnce(ctx, order, cfg) + if err != nil { + t.Fatal(err) + } + if first.FilledLots != 2 { + t.Fatalf("first aggregate filled lots=%d, want 2", first.FilledLots) + } + active, err := repo.ListActiveOrders(ctx, "hash") + if err != nil { + t.Fatal(err) + } + if len(active) != 1 { + t.Fatalf("active orders=%+v, want reposted order", active) + } + next := active[0] + next.CreatedAt = time.Now().UTC().Add(-time.Minute) + if err := repo.UpsertOrder(ctx, next); err != nil { + t.Fatal(err) + } + second, err := engine.MonitorOnce(ctx, next, cfg) + if err != nil { + t.Fatal(err) + } + if second.FilledLots != 4 { + t.Fatalf("second aggregate filled lots=%d, want 4 across reposts", second.FilledLots) + } + if got := len(gateway.posted); got != 3 { + t.Fatalf("broker orders=%d, want initial+two reposts", got) + } + if got := gateway.posted[2].QuantityLots; got != 1 { + t.Fatalf("second repost quantity lots=%d, want remaining 1", got) + } +} + func TestMonitorOnceKeepsCancelFillWhenRepostPostFails(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/features/pipeline.go b/internal/features/pipeline.go index b6d2553..a6442ca 100644 --- a/internal/features/pipeline.go +++ b/internal/features/pipeline.go @@ -114,7 +114,6 @@ func Compute(instrument domain.Instrument, candles []domain.Candle, tradeDate ti rawEdgeBps := decimal.NewFromFloat(short.Mean).Mul(decimal.NewFromInt(10_000)) commission := roundTripCommissionBps(instrument, cfg) expectedCost := spread.SpreadBps. - Add(spread.SpreadBps). Add(cfg.EntrySlippageBps). Add(cfg.ExitSlippageBps). Add(commission). diff --git a/internal/features/pipeline_test.go b/internal/features/pipeline_test.go index 1b9d2bf..178fadd 100644 --- a/internal/features/pipeline_test.go +++ b/internal/features/pipeline_test.go @@ -41,8 +41,8 @@ func TestComputeExpectedCostIncludesCommissionAndSlippage(t *testing.T) { if err != nil { t.Fatal(err) } - if !got.ExpectedCostBps.Equal(decimal.NewFromInt(32)) { - t.Fatalf("expected cost=%s, want 32", got.ExpectedCostBps) + if !got.ExpectedCostBps.Equal(decimal.NewFromInt(22)) { + t.Fatalf("expected cost=%s, want 22", got.ExpectedCostBps) } if !got.EntryIntervalVolume.Equal(decimal.NewFromInt(10000)) || !got.ExitIntervalVolume.Equal(decimal.NewFromInt(9000)) { t.Fatalf("interval volumes were not preserved: %+v", got) @@ -66,8 +66,8 @@ func TestComputeExpectedCostFallsBackToConfigCommission(t *testing.T) { if err != nil { t.Fatal(err) } - if !got.ExpectedCostBps.Equal(decimal.NewFromInt(34)) { - t.Fatalf("expected cost=%s, want 34", got.ExpectedCostBps) + if !got.ExpectedCostBps.Equal(decimal.NewFromInt(24)) { + t.Fatalf("expected cost=%s, want 24", got.ExpectedCostBps) } } diff --git a/internal/logging/logging.go b/internal/logging/logging.go index 3377eee..12e97a4 100644 --- a/internal/logging/logging.go +++ b/internal/logging/logging.go @@ -54,10 +54,16 @@ func (l SDKLogger) Fatalf(template string, args ...any) { var sensitiveStringPatterns = []*regexp.Regexp{ regexp.MustCompile(`(?i)((?:account[_-]?id|token)\s*[:=]\s*)("[^"]+"|'[^']+'|[^\s,}]+)`), - regexp.MustCompile(`(?i)("(?:accountId|account_id|token)"\s*:\s*)("[^"]*"|null)`), + regexp.MustCompile(`(?i)("(?:accountID|accountId|account_id|token)"\s*:\s*)("[^"]*"|null)`), } +var sensitiveAttrKeyPattern = regexp.MustCompile(`(?i)^(account[_-]?id|accountID|accountId|token)$`) + func redactAttr(_ []string, attr slog.Attr) slog.Attr { + if sensitiveAttrKeyPattern.MatchString(attr.Key) { + attr.Value = slog.StringValue("[REDACTED]") + return attr + } if attr.Value.Kind() == slog.KindString { attr.Value = slog.StringValue(RedactString(attr.Value.String())) } diff --git a/internal/logging/logging_test.go b/internal/logging/logging_test.go new file mode 100644 index 0000000..58ecea1 --- /dev/null +++ b/internal/logging/logging_test.go @@ -0,0 +1,36 @@ +package logging + +import ( + "bytes" + "strings" + "testing" +) + +func TestRedactStringCoversAccountIDSpellings(t *testing.T) { + secret := "plain-account-id" + raw := strings.Join([]string{ + `accountID=plain-account-id`, + `account_id: plain-account-id`, + `{"accountId":"plain-account-id"}`, + `{"accountID":"plain-account-id"}`, + }, "\n") + got := RedactString(raw) + if strings.Contains(got, secret) { + t.Fatalf("redacted string leaked account id: %s", got) + } +} + +func TestSlogRedactsSensitiveAccountIDAttributes(t *testing.T) { + var buf bytes.Buffer + logger := New("info", &buf) + logger.Info("submit", "account_id", "plain-account-id", "accountID", "other-account-id", "accountId", "third-account-id") + got := buf.String() + for _, secret := range []string{"plain-account-id", "other-account-id", "third-account-id"} { + if strings.Contains(got, secret) { + t.Fatalf("log leaked account id %q: %s", secret, got) + } + } + if strings.Count(got, "[REDACTED]") < 3 { + t.Fatalf("log did not redact account ids: %s", got) + } +} diff --git a/internal/position/manager.go b/internal/position/manager.go index 6408666..bf74e66 100644 --- a/internal/position/manager.go +++ b/internal/position/manager.go @@ -2,6 +2,8 @@ package position import ( "context" + "errors" + "fmt" "time" "github.com/shopspring/decimal" @@ -11,6 +13,8 @@ import ( "overnight-trading-bot/internal/repository" ) +var ErrExitFillExceedsPositionLots = errors.New("exit fill exceeds local position lots") + type Manager struct { repo repository.Repository } @@ -109,7 +113,21 @@ func (m Manager) OnExitFill(ctx context.Context, pos domain.Position, exitOrder if lot <= 0 { lot = 1 } - executedLots := min(exitOrder.FilledLots, pos.Lots) + if exitOrder.FilledLots > pos.Lots { + err := fmt.Errorf("%w: filled_lots=%d position_lots=%d instrument_uid=%s", ErrExitFillExceedsPositionLots, exitOrder.FilledLots, pos.Lots, pos.InstrumentUID) + if m.repo != nil { + _ = m.repo.InsertRiskEvent(ctx, domain.RiskEvent{ + TS: now, + Severity: domain.SeverityCritical, + EventType: "exit_overfill", + InstrumentUID: pos.InstrumentUID, + Message: err.Error(), + ContextJSON: fmt.Sprintf(`{"filled_lots":%d,"position_lots":%d}`, exitOrder.FilledLots, pos.Lots), + }) + } + return pos, err + } + executedLots := exitOrder.FilledLots if executedLots < 0 { executedLots = 0 } diff --git a/internal/position/manager_test.go b/internal/position/manager_test.go index 7efc5f2..e3dbcfd 100644 --- a/internal/position/manager_test.go +++ b/internal/position/manager_test.go @@ -2,6 +2,7 @@ package position import ( "context" + "errors" "testing" "time" @@ -183,3 +184,34 @@ func TestOnExitFillUsesLotInRealizedEdgeCommissionBase(t *testing.T) { t.Fatalf("realized edge=%s, want -10 bps", updated.RealizedEdgeBps) } } + +func TestOnExitFillRejectsOverfillWithCriticalRiskEvent(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + manager := NewManager(repo) + openAt := time.Now().UTC() + pos := domain.Position{ + AccountIDHash: "hash", + InstrumentUID: "uid", + OpenTradeDate: openAt, + Lots: 3, + Lot: 1, + AvgBuyPrice: decimal.NewFromInt(100), + Status: domain.PositionHoldingOvernight, + OpenedAt: &openAt, + } + updated, err := manager.OnExitFill(ctx, pos, domain.Order{ + InstrumentUID: "uid", + FilledLots: 5, + AvgFillPrice: decimal.NewFromInt(110), + }) + if !errors.Is(err, ErrExitFillExceedsPositionLots) { + t.Fatalf("err=%v, want ErrExitFillExceedsPositionLots", err) + } + if updated.Lots != 3 || updated.Status != domain.PositionHoldingOvernight { + t.Fatalf("position was mutated despite overfill: %+v", updated) + } + if len(repo.RiskEvents) != 1 || repo.RiskEvents[0].Severity != domain.SeverityCritical || repo.RiskEvents[0].EventType != "exit_overfill" { + t.Fatalf("risk events=%+v, want one critical exit_overfill", repo.RiskEvents) + } +} diff --git a/internal/report/daily.go b/internal/report/daily.go index d26746b..ce5e31d 100644 --- a/internal/report/daily.go +++ b/internal/report/daily.go @@ -135,12 +135,47 @@ func orderAdverseSlippageBps(order domain.Order) (decimal.Decimal, bool) { } func orderReferencePrice(order domain.Order) decimal.Decimal { + if mid := rawFillMidPrice(order.RawStateJSON); mid.IsPositive() { + return mid + } if mid := rawMidPrice(order.RawStateJSON); mid.IsPositive() { return mid } return order.LimitPrice } +func rawFillMidPrice(raw string) decimal.Decimal { + var root map[string]any + if err := json.Unmarshal([]byte(raw), &root); err != nil { + return decimal.Zero + } + if mid := fillMidFromContainer(root); mid.IsPositive() { + return mid + } + if local, ok := root["local"].(map[string]any); ok { + return fillMidFromContainer(local) + } + return decimal.Zero +} + +func fillMidFromContainer(container map[string]any) decimal.Decimal { + quotes, ok := container["fill_quotes"].([]any) + if !ok || len(quotes) == 0 { + return decimal.Zero + } + for i := len(quotes) - 1; i >= 0; i-- { + quote, ok := quotes[i].(map[string]any) + if !ok { + continue + } + mid, ok := decimalFromAny(quote["mid"]) + if ok && mid.IsPositive() { + return mid + } + } + return decimal.Zero +} + func rawMidPrice(raw string) decimal.Decimal { var root map[string]any if err := json.Unmarshal([]byte(raw), &root); err != nil { diff --git a/internal/report/daily_test.go b/internal/report/daily_test.go index eef7aa6..f2935c1 100644 --- a/internal/report/daily_test.go +++ b/internal/report/daily_test.go @@ -26,6 +26,23 @@ func TestAverageAdverseSlippageBpsUsesLocalQuoteMid(t *testing.T) { } } +func TestAverageAdverseSlippageBpsPrefersFillQuoteMid(t *testing.T) { + orders := []domain.Order{{ + InstrumentUID: "uid", + Side: domain.SideBuy, + LimitPrice: decimal.NewFromInt(100), + FilledLots: 1, + AvgFillPrice: decimal.NewFromFloat(102), + RawStateJSON: `{"local":{"local_quote":{"mid":"100"},"fill_quotes":[{"mid":"101"}]}}`, + UpdatedAt: time.Now().UTC(), + }} + got := AverageAdverseSlippageBps(orders, 0) + want := decimal.NewFromInt(10_000).Div(decimal.NewFromInt(101)) + if got.Sub(want).Abs().GreaterThan(decimal.NewFromFloat(0.000001)) { + t.Fatalf("slippage=%s, want fill-mid based slippage", got) + } +} + func TestAverageAdverseSlippageBpsFallsBackToLimit(t *testing.T) { orders := []domain.Order{{ InstrumentUID: "uid", diff --git a/internal/repository/mysql/repository.go b/internal/repository/mysql/repository.go index a9e9aa3..d91ffb1 100644 --- a/internal/repository/mysql/repository.go +++ b/internal/repository/mysql/repository.go @@ -183,10 +183,14 @@ func (r *Repository) mergeInstrumentUID(ctx context.Context, oldInstrumentUID, n if err := r.mergeFreeOrders(ctx, oldInstrumentUID, newInstrumentUID); err != nil { return err } - for _, table := range []string{"orders", "positions", "risk_events"} { - if _, err := r.execer().ExecContext(ctx, fmt.Sprintf(`UPDATE %s SET instrument_uid=? WHERE instrument_uid=?`, table), newInstrumentUID, oldInstrumentUID); err != nil { - return err - } + if _, err := r.execer().ExecContext(ctx, `UPDATE orders SET instrument_uid=? WHERE instrument_uid=?`, newInstrumentUID, oldInstrumentUID); err != nil { + return err + } + if _, err := r.execer().ExecContext(ctx, `UPDATE positions SET instrument_uid=? WHERE instrument_uid=?`, newInstrumentUID, oldInstrumentUID); err != nil { + return err + } + if _, err := r.execer().ExecContext(ctx, `UPDATE risk_events SET instrument_uid=? WHERE instrument_uid=?`, newInstrumentUID, oldInstrumentUID); err != nil { + return err } _, err := r.execer().ExecContext(ctx, `DELETE FROM instruments WHERE instrument_uid=?`, oldInstrumentUID) return err diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index cd1c4c7..5e00646 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -192,6 +192,8 @@ func (s *Scheduler) Step(ctx context.Context) error { } func (s Scheduler) phase(now time.Time) domain.SystemState { + // WAIT_ENTRY_WINDOW is a persisted checkpoint after signal generation; wall-clock + // phases move directly from GENERATE_SIGNALS to PLACE_ENTRY_ORDERS. tod := sinceMidnight(now) exitWindowStart := s.cfg.ExitWindowStart.Duration if s.cfg.ExitNotBefore.Duration > exitWindowStart { @@ -351,6 +353,7 @@ func (s Scheduler) applyBatchSignalLimits(portfolio domain.Portfolio, existingEx } } selectedCount := remainingSlots + reservedCash := decimal.Zero for rank, index := range enterIndexes { candidate := &generated[index] if rank >= remainingSlots { @@ -360,7 +363,7 @@ func (s Scheduler) applyBatchSignalLimits(portfolio domain.Portfolio, existingEx candidate.Signal.RejectReason = signal.ReasonMaxPositions continue } - sized, sizingErr := s.sizeSignal(portfolio, candidate.Instrument, candidate.Feature, candidate.Book, selectedCount, existingExposure, decimal.Zero) + sized, sizingErr := s.sizeSignal(portfolio, candidate.Instrument, candidate.Feature, candidate.Book, selectedCount, existingExposure, reservedCash) switch { case sizingErr != nil: candidate.Signal.Decision = domain.DecisionReject @@ -374,6 +377,7 @@ func (s Scheduler) applyBatchSignalLimits(portfolio domain.Portfolio, existingEx default: candidate.Signal.TargetLots = sized.Lots candidate.Signal.TargetNotional = sized.TargetNotional + reservedCash = reservedCash.Add(sized.TargetNotional) } } } @@ -582,7 +586,8 @@ func (s *Scheduler) monitorEntryOrders(ctx context.Context, now time.Time) error if err != nil { return err } - if monitored.FilledLots > order.FilledLots || monitored.Commission.GreaterThan(order.Commission) { + previousFill := execution.AggregatedOrderFill(order) + if monitored.FilledLots > previousFill.FilledLots || monitored.Commission.GreaterThan(previousFill.Commission) { fill := entryFillDelta(order, monitored) if fill.FilledLots <= 0 && fill.Commission.IsZero() { continue @@ -758,7 +763,8 @@ func (s *Scheduler) monitorExitOrders(ctx context.Context, now time.Time) error if err != nil { return err } - if monitored.FilledLots > order.FilledLots || monitored.Commission.GreaterThan(order.Commission) { + previousFill := execution.AggregatedOrderFill(order) + if monitored.FilledLots > previousFill.FilledLots || monitored.Commission.GreaterThan(previousFill.Commission) { fill := exitFillDelta(order, monitored) if fill.FilledLots <= 0 && fill.Commission.IsZero() { continue @@ -902,12 +908,15 @@ func (s *Scheduler) applySizeReductionRule(ctx context.Context, tradeDate time.T if !emitEvent { return nil } - return s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ + if err := s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ Severity: domain.SeverityWarn, EventType: "size_reduction_rule_triggered", Message: fmt.Sprintf("average expected_error_bps over %d trades is %s; sizing factor set to %s", count, averageError.StringFixed(2), factor.String()), ContextJSON: fmt.Sprintf(`{"average_expected_error_bps":%q,"trades":%d,"size_factor":%q}`, averageError.String(), count, factor.String()), - }) + }); err != nil { + return err + } + return s.recommendLiveReadonlyAfterSizeReduction(ctx, averageError, count, factor) } func (s Scheduler) averageExpectedErrorBps(ctx context.Context, tradeDate time.Time, limit int) (decimal.Decimal, int, bool, error) { @@ -1242,7 +1251,7 @@ func (s Scheduler) preTradeCheck(ctx context.Context, now time.Time, instrumentU TradingStatus: tradingStatus, QuoteReceivedAt: quoteReceivedAt, Now: now.UTC(), - MarketClose: s.marketCloseOn(now), + MarketClose: s.preTradeDeadlineOn(now, closingPosition), UnknownBrokerOrder: unknownOrder, UnknownBrokerHolding: unknownHolding, }) @@ -1373,6 +1382,38 @@ func (s Scheduler) marketCloseOn(now time.Time) time.Time { return s.cfg.MarketClose.On(now, s.cfg.Location).UTC() } +func (s Scheduler) preTradeDeadlineOn(now time.Time, closingPosition bool) time.Time { + if closingPosition && s.cfg.HardExitDeadline.Duration > 0 { + return s.cfg.HardExitDeadline.On(now, s.cfg.Location).UTC() + } + if !closingPosition && s.cfg.NoNewEntryAfter.Duration > 0 { + return s.cfg.NoNewEntryAfter.On(now, s.cfg.Location).UTC() + } + return s.marketCloseOn(now) +} + +func (s Scheduler) recommendLiveReadonlyAfterSizeReduction(ctx context.Context, averageError decimal.Decimal, count int, factor decimal.Decimal) error { + if s.cfg.Mode != domain.ModeLiveTrade { + return nil + } + message := fmt.Sprintf("size reduction remains active after %d trades; consider switching to live_readonly until expected error recovers", count) + if s.svc.Notifier != nil { + _ = s.svc.Notifier.Alert(ctx, message) + } + return s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ + Severity: domain.SeverityAlert, + EventType: "live_readonly_recommended", + Message: message, + ContextJSON: fmt.Sprintf( + `{"average_expected_error_bps":%q,"trades":%d,"size_factor":%q,"recommended_mode":%q}`, + averageError.String(), + count, + factor.String(), + domain.ModeLiveReadonly, + ), + }) +} + func (s Scheduler) recordPreTradeReject(ctx context.Context, instrumentUID, message, contextJSON string) error { return s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ Severity: domain.SeverityWarn, @@ -1510,6 +1551,7 @@ func (s Scheduler) logWarn(msg string, args ...any) { } func entryFillDelta(previous, current domain.Order) domain.Order { + previous = execution.AggregatedOrderFill(previous) fill := current fill.FilledLots = current.FilledLots - previous.FilledLots if fill.FilledLots < 0 { @@ -1532,6 +1574,7 @@ func entryFillDelta(previous, current domain.Order) domain.Order { } func exitFillDelta(previous, current domain.Order) domain.Order { + previous = execution.AggregatedOrderFill(previous) fill := current fill.FilledLots = current.FilledLots - previous.FilledLots if fill.FilledLots < 0 { diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 7b53971..40221c9 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -267,6 +267,34 @@ func TestEntryFillDeltaUsesOnlyNewlyExecutedLots(t *testing.T) { } } +func TestEntryFillDeltaUsesStoredMonitorAggregate(t *testing.T) { + previous := domain.Order{ + QuantityLots: 3, + FilledLots: 0, + AvgFillPrice: decimal.Zero, + Commission: decimal.Zero, + InstrumentUID: "uid", + RawStateJSON: `{"local":{"monitor_aggregate":{"quantity_lots":5,"filled_lots":2,"avg_fill_price":"100","commission":"0.40"}}}`, + } + current := domain.Order{ + QuantityLots: 5, + FilledLots: 4, + AvgFillPrice: decimal.NewFromInt(110), + Commission: decimal.NewFromInt(1), + InstrumentUID: "uid", + } + fill := entryFillDelta(previous, current) + if fill.FilledLots != 2 { + t.Fatalf("delta filled lots=%d, want 2 after stored aggregate", fill.FilledLots) + } + if !fill.AvgFillPrice.Equal(decimal.NewFromInt(120)) { + t.Fatalf("delta avg fill price=%s, want 120", fill.AvgFillPrice) + } + if !fill.Commission.Equal(decimal.RequireFromString("0.60")) { + t.Fatalf("delta commission=%s, want 0.60", fill.Commission) + } +} + func TestHardDeadlineMarksOpenPositionFailedAndHalts(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() @@ -421,6 +449,81 @@ func TestEntryInstrumentPreTradeRejectsQuarantineAndCommission(t *testing.T) { } } +func TestPlaceExitAllowsQuarantinedInstrumentForOpenPosition(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + openDate := time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC) + exitDate := openDate.AddDate(0, 0, 1) + instrument := domain.Instrument{ + InstrumentUID: "uid", + Ticker: "TRUR", + ClassCode: "TQTF", + Enabled: true, + Quarantine: true, + QuarantineReason: "actual commission nonzero", + Lot: 1, + MinPriceIncrement: decimal.RequireFromString("0.01"), + Currency: "RUB", + FreeOrderLimitPerDay: -1, + } + if err := repo.UpsertInstrument(ctx, instrument); err != nil { + t.Fatal(err) + } + if err := repo.UpsertPosition(ctx, domain.Position{ + AccountIDHash: "hash", + InstrumentUID: "uid", + OpenTradeDate: openDate, + Lots: 2, + Lot: 1, + AvgBuyPrice: decimal.NewFromInt(100), + Status: domain.PositionHoldingOvernight, + }); err != nil { + t.Fatal(err) + } + gateway := tinvest.NewFakeGateway() + gateway.OrderBooks["uid"] = domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(100), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.RequireFromString("100.10"), QuantityLots: 10}}, + ReceivedAt: time.Now().UTC(), + } + execEngine := execution.NewEngine(domain.ModePaper, "account", gateway, repo) + s := Scheduler{ + cfg: Config{ + Mode: domain.ModePaper, + Location: time.UTC, + HardExitDeadline: mustTOD("23:00:00"), + }, + sm: statemachine.New(repo, domain.ModePaper), + svc: Services{ + Repo: repo, + Gateway: gateway, + MarketData: marketdata.NewLoader(repo, gateway), + Signals: signalengine.New(signalengine.Config{}), + FreeOrders: risk.NewFreeOrderBudget(repo), + Risk: risk.NewManager(repo, risk.ManagerConfig{}), + Execution: &execEngine, + Positions: position.NewManager(repo), + Notifier: &countNotifier{}, + AccountID: "account", + AccountIDHash: "hash", + }, + } + if err := repo.SaveSystemState(ctx, domain.StateWaitExitWindow, domain.ModePaper, false, "", "{}"); err != nil { + t.Fatal(err) + } + if err := s.placeExitOrders(ctx, exitDate.Add(10*time.Hour)); err != nil { + t.Fatal(err) + } + orders, err := repo.ListOrders(ctx, "hash", exitDate, exitDate) + if err != nil { + t.Fatal(err) + } + if len(orders) != 1 || orders[0].Side != domain.SideSell { + t.Fatalf("orders=%+v, want sell order for quarantined open position", orders) + } +} + func TestPreTradeDailyLossBreachHalts(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() @@ -461,6 +564,46 @@ func TestPreTradeDailyLossBreachHalts(t *testing.T) { } } +func TestPreTradeUsesPhaseDeadlineForMinTimeToClose(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + now := time.Date(2026, 6, 8, 18, 37, 45, 0, time.UTC) + s := Scheduler{ + cfg: Config{ + Mode: domain.ModePaper, + Location: time.UTC, + NoNewEntryAfter: mustTOD("18:38:30"), + HardExitDeadline: mustTOD("18:40:00"), + MarketClose: mustTOD("23:00:00"), + }, + svc: Services{ + Repo: repo, + Risk: risk.NewManager(repo, risk.ManagerConfig{MinTimeToClose: 90 * time.Second}), + AccountIDHash: "hash", + }, + } + entry, err := s.preTradeCheck(ctx, now, "uid", domain.Portfolio{ + Equity: decimal.NewFromInt(10000), + Cash: decimal.NewFromInt(10000), + }, 0, false, domain.TradingStatusNormal, now) + if err != nil { + t.Fatal(err) + } + if entry.Allowed || entry.Reason != "min_time_to_close_sec" { + t.Fatalf("entry result=%+v, want min_time_to_close_sec reject before NoNewEntryAfter", entry) + } + exit, err := s.preTradeCheck(ctx, now, "uid", domain.Portfolio{ + Equity: decimal.NewFromInt(10000), + Cash: decimal.NewFromInt(10000), + }, 1, true, domain.TradingStatusNormal, now) + if err != nil { + t.Fatal(err) + } + if !exit.Allowed { + t.Fatalf("exit result=%+v, want allowed before HardExitDeadline", exit) + } +} + func TestStepSendsMissedDailyReportAfterEntrySignalTime(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() @@ -555,6 +698,120 @@ func TestSizeReductionRuleCutsSizerAfterBadExpectedErrors(t *testing.T) { } } +func TestSizeReductionRuleBoundaryMinusTenDoesNotCut(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + tradeDate := time.Date(2026, 6, 30, 0, 0, 0, 0, time.UTC) + for i := 0; i < sizeReductionWindowTrades; i++ { + date := tradeDate.AddDate(0, 0, -i) + if err := repo.UpsertSignal(ctx, domain.Signal{ + TradeDate: date, + InstrumentUID: "uid", + Decision: domain.DecisionEnter, + NetEdgeBps: decimal.NewFromInt(20), + }); err != nil { + t.Fatal(err) + } + if err := repo.UpsertPosition(ctx, domain.Position{ + AccountIDHash: "hash", + InstrumentUID: "uid", + OpenTradeDate: date, + Lot: 1, + Status: domain.PositionExitFilled, + RealizedEdgeBps: decimal.NewFromInt(10), + UpdatedAt: date.Add(time.Hour), + }); err != nil { + t.Fatal(err) + } + } + s := Scheduler{ + svc: Services{ + Repo: repo, + AccountIDHash: "hash", + Sizer: risk.NewSizer(risk.SizingConfig{ + MaxPositionPct: decimal.NewFromInt(1), + MaxTotalExposurePct: decimal.NewFromInt(1), + MaxParticipationRate: decimal.NewFromInt(1), + CashUsageBuffer: decimal.NewFromInt(1), + RiskBudgetPerInstrumentPct: decimal.NewFromInt(1), + MinOrderNotionalRUB: decimal.NewFromInt(1), + }), + }, + } + if err := s.applySizeReductionRule(ctx, tradeDate, true); err != nil { + t.Fatal(err) + } + sized := s.svc.Sizer.Size(risk.SizingInput{ + Portfolio: domain.Portfolio{Equity: decimal.NewFromInt(10_000), Cash: decimal.NewFromInt(10_000)}, + SelectedInstruments: 1, + LimitPrice: decimal.NewFromInt(100), + Lot: 1, + EntryIntervalVolume: decimal.NewFromInt(10_000), + ExitIntervalVolume: decimal.NewFromInt(10_000), + Q05OvernightAbs: decimal.NewFromInt(1), + }) + if sized.Lots != 100 { + t.Fatalf("lots=%d, want unreduced 100 at -10.00 bps boundary", sized.Lots) + } + if len(repo.RiskEvents) != 0 { + t.Fatalf("risk events=%+v, want none at boundary", repo.RiskEvents) + } +} + +func TestSizeReductionRuleRecommendsLiveReadonlyInLiveTrade(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + notifier := &countNotifier{} + tradeDate := time.Date(2026, 6, 30, 0, 0, 0, 0, time.UTC) + for i := 0; i < sizeReductionWindowTrades; i++ { + date := tradeDate.AddDate(0, 0, -i) + if err := repo.UpsertSignal(ctx, domain.Signal{ + TradeDate: date, + InstrumentUID: "uid", + Decision: domain.DecisionEnter, + NetEdgeBps: decimal.NewFromInt(20), + }); err != nil { + t.Fatal(err) + } + if err := repo.UpsertPosition(ctx, domain.Position{ + AccountIDHash: "hash", + InstrumentUID: "uid", + OpenTradeDate: date, + Lot: 1, + Status: domain.PositionExitFilled, + RealizedEdgeBps: decimal.Zero, + UpdatedAt: date.Add(time.Hour), + }); err != nil { + t.Fatal(err) + } + } + s := Scheduler{ + cfg: Config{Mode: domain.ModeLiveTrade}, + svc: Services{ + Repo: repo, + AccountIDHash: "hash", + Notifier: notifier, + Sizer: risk.NewSizer(risk.SizingConfig{ + MaxPositionPct: decimal.NewFromInt(1), + MaxTotalExposurePct: decimal.NewFromInt(1), + MaxParticipationRate: decimal.NewFromInt(1), + CashUsageBuffer: decimal.NewFromInt(1), + RiskBudgetPerInstrumentPct: decimal.NewFromInt(1), + MinOrderNotionalRUB: decimal.NewFromInt(1), + }), + }, + } + if err := s.applySizeReductionRule(ctx, tradeDate, true); err != nil { + t.Fatal(err) + } + if len(repo.RiskEvents) != 2 || repo.RiskEvents[1].EventType != "live_readonly_recommended" || repo.RiskEvents[1].Severity != domain.SeverityAlert { + t.Fatalf("risk events=%+v, want live_readonly recommendation alert", repo.RiskEvents) + } + if notifier.alerts != 1 { + t.Fatalf("alerts=%d, want 1", notifier.alerts) + } +} + func TestBatchSignalLimitsCapSlotsAndExposure(t *testing.T) { s := Scheduler{ cfg: Config{MaxOpenPositions: 5}, @@ -609,6 +866,57 @@ func TestBatchSignalLimitsCapSlotsAndExposure(t *testing.T) { } } +func TestBatchSignalLimitsReserveCashAcrossCandidates(t *testing.T) { + s := Scheduler{ + cfg: Config{MaxOpenPositions: 5}, + svc: Services{Sizer: risk.NewSizer(risk.SizingConfig{ + MaxPositionPct: decimal.NewFromInt(1), + MaxTotalExposurePct: decimal.NewFromInt(1), + MaxParticipationRate: decimal.NewFromInt(1), + CashUsageBuffer: decimal.NewFromInt(1), + RiskBudgetPerInstrumentPct: decimal.NewFromInt(1), + MinOrderNotionalRUB: decimal.NewFromInt(1), + })}, + } + book := domain.OrderBook{ + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(100), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(102), QuantityLots: 10}}, + } + generated := make([]signalCandidate, 0, 5) + for i := 0; i < 5; i++ { + uid := string(rune('a' + i)) + generated = append(generated, signalCandidate{ + Signal: domain.Signal{ + InstrumentUID: uid, + Decision: domain.DecisionEnter, + Score: decimal.NewFromInt(int64(100 - i)), + }, + Instrument: domain.Instrument{InstrumentUID: uid, Lot: 1, MinPriceIncrement: decimal.NewFromInt(1)}, + Feature: domain.FeatureSet{ + EntryIntervalVolume: decimal.NewFromInt(1_000_000), + ExitIntervalVolume: decimal.NewFromInt(1_000_000), + Q05On60Abs: decimal.NewFromInt(1), + }, + Book: book, + }) + } + s.applyBatchSignalLimits(domain.Portfolio{Equity: decimal.NewFromInt(100_000), Cash: decimal.NewFromInt(30_000)}, decimal.Zero, 0, generated) + enters := 0 + total := decimal.Zero + for _, candidate := range generated { + if candidate.Signal.Decision == domain.DecisionEnter { + enters++ + total = total.Add(candidate.Signal.TargetNotional) + } + } + if enters != 2 { + t.Fatalf("enter signals=%d, want only candidates that fit reserved cash", enters) + } + if total.GreaterThan(decimal.NewFromInt(30_000)) { + t.Fatalf("total target notional=%s exceeds cash", total) + } +} + func TestPlaceEntryRejectsWideSpreadBeforeOrder(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/statemachine/system.go b/internal/statemachine/system.go index f4cada1..9e34a48 100644 --- a/internal/statemachine/system.go +++ b/internal/statemachine/system.go @@ -100,8 +100,8 @@ func legalTransition(from, to domain.SystemState) bool { } allowed := map[domain.SystemState][]domain.SystemState{ domain.StateInit: {domain.StateSyncInstruments, domain.StateWaitExitWindow, domain.StatePlaceExitOrders, domain.StateMonitorExitOrders, domain.StateGenerateSignals, domain.StatePlaceEntryOrders, domain.StateHoldOvernight, domain.StateReconcile, domain.StateSleep}, - domain.StateSyncInstruments: {domain.StateSyncMarketData}, - domain.StateSyncMarketData: {domain.StateGenerateSignals}, + domain.StateSyncInstruments: {domain.StateSyncMarketData, domain.StateInit}, + domain.StateSyncMarketData: {domain.StateGenerateSignals, domain.StateInit}, domain.StateGenerateSignals: {domain.StateWaitEntryWindow, domain.StatePlaceEntryOrders, domain.StateHoldOvernight, domain.StateSleep}, domain.StateWaitEntryWindow: {domain.StatePlaceEntryOrders, domain.StateSleep}, domain.StatePlaceEntryOrders: {domain.StateMonitorEntryOrders, domain.StateHoldOvernight, domain.StateWaitExitWindow, domain.StatePlaceExitOrders, domain.StateMonitorExitOrders, domain.StateReconcile}, diff --git a/internal/statemachine/system_test.go b/internal/statemachine/system_test.go index 6fc65f8..8f1323a 100644 --- a/internal/statemachine/system_test.go +++ b/internal/statemachine/system_test.go @@ -77,6 +77,18 @@ func TestCalendarRecoveryAllowsRestartInsideExitWindow(t *testing.T) { } } +func TestSyncStatesCanRecoverToInit(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + system := New(repo, domain.ModePaper) + if err := system.Transition(ctx, domain.StateSyncInstruments, domain.StateInit); err != nil { + t.Fatalf("SYNC_INSTRUMENTS -> INIT should be legal recovery: %v", err) + } + if err := system.Transition(ctx, domain.StateSyncMarketData, domain.StateInit); err != nil { + t.Fatalf("SYNC_MARKET_DATA -> INIT should be legal recovery: %v", err) + } +} + func TestRecoverFromMonitorEntryHaltsOnCriticalReconciliationDiff(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository()