eighth version

This commit is contained in:
2026-06-08 11:55:36 +00:00
parent ebea17b411
commit e8b7d8e27c
15 changed files with 431 additions and 42 deletions
+46 -5
View File
@@ -153,18 +153,34 @@ func (s *Scheduler) Step(ctx context.Context) error {
return nil
}
phase := s.phase(now)
current, halted, reason, err := s.svc.Repo.GetSystemState(ctx)
if err != nil {
return err
}
if halted || current == domain.StateHalted {
return fmt.Errorf("%w: %s", statemachine.ErrSystemHalted, reason)
}
switch phase {
case domain.StateWaitExitWindow:
return s.waitExit(ctx, now)
case domain.StatePlaceExitOrders:
if current == domain.StateMonitorExitOrders {
return s.monitorExitOrders(ctx, now)
}
return s.placeExitOrders(ctx, now)
case domain.StateMonitorExitOrders:
return s.monitorExitOrders(ctx, now)
case domain.StateReconcile:
return s.failOpenPositionsAtHardDeadline(ctx)
case domain.StateGenerateSignals:
if signalPhaseAlreadyPrepared(current) {
return s.sm.Heartbeat(ctx, current)
}
return s.prepareSignals(ctx, now)
case domain.StatePlaceEntryOrders:
if current == domain.StateMonitorEntryOrders {
return s.monitorEntryOrders(ctx, now)
}
return s.placeEntryOrders(ctx, now)
case domain.StateMonitorEntryOrders:
return s.monitorEntryOrders(ctx, now)
@@ -492,7 +508,7 @@ func (s *Scheduler) placeEntryOrders(ctx context.Context, now time.Time) error {
}
continue
}
pre, err := s.preTradeCheck(ctx, now, sig.InstrumentUID, portfolio, projectedOpenPositions, false, tradingStatus, book.ReceivedAt)
pre, err := s.preTradeCheck(ctx, now, sig.InstrumentUID, portfolio, projectedOpenPositions, false, tradingStatus, quoteTimestamp(book))
if err != nil {
return err
}
@@ -664,7 +680,7 @@ func (s *Scheduler) placeExitOrders(ctx context.Context, now time.Time) error {
if err != nil {
return err
}
pre, err := s.preTradeCheck(ctx, now, pos.InstrumentUID, portfolio, len(positionsList), true, tradingStatus, book.ReceivedAt)
pre, err := s.preTradeCheck(ctx, now, pos.InstrumentUID, portfolio, len(positionsList), true, tradingStatus, quoteTimestamp(book))
if err != nil {
return err
}
@@ -954,7 +970,11 @@ func (s *Scheduler) checkInfrastructure(ctx context.Context) error {
}
drift := timeutil.Drift(s.nowUTC(), serverTime)
if drift > s.cfg.MaxClockDrift {
return s.recordInfrastructureFailure(ctx, fmt.Errorf("server_clock_drift_too_high: %s > %s", drift, s.cfg.MaxClockDrift))
reason := fmt.Sprintf("server_clock_drift_too_high: %s > %s", drift, s.cfg.MaxClockDrift)
if err := s.halt(ctx, "server_clock_drift_too_high", reason, ""); err != nil {
return err
}
return fmt.Errorf("%w: %s", statemachine.ErrSystemHalted, reason)
}
s.infraFailedSince = time.Time{}
return nil
@@ -1177,7 +1197,7 @@ func (s Scheduler) repostPreTradeCheck(ctx context.Context, now time.Time, order
if err != nil {
return err
}
pre, err := s.preTradeCheck(ctx, now, order.InstrumentUID, portfolio, len(openPositions), order.Side == domain.SideSell, tradingStatus, book.ReceivedAt)
pre, err := s.preTradeCheck(ctx, now, order.InstrumentUID, portfolio, len(openPositions), order.Side == domain.SideSell, tradingStatus, quoteTimestamp(book))
if err != nil {
return err
}
@@ -1457,6 +1477,25 @@ func isSizingSkipReason(reason string) bool {
return reason == "lots_below_one" || reason == "min_order_notional"
}
func signalPhaseAlreadyPrepared(state domain.SystemState) bool {
switch state {
case domain.StateWaitEntryWindow,
domain.StatePlaceEntryOrders,
domain.StateMonitorEntryOrders,
domain.StateHoldOvernight:
return true
default:
return false
}
}
func quoteTimestamp(book domain.OrderBook) time.Time {
if !book.Time.IsZero() {
return book.Time.UTC()
}
return book.ReceivedAt.UTC()
}
func (s Scheduler) hasStateMachine() bool {
return s.sm != (statemachine.System{})
}
@@ -1488,7 +1527,9 @@ func (s Scheduler) transitionTo(ctx context.Context, to domain.SystemState) erro
}
func (s Scheduler) halt(ctx context.Context, eventType, reason, instrumentUID string) error {
_ = s.svc.Notifier.Alert(ctx, fmt.Sprintf("%s: %s", eventType, reason))
if s.svc.Notifier != nil {
_ = s.svc.Notifier.Alert(ctx, fmt.Sprintf("%s: %s", eventType, reason))
}
return s.svc.Risk.Halt(ctx, s.cfg.Mode, eventType, reason, instrumentUID)
}
+82 -7
View File
@@ -85,23 +85,98 @@ func TestPhaseHonorsExitNotBeforeWhenWindowStartsEarlier(t *testing.T) {
}
}
func TestInfrastructureOutageRequiresThreshold(t *testing.T) {
func TestClockDriftHardLimitHaltsImmediately(t *testing.T) {
gateway := tinvest.NewFakeGateway()
gateway.ServerTime = time.Now().UTC().Add(-10 * time.Second)
repo := testutil.NewMemoryRepository()
notifier := &countNotifier{}
s := &Scheduler{
cfg: Config{
Mode: domain.ModeSandbox,
MaxClockDrift: 2 * time.Second,
APIOutageHalt: 180 * time.Second,
},
svc: Services{Gateway: gateway},
svc: Services{
Gateway: gateway,
Risk: risk.NewManager(repo, risk.ManagerConfig{}),
Notifier: notifier,
},
}
if err := s.checkInfrastructure(context.Background()); err != nil {
t.Fatalf("first infrastructure failure should be tolerated: %v", err)
if err := s.checkInfrastructure(context.Background()); !errors.Is(err, statemachine.ErrSystemHalted) {
t.Fatalf("err=%v, want immediate halt on clock drift", err)
}
s.infraFailedSince = time.Now().UTC().Add(-181 * time.Second)
if err := s.checkInfrastructure(context.Background()); err == nil {
t.Fatalf("expected outage after threshold")
if !repo.Halted || repo.HaltReason == "" {
t.Fatalf("system was not halted: state=%s halted=%v reason=%q", repo.State, repo.Halted, repo.HaltReason)
}
if notifier.alerts != 1 {
t.Fatalf("alerts=%d, want 1", notifier.alerts)
}
}
func TestStepIsIdempotentAfterSignalPreparation(t *testing.T) {
ctx := context.Background()
repo := testutil.NewMemoryRepository()
now := time.Date(2026, 6, 8, 18, 15, 0, 0, time.UTC)
if err := repo.SaveSystemState(ctx, domain.StateWaitEntryWindow, domain.ModePaper, false, "", "{}"); err != nil {
t.Fatal(err)
}
s := Scheduler{
clock: fixedClock{now: now},
cfg: Config{
Mode: domain.ModePaper,
Location: time.UTC,
EntrySignalTime: mustTOD("18:10:00"),
EntryWindowStart: mustTOD("18:20:00"),
},
sm: statemachine.New(repo, domain.ModePaper),
svc: Services{
Repo: repo,
Notifier: &countNotifier{},
AccountIDHash: "hash",
},
}
if err := s.Step(ctx); err != nil {
t.Fatal(err)
}
state, halted, _, err := repo.GetSystemState(ctx)
if err != nil {
t.Fatal(err)
}
if halted || state != domain.StateWaitEntryWindow {
t.Fatalf("state=%s halted=%v, want WAIT_ENTRY_WINDOW without rollback", state, halted)
}
}
func TestStepMonitorsEntryOrdersOnRepeatedEntryWindowTick(t *testing.T) {
ctx := context.Background()
repo := testutil.NewMemoryRepository()
now := time.Date(2026, 6, 8, 18, 25, 0, 0, time.UTC)
if err := repo.SaveSystemState(ctx, domain.StateMonitorEntryOrders, domain.ModePaper, false, "", "{}"); err != nil {
t.Fatal(err)
}
s := Scheduler{
clock: fixedClock{now: now},
cfg: Config{
Mode: domain.ModePaper,
Location: time.UTC,
EntryWindowStart: mustTOD("18:20:00"),
NoNewEntryAfter: mustTOD("18:38:30"),
},
sm: statemachine.New(repo, domain.ModePaper),
svc: Services{
Repo: repo,
AccountIDHash: "hash",
},
}
if err := s.Step(ctx); err != nil {
t.Fatal(err)
}
state, halted, _, err := repo.GetSystemState(ctx)
if err != nil {
t.Fatal(err)
}
if halted || state != domain.StateMonitorEntryOrders {
t.Fatalf("state=%s halted=%v, want MONITOR_ENTRY_ORDERS", state, halted)
}
}