From 55e897909d3e93ca58a2be805d2b84dba0528ecf Mon Sep 17 00:00:00 2001 From: Valentin Popov Date: Thu, 18 Jun 2026 05:06:45 +0400 Subject: [PATCH] Handle server time outages as soft pauses --- internal/scheduler/scheduler.go | 11 ++-- internal/scheduler/scheduler_test.go | 88 ++++++++++++++++++++++++++++ internal/tinvest/gateway.go | 4 ++ 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 793c30f..0601f42 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -1061,7 +1061,7 @@ func (s *Scheduler) recordInfrastructureFailure(ctx context.Context, err error) now := s.nowUTC() if s.infraFailedSince.IsZero() { s.infraFailedSince = now - s.logWarn("infrastructure check failed; waiting for outage threshold", "err", err, "threshold", s.cfg.APIOutageHalt) + s.logWarn("infrastructure check failed; pausing scheduler step", "err", err, "threshold", s.cfg.APIOutageHalt) if s.svc.Repo != nil { if insertErr := s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{ TS: now, @@ -1075,10 +1075,12 @@ func (s *Scheduler) recordInfrastructureFailure(ctx context.Context, err error) } return nil } - if s.cfg.APIOutageHalt <= 0 || now.Sub(s.infraFailedSince) >= s.cfg.APIOutageHalt { - return err + elapsed := now.Sub(s.infraFailedSince) + if s.cfg.APIOutageHalt > 0 && elapsed >= s.cfg.APIOutageHalt { + s.logWarn("infrastructure check still failing after outage threshold; continuing soft pause", "err", err, "elapsed", elapsed, "threshold", s.cfg.APIOutageHalt) + return nil } - s.logWarn("infrastructure check still failing", "err", err, "elapsed", now.Sub(s.infraFailedSince), "threshold", s.cfg.APIOutageHalt) + s.logWarn("infrastructure check still failing; pausing scheduler step", "err", err, "elapsed", elapsed, "threshold", s.cfg.APIOutageHalt) return nil } @@ -1397,7 +1399,6 @@ func (s Scheduler) unknownBrokerState(ctx context.Context, portfolio domain.Port func isHardHaltPreTradeReason(reason string) bool { switch reason { case "database_unavailable", - "server_time_unavailable", "server_clock_drift_too_high", "unknown_broker_order", "unknown_broker_position", diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 452d6c4..892c4ab 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -115,6 +115,55 @@ func TestClockDriftHardLimitHaltsImmediately(t *testing.T) { } } +func TestServerTimeUnavailableSoftPausesAfterOutageThreshold(t *testing.T) { + ctx := context.Background() + gateway := tinvest.NewFakeGateway() + gateway.ServerTimeError = context.DeadlineExceeded + repo := testutil.NewMemoryRepository() + now := time.Date(2026, 6, 18, 9, 0, 0, 0, time.UTC) + s := &Scheduler{ + clock: fixedClock{now: now}, + cfg: Config{ + Mode: domain.ModeSandbox, + MaxClockDrift: 2 * time.Second, + APIOutageHalt: 180 * time.Second, + }, + svc: Services{ + Repo: repo, + Gateway: gateway, + Risk: risk.NewManager(repo, risk.ManagerConfig{}), + Notifier: &countNotifier{}, + }, + } + + if err := s.checkInfrastructure(ctx); err != nil { + t.Fatalf("first infrastructure check err=%v, want soft pause", err) + } + if repo.Halted { + t.Fatalf("system halted on first server time outage: reason=%q", repo.HaltReason) + } + if len(repo.RiskEvents) != 1 || repo.RiskEvents[0].EventType != "infrastructure_outage_started" { + t.Fatalf("risk events=%+v", repo.RiskEvents) + } + + s.clock = fixedClock{now: now.Add(5 * time.Minute)} + if err := s.checkInfrastructure(ctx); err != nil { + t.Fatalf("post-threshold infrastructure check err=%v, want soft pause", err) + } + if repo.Halted { + t.Fatalf("system halted after server time outage threshold: reason=%q", repo.HaltReason) + } + + gateway.ServerTimeError = nil + gateway.ServerTime = now.Add(5 * time.Minute) + if err := s.checkInfrastructure(ctx); err != nil { + t.Fatalf("recovered infrastructure check err=%v", err) + } + if !s.infraFailedSince.IsZero() { + t.Fatalf("infraFailedSince=%s, want zero after recovery", s.infraFailedSince) + } +} + func TestStepIsIdempotentAfterSignalPreparation(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() @@ -602,6 +651,45 @@ func TestPreTradeClockDriftBreachHalts(t *testing.T) { } } +func TestPreTradeServerTimeUnavailableRejectsWithoutHalting(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + now := time.Date(2026, 6, 8, 18, 20, 0, 0, time.UTC) + gateway := tinvest.NewFakeGateway() + gateway.ServerTimeError = context.DeadlineExceeded + notifier := &countNotifier{} + s := Scheduler{ + cfg: Config{ + Mode: domain.ModeSandbox, + Location: time.UTC, + MaxClockDrift: 2 * time.Second, + }, + svc: Services{ + Repo: repo, + Gateway: gateway, + Risk: risk.NewManager(repo, risk.ManagerConfig{}), + Notifier: notifier, + AccountIDHash: "hash", + }, + } + result, err := s.preTradeCheck(ctx, now, "uid", domain.Portfolio{ + Equity: decimal.NewFromInt(10000), + Cash: decimal.NewFromInt(10000), + }, 0, false, domain.TradingStatusNormal, now) + if err != nil { + t.Fatalf("err=%v, want reject without hard halt error", err) + } + if result.Allowed || result.Reason != "server_time_unavailable" { + t.Fatalf("result=%+v, want server_time_unavailable reject", result) + } + if repo.Halted || repo.HaltReason != "" { + t.Fatalf("halted=%v reason=%q, want no halt", repo.Halted, repo.HaltReason) + } + if notifier.alerts != 0 { + t.Fatalf("alerts=%d, want 0", notifier.alerts) + } +} + func TestPreTradeUsesPhaseDeadlineForMinTimeToClose(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/tinvest/gateway.go b/internal/tinvest/gateway.go index 8d8d3e6..f7c8e58 100644 --- a/internal/tinvest/gateway.go +++ b/internal/tinvest/gateway.go @@ -43,6 +43,7 @@ type FakeGateway struct { Portfolio domain.Portfolio Operations []domain.Operation ServerTime time.Time + ServerTimeError error } func NewFakeGateway() *FakeGateway { @@ -239,6 +240,9 @@ func (f *FakeGateway) GetOperations(_ context.Context, _ string, from, to time.T func (f *FakeGateway) GetServerTime(context.Context) (time.Time, error) { f.mu.Lock() defer f.mu.Unlock() + if f.ServerTimeError != nil { + return time.Time{}, f.ServerTimeError + } if f.ServerTime.IsZero() { return time.Now().UTC(), nil }