Handle server time outages as soft pauses
Deploy / Test, build and deploy (push) Successful in 2m14s
Deploy / Test, build and deploy (push) Successful in 2m14s
This commit is contained in:
@@ -1061,7 +1061,7 @@ func (s *Scheduler) recordInfrastructureFailure(ctx context.Context, err error)
|
|||||||
now := s.nowUTC()
|
now := s.nowUTC()
|
||||||
if s.infraFailedSince.IsZero() {
|
if s.infraFailedSince.IsZero() {
|
||||||
s.infraFailedSince = now
|
s.infraFailedSince = now
|
||||||
s.logWarn("infrastructure check failed; waiting for outage threshold", "err", err, "threshold", s.cfg.APIOutageHalt)
|
s.logWarn("infrastructure check failed; pausing scheduler step", "err", err, "threshold", s.cfg.APIOutageHalt)
|
||||||
if s.svc.Repo != nil {
|
if s.svc.Repo != nil {
|
||||||
if insertErr := s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{
|
if insertErr := s.svc.Repo.InsertRiskEvent(ctx, domain.RiskEvent{
|
||||||
TS: now,
|
TS: now,
|
||||||
@@ -1075,10 +1075,12 @@ func (s *Scheduler) recordInfrastructureFailure(ctx context.Context, err error)
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if s.cfg.APIOutageHalt <= 0 || now.Sub(s.infraFailedSince) >= s.cfg.APIOutageHalt {
|
elapsed := now.Sub(s.infraFailedSince)
|
||||||
return err
|
if s.cfg.APIOutageHalt > 0 && elapsed >= s.cfg.APIOutageHalt {
|
||||||
|
s.logWarn("infrastructure check still failing after outage threshold; continuing soft pause", "err", err, "elapsed", elapsed, "threshold", s.cfg.APIOutageHalt)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
s.logWarn("infrastructure check still failing", "err", err, "elapsed", now.Sub(s.infraFailedSince), "threshold", s.cfg.APIOutageHalt)
|
s.logWarn("infrastructure check still failing; pausing scheduler step", "err", err, "elapsed", elapsed, "threshold", s.cfg.APIOutageHalt)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1397,7 +1399,6 @@ func (s Scheduler) unknownBrokerState(ctx context.Context, portfolio domain.Port
|
|||||||
func isHardHaltPreTradeReason(reason string) bool {
|
func isHardHaltPreTradeReason(reason string) bool {
|
||||||
switch reason {
|
switch reason {
|
||||||
case "database_unavailable",
|
case "database_unavailable",
|
||||||
"server_time_unavailable",
|
|
||||||
"server_clock_drift_too_high",
|
"server_clock_drift_too_high",
|
||||||
"unknown_broker_order",
|
"unknown_broker_order",
|
||||||
"unknown_broker_position",
|
"unknown_broker_position",
|
||||||
|
|||||||
@@ -115,6 +115,55 @@ func TestClockDriftHardLimitHaltsImmediately(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServerTimeUnavailableSoftPausesAfterOutageThreshold(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
gateway := tinvest.NewFakeGateway()
|
||||||
|
gateway.ServerTimeError = context.DeadlineExceeded
|
||||||
|
repo := testutil.NewMemoryRepository()
|
||||||
|
now := time.Date(2026, 6, 18, 9, 0, 0, 0, time.UTC)
|
||||||
|
s := &Scheduler{
|
||||||
|
clock: fixedClock{now: now},
|
||||||
|
cfg: Config{
|
||||||
|
Mode: domain.ModeSandbox,
|
||||||
|
MaxClockDrift: 2 * time.Second,
|
||||||
|
APIOutageHalt: 180 * time.Second,
|
||||||
|
},
|
||||||
|
svc: Services{
|
||||||
|
Repo: repo,
|
||||||
|
Gateway: gateway,
|
||||||
|
Risk: risk.NewManager(repo, risk.ManagerConfig{}),
|
||||||
|
Notifier: &countNotifier{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.checkInfrastructure(ctx); err != nil {
|
||||||
|
t.Fatalf("first infrastructure check err=%v, want soft pause", err)
|
||||||
|
}
|
||||||
|
if repo.Halted {
|
||||||
|
t.Fatalf("system halted on first server time outage: reason=%q", repo.HaltReason)
|
||||||
|
}
|
||||||
|
if len(repo.RiskEvents) != 1 || repo.RiskEvents[0].EventType != "infrastructure_outage_started" {
|
||||||
|
t.Fatalf("risk events=%+v", repo.RiskEvents)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.clock = fixedClock{now: now.Add(5 * time.Minute)}
|
||||||
|
if err := s.checkInfrastructure(ctx); err != nil {
|
||||||
|
t.Fatalf("post-threshold infrastructure check err=%v, want soft pause", err)
|
||||||
|
}
|
||||||
|
if repo.Halted {
|
||||||
|
t.Fatalf("system halted after server time outage threshold: reason=%q", repo.HaltReason)
|
||||||
|
}
|
||||||
|
|
||||||
|
gateway.ServerTimeError = nil
|
||||||
|
gateway.ServerTime = now.Add(5 * time.Minute)
|
||||||
|
if err := s.checkInfrastructure(ctx); err != nil {
|
||||||
|
t.Fatalf("recovered infrastructure check err=%v", err)
|
||||||
|
}
|
||||||
|
if !s.infraFailedSince.IsZero() {
|
||||||
|
t.Fatalf("infraFailedSince=%s, want zero after recovery", s.infraFailedSince)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStepIsIdempotentAfterSignalPreparation(t *testing.T) {
|
func TestStepIsIdempotentAfterSignalPreparation(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
repo := testutil.NewMemoryRepository()
|
repo := testutil.NewMemoryRepository()
|
||||||
@@ -602,6 +651,45 @@ func TestPreTradeClockDriftBreachHalts(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPreTradeServerTimeUnavailableRejectsWithoutHalting(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
repo := testutil.NewMemoryRepository()
|
||||||
|
now := time.Date(2026, 6, 8, 18, 20, 0, 0, time.UTC)
|
||||||
|
gateway := tinvest.NewFakeGateway()
|
||||||
|
gateway.ServerTimeError = context.DeadlineExceeded
|
||||||
|
notifier := &countNotifier{}
|
||||||
|
s := Scheduler{
|
||||||
|
cfg: Config{
|
||||||
|
Mode: domain.ModeSandbox,
|
||||||
|
Location: time.UTC,
|
||||||
|
MaxClockDrift: 2 * time.Second,
|
||||||
|
},
|
||||||
|
svc: Services{
|
||||||
|
Repo: repo,
|
||||||
|
Gateway: gateway,
|
||||||
|
Risk: risk.NewManager(repo, risk.ManagerConfig{}),
|
||||||
|
Notifier: notifier,
|
||||||
|
AccountIDHash: "hash",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
result, err := s.preTradeCheck(ctx, now, "uid", domain.Portfolio{
|
||||||
|
Equity: decimal.NewFromInt(10000),
|
||||||
|
Cash: decimal.NewFromInt(10000),
|
||||||
|
}, 0, false, domain.TradingStatusNormal, now)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err=%v, want reject without hard halt error", err)
|
||||||
|
}
|
||||||
|
if result.Allowed || result.Reason != "server_time_unavailable" {
|
||||||
|
t.Fatalf("result=%+v, want server_time_unavailable reject", result)
|
||||||
|
}
|
||||||
|
if repo.Halted || repo.HaltReason != "" {
|
||||||
|
t.Fatalf("halted=%v reason=%q, want no halt", repo.Halted, repo.HaltReason)
|
||||||
|
}
|
||||||
|
if notifier.alerts != 0 {
|
||||||
|
t.Fatalf("alerts=%d, want 0", notifier.alerts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPreTradeUsesPhaseDeadlineForMinTimeToClose(t *testing.T) {
|
func TestPreTradeUsesPhaseDeadlineForMinTimeToClose(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
repo := testutil.NewMemoryRepository()
|
repo := testutil.NewMemoryRepository()
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ type FakeGateway struct {
|
|||||||
Portfolio domain.Portfolio
|
Portfolio domain.Portfolio
|
||||||
Operations []domain.Operation
|
Operations []domain.Operation
|
||||||
ServerTime time.Time
|
ServerTime time.Time
|
||||||
|
ServerTimeError error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeGateway() *FakeGateway {
|
func NewFakeGateway() *FakeGateway {
|
||||||
@@ -239,6 +240,9 @@ func (f *FakeGateway) GetOperations(_ context.Context, _ string, from, to time.T
|
|||||||
func (f *FakeGateway) GetServerTime(context.Context) (time.Time, error) {
|
func (f *FakeGateway) GetServerTime(context.Context) (time.Time, error) {
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
defer f.mu.Unlock()
|
||||||
|
if f.ServerTimeError != nil {
|
||||||
|
return time.Time{}, f.ServerTimeError
|
||||||
|
}
|
||||||
if f.ServerTime.IsZero() {
|
if f.ServerTime.IsZero() {
|
||||||
return time.Now().UTC(), nil
|
return time.Now().UTC(), nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user