From 20cc8506ad909b8d420e3d071dc0b9f380d89a6a Mon Sep 17 00:00:00 2001 From: Valentin Popov Date: Mon, 8 Jun 2026 14:25:44 +0000 Subject: [PATCH] ninth version --- README.md | 2 +- cmd/bot/main.go | 4 +- internal/app/app.go | 6 +- internal/app/app_test.go | 9 ++ internal/backtest/engine.go | 8 +- internal/backtest/lookahead_test.go | 28 +++- internal/execution/engine.go | 105 ++++++++++----- internal/execution/state_test.go | 71 +++++++++- internal/instruments/registry.go | 2 +- internal/instruments/registry_test.go | 38 ++++++ internal/marketdata/loader.go | 20 +-- internal/marketdata/loader_test.go | 44 +++++++ internal/reconciliation/engine_test.go | 14 ++ internal/report/daily.go | 92 ++++++++++++- internal/report/daily_test.go | 62 +++++++++ internal/scheduler/scheduler.go | 64 ++------- internal/tinvest/gateway.go | 140 ++++++++++++++++++-- internal/tinvest/paper.go | 19 ++- internal/tinvest/real.go | 176 ++++++++++++++++++++++--- internal/tinvest/real_test.go | 87 +++++++++++- internal/tinvest/sandbox.go | 4 +- 21 files changed, 847 insertions(+), 148 deletions(-) create mode 100644 internal/instruments/registry_test.go create mode 100644 internal/report/daily_test.go diff --git a/README.md b/README.md index 008fdc0..d0d4f91 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ APP_MODE=backtest go run ./cmd/bot | `APP_MODE` | `backtest`, `paper`, `sandbox`, `live_readonly`, `live_trade` | нет, в `.env.example`: `paper` | обязательна; только перечисленные значения | Режим работы. `backtest` не требует БД и API в `cmd/bot`; `paper` без `TINVEST_TOKEN` использует fake gateway, а с токеном берёт реальные market data/status через T-Invest при симулированных заявках; `sandbox`, `live_readonly`, `live_trade` подключаются к T-Invest API; `live_trade` может отправлять брокерские заявки. | | `APP_TIMEZONE` | `Europe/Moscow` | `Europe/Moscow` | жёстко только `Europe/Moscow` | Таймзона расписания торговых окон. Изменить нельзя без изменения валидации. | | `APP_LOG_LEVEL` | `debug`, `info`, `warn`, `warning`, `error` | `info` | неизвестное значение трактуется как `info` | Уровень JSON-логов. Ниже уровень - больше диагностических записей. | -| `APP_HEALTHCHECK_ADDR` | HTTP listen address, например `:3300` или `127.0.0.1:3300` | `:3300` | без отдельной валидации | Адрес `/health` и `/ready`. При изменении меняется порт или интерфейс healthcheck-сервера. | +| `APP_HEALTHCHECK_ADDR` | HTTP listen address, например `:3300` или `127.0.0.1:3300` | `:3300` | без отдельной валидации | Адрес `/health` и `/ready`; CLI `-healthcheck` по умолчанию проверяет `/ready`. При изменении меняется порт или интерфейс healthcheck-сервера. | | `APP_SHUTDOWN_TIMEOUT_SEC` | целое число секунд | `30` | должно быть `> 0` | Таймаут graceful shutdown для HTTP healthcheck при остановке. | ### TINVEST diff --git a/cmd/bot/main.go b/cmd/bot/main.go index 3ce0884..b70531b 100644 --- a/cmd/bot/main.go +++ b/cmd/bot/main.go @@ -14,8 +14,8 @@ func main() { halt := flag.Bool("halt", false, "manually set HALT and stop new automated actions") unhalt := flag.Bool("unhalt", false, "manually clear HALT after reconciliation") reason := flag.String("reason", "", "audit reason for -halt or -unhalt") - health := flag.Bool("healthcheck", false, "check local /health endpoint") - healthURL := flag.String("healthcheck-url", "", "healthcheck URL; default http://127.0.0.1:3300/health") + health := flag.Bool("healthcheck", false, "check local /ready endpoint") + healthURL := flag.String("healthcheck-url", "", "healthcheck URL; default http://127.0.0.1:3300/ready") flag.Parse() if err := app.Run(context.Background(), app.Options{ diff --git a/internal/app/app.go b/internal/app/app.go index dcfeb78..a400335 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -58,7 +58,7 @@ func Run(ctx context.Context, opts Options) error { if opts.Healthcheck { target := opts.HealthcheckURL if target == "" { - target = "http://127.0.0.1:3300/health" + target = "http://127.0.0.1:3300/ready" } return healthcheck.CheckEndpoint(ctx, target) } @@ -465,12 +465,12 @@ func accountHash(accountID string) string { func HealthURL(addr string) string { if strings.HasPrefix(addr, ":") { - return "http://127.0.0.1" + addr + "/health" + return "http://127.0.0.1" + addr + "/ready" } if _, err := url.ParseRequestURI(addr); err == nil && strings.HasPrefix(addr, "http") { return addr } - return "http://" + addr + "/health" + return "http://" + addr + "/ready" } func PingDB(ctx context.Context, db *sql.DB) error { diff --git a/internal/app/app_test.go b/internal/app/app_test.go index 3ea6461..e7e1944 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -36,6 +36,15 @@ func TestRunBacktestModeWithoutDB(t *testing.T) { } } +func TestHealthURLDefaultsToReadyEndpoint(t *testing.T) { + if got := HealthURL(":3300"); got != "http://127.0.0.1:3300/ready" { + t.Fatalf("HealthURL(:3300)=%s", got) + } + if got := HealthURL("127.0.0.1:3301"); got != "http://127.0.0.1:3301/ready" { + t.Fatalf("HealthURL(host)=%s", got) + } +} + func TestSeedPaperGatewayMakesSeedInstrumentsDiscoverable(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/backtest/engine.go b/internal/backtest/engine.go index 0b87ff3..029441e 100644 --- a/internal/backtest/engine.go +++ b/internal/backtest/engine.go @@ -447,7 +447,11 @@ func (e Engine) evaluateCandidate(instrumentUID string, candles []domain.Candle, return candidate{}, false, nil } lot := e.lotFor(instrumentUID) - history := candles[:exitIndex] + entryIndex := exitIndex - 1 + if entryIndex <= 0 { + return candidate{}, false, nil + } + history := candles[:entryIndex] returns := make([]float64, 0, len(history)-1) for j := 1; j < len(history); j++ { r, err := features.OvernightReturn(history[j].Open, history[j-1].Close) @@ -490,7 +494,7 @@ func (e Engine) evaluateCandidate(instrumentUID string, candles []domain.Candle, case adv.LessThan(e.cfg.MinADVRUB): return candidate{}, false, nil } - entry := candles[exitIndex-1] + entry := candles[entryIndex] exit := candles[exitIndex] buy := entry.Close.Mul(decimal.NewFromInt(1).Add(money.FromBps(e.cfg.EntrySlippageBps))) sell := exit.Open.Mul(decimal.NewFromInt(1).Sub(money.FromBps(e.cfg.ExitSlippageBps))) diff --git a/internal/backtest/lookahead_test.go b/internal/backtest/lookahead_test.go index e15dc51..6c47789 100644 --- a/internal/backtest/lookahead_test.go +++ b/internal/backtest/lookahead_test.go @@ -86,7 +86,7 @@ func TestEvaluateCandidateUsesInstrumentLotAndTick(t *testing.T) { ExitSlippageBps: decimal.NewFromInt(13), }) candles := candidateCandles("uid") - got, ok, err := engine.evaluateCandidate("uid", candles, 3) + got, ok, err := engine.evaluateCandidate("uid", candles, 4) if err != nil { t.Fatal(err) } @@ -149,6 +149,31 @@ func TestBacktestWithoutMinuteDataDoesNotReportADVAsCapacity(t *testing.T) { } } +func TestEvaluateCandidateIgnoresEntryDayOpenForSignal(t *testing.T) { + engine := New(Config{ + RollingShort: 2, + RollingLong: 2, + MinTStat60: decimal.NewFromInt(-1), + MinWinRate60: decimal.NewFromFloat(0.1), + MinNetEdgeBps: decimal.NewFromInt(-1000), + MinADVRUB: decimal.NewFromInt(1), + Lot: 1, + }) + start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + candles := []domain.Candle{ + {InstrumentUID: "uid", TradeDate: start, Open: decimal.NewFromInt(100), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, + {InstrumentUID: "uid", TradeDate: start.AddDate(0, 0, 1), Open: decimal.NewFromInt(101), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, + {InstrumentUID: "uid", TradeDate: start.AddDate(0, 0, 2), Open: decimal.NewFromInt(102), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, + {InstrumentUID: "uid", TradeDate: start.AddDate(0, 0, 3), Open: decimal.NewFromInt(1), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, + {InstrumentUID: "uid", TradeDate: start.AddDate(0, 0, 4), Open: decimal.NewFromInt(105), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, + } + if _, ok, err := engine.evaluateCandidate("uid", candles, 4); err != nil { + t.Fatal(err) + } else if !ok { + t.Fatal("entry-day open leaked into signal and rejected the candidate") + } +} + func TestLoadCandlesCSVWithMetadata(t *testing.T) { raw := strings.NewReader(`instrument_uid,trade_date,open,high,low,close,volume_lots,lot,min_price_increment uid,2024-01-02,100,101,99,100,10,10,0.05 @@ -172,5 +197,6 @@ func candidateCandles(uid string) []domain.Candle { {InstrumentUID: uid, TradeDate: start.AddDate(0, 0, 1), Open: decimal.NewFromInt(101), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, {InstrumentUID: uid, TradeDate: start.AddDate(0, 0, 2), Open: decimal.NewFromInt(102), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, {InstrumentUID: uid, TradeDate: start.AddDate(0, 0, 3), Open: decimal.NewFromInt(105), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, + {InstrumentUID: uid, TradeDate: start.AddDate(0, 0, 4), Open: decimal.NewFromInt(105), Close: decimal.NewFromInt(100), VolumeLots: decimal.NewFromInt(10)}, } } diff --git a/internal/execution/engine.go b/internal/execution/engine.go index 6fe2e3a..264a7e4 100644 --- a/internal/execution/engine.go +++ b/internal/execution/engine.go @@ -2,6 +2,7 @@ package execution import ( "context" + "encoding/json" "errors" "fmt" "sync" @@ -110,7 +111,7 @@ func (e *Engine) PlaceEntry(ctx context.Context, accountIDHash string, instrumen QuantityLots: lots, Status: domain.OrderStatusNew, AttemptNo: attempt, - RawStateJSON: "{}", + RawStateJSON: orderContextJSON(book), }, instrument.FreeOrderLimitPerDay) } @@ -137,7 +138,7 @@ func (e *Engine) PlaceExit(ctx context.Context, accountIDHash string, instrument QuantityLots: lots, Status: domain.OrderStatusNew, AttemptNo: attempt, - RawStateJSON: "{}", + RawStateJSON: orderContextJSON(book), }, instrument.FreeOrderLimitPerDay) } @@ -152,6 +153,9 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi if e.mode != domain.ModePaper && !e.mode.AllowsBrokerOrders() { return order, ErrBrokerOrdersDisabled } + if e.gateway == nil { + return domain.Order{}, errors.New("gateway is nil") + } if e.store != nil { existing, err := e.findExisting(ctx, order) if err != nil { @@ -161,12 +165,6 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi return existing, nil } } - if e.mode == domain.ModePaper { - return e.placePaperLimit(ctx, order, freeOrderLimit) - } - if e.gateway == nil { - return domain.Order{}, errors.New("gateway is nil") - } now := e.nowUTC() draft := order @@ -203,6 +201,7 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi posted.QuantityLots = order.QuantityLots posted.AttemptNo = order.AttemptNo posted.TradeDate = order.TradeDate + posted.RawStateJSON = mergeRawStateJSON(order.RawStateJSON, posted.RawStateJSON) posted.CreatedAt = now posted.UpdatedAt = posted.CreatedAt if e.store != nil { @@ -213,28 +212,6 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi return posted, nil } -func (e *Engine) placePaperLimit(ctx context.Context, order domain.Order, freeOrderLimit int) (domain.Order, error) { - now := e.nowUTC() - order.BrokerOrderID = "paper-" + order.ClientOrderID - order.FilledLots = order.QuantityLots - order.AvgFillPrice = order.LimitPrice - order.Status = domain.OrderStatusFilled - order.RawStateJSON = `{"paper_fill":true}` - order.CreatedAt = now - order.UpdatedAt = now - if e.store != nil { - if err := e.store.RunInTx(ctx, func(ctx context.Context, repo repository.Repository) error { - if err := repo.UpsertOrder(ctx, order); err != nil { - return fmt.Errorf("persist paper order: %w", err) - } - return repo.ReserveFreeOrders(ctx, order.TradeDate, order.InstrumentUID, 1, freeOrderLimit) - }); err != nil { - return domain.Order{}, err - } - } - return order, nil -} - func (e *Engine) findExisting(ctx context.Context, order domain.Order) (domain.Order, error) { orders, err := e.store.ListOrders(ctx, order.AccountIDHash, order.TradeDate, order.TradeDate) if err != nil { @@ -268,6 +245,7 @@ func (e *Engine) Refresh(ctx context.Context, order domain.Order) (domain.Order, state.LimitPrice = order.LimitPrice state.QuantityLots = order.QuantityLots state.AttemptNo = order.AttemptNo + state.RawStateJSON = mergeRawStateJSON(localRawStateJSON(order.RawStateJSON), state.RawStateJSON) if e.store != nil { if err := e.store.UpsertOrder(ctx, state); err != nil { return domain.Order{}, err @@ -583,6 +561,73 @@ func quoteTimestamp(book domain.OrderBook) time.Time { return book.ReceivedAt.UTC() } +func orderContextJSON(book domain.OrderBook) string { + bid, ask, err := bestBidAsk(book) + if err != nil { + return "{}" + } + mid := bid.Add(ask).Div(decimal.NewFromInt(2)) + context := map[string]any{ + "local_quote": map[string]string{ + "best_bid": bid.String(), + "best_ask": ask.String(), + "mid": mid.String(), + }, + } + if ts := quoteTimestamp(book); !ts.IsZero() { + context["local_quote"].(map[string]string)["quote_ts"] = ts.UTC().Format(time.RFC3339Nano) + } + raw, err := json.Marshal(context) + if err != nil { + return "{}" + } + return string(raw) +} + +func mergeRawStateJSON(localRaw, brokerRaw string) string { + local := decodeRawJSON(localRaw) + broker := decodeRawJSON(brokerRaw) + raw, err := json.Marshal(map[string]any{ + "local": local, + "broker": broker, + }) + if err != nil { + return brokerRaw + } + return string(raw) +} + +func decodeRawJSON(raw string) any { + if raw == "" { + return map[string]any{} + } + var value any + if err := json.Unmarshal([]byte(raw), &value); err != nil { + return raw + } + return value +} + +func localRawStateJSON(raw string) string { + var object map[string]any + if err := json.Unmarshal([]byte(raw), &object); err != nil { + return raw + } + if local, ok := object["local"]; ok { + encoded, err := json.Marshal(local) + if err == nil { + return string(encoded) + } + } + if quote, ok := object["local_quote"]; ok { + encoded, err := json.Marshal(map[string]any{"local_quote": quote}) + if err == nil { + return string(encoded) + } + } + return raw +} + func (e *Engine) lockFor(instrumentUID string) *sync.Mutex { value, _ := e.mu.LoadOrStore(instrumentUID, &sync.Mutex{}) lock, ok := value.(*sync.Mutex) diff --git a/internal/execution/state_test.go b/internal/execution/state_test.go index 2158131..2a68b42 100644 --- a/internal/execution/state_test.go +++ b/internal/execution/state_test.go @@ -3,6 +3,7 @@ package execution import ( "context" "errors" + "strings" "testing" "time" @@ -110,6 +111,35 @@ func TestPlaceEntryReservesFreeOrderBudgetAtomically(t *testing.T) { } } +func TestRefreshPreservesLocalQuoteContext(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + gateway := tinvest.NewFakeGateway() + engine := NewEngine(domain.ModeSandbox, "account", gateway, repo) + instrument := domain.Instrument{ + InstrumentUID: "uid", + Lot: 1, + MinPriceIncrement: decimal.NewFromInt(1), + } + book := domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(101), QuantityLots: 10}}, + ReceivedAt: time.Now().UTC(), + } + order, err := engine.PlaceEntry(ctx, "hash", instrument, time.Now().UTC(), 1, book, 1, 1) + if err != nil { + t.Fatal(err) + } + refreshed, err := engine.Refresh(ctx, order) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(refreshed.RawStateJSON, "local_quote") || !strings.Contains(refreshed.RawStateJSON, `"mid":"100"`) { + t.Fatalf("raw state lost local quote context: %s", refreshed.RawStateJSON) + } +} + func TestMonitorOnceUsesInjectedClockForDeadline(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() @@ -160,10 +190,17 @@ func TestMonitorOnceUsesInjectedClockForDeadline(t *testing.T) { } } -func TestPaperPlaceEntryFillsAndCountsSubmittedOrder(t *testing.T) { +func TestPaperPlaceEntryFillsOnlyWhenOrderBookCrosses(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() - engine := NewEngine(domain.ModePaper, "account", tinvest.NewFakeGateway(), repo) + paper := tinvest.NewPaperGateway(nil) + paper.Fake().OrderBooks["uid"] = domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(101), QuantityLots: 10}}, + ReceivedAt: time.Now().UTC(), + } + engine := NewEngine(domain.ModePaper, "account", paper, repo) tradeDate := time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC) order, err := engine.PlaceEntry(ctx, "hash", domain.Instrument{ InstrumentUID: "uid", @@ -178,8 +215,34 @@ func TestPaperPlaceEntryFillsAndCountsSubmittedOrder(t *testing.T) { if err != nil { t.Fatal(err) } - if order.Status != domain.OrderStatusFilled || order.FilledLots != 2 || order.BrokerOrderID == "" { - t.Fatalf("paper order=%+v, want filled broker-like order", order) + if order.Status != domain.OrderStatusSent || order.FilledLots != 0 || order.BrokerOrderID == "" { + t.Fatalf("paper order=%+v, want sent unfilled broker-like order", order) + } + paper.Fake().OrderBooks["uid"] = domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(100), QuantityLots: 1}}, + ReceivedAt: time.Now().UTC(), + } + partial, err := engine.MonitorOnce(ctx, order, MonitorConfig{}) + if err != nil { + t.Fatal(err) + } + if partial.Status != domain.OrderStatusPartiallyFilled || partial.FilledLots != 1 { + t.Fatalf("paper partial order=%+v, want 1 lot partial fill", partial) + } + paper.Fake().OrderBooks["uid"] = domain.OrderBook{ + InstrumentUID: "uid", + Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}}, + Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(100), QuantityLots: 10}}, + ReceivedAt: time.Now().UTC(), + } + filled, err := engine.MonitorOnce(ctx, partial, MonitorConfig{}) + if err != nil { + t.Fatal(err) + } + if filled.Status != domain.OrderStatusFilled || filled.FilledLots != 2 { + t.Fatalf("paper filled order=%+v, want full fill", filled) } sent, err := repo.GetFreeOrdersSent(ctx, tradeDate, "uid") if err != nil { diff --git a/internal/instruments/registry.go b/internal/instruments/registry.go index 304bf54..d5e4415 100644 --- a/internal/instruments/registry.go +++ b/internal/instruments/registry.go @@ -29,7 +29,7 @@ func (r Registry) SyncMetadata(ctx context.Context) error { } remote, err := r.gateway.GetInstrument(ctx, instrument.Ticker, instrument.ClassCode) if err != nil { - continue + return fmt.Errorf("sync instrument metadata %s: %w", instrument.Ticker, err) } remote.Enabled = instrument.Enabled && remote.Enabled remote.FundType = instrument.FundType diff --git a/internal/instruments/registry_test.go b/internal/instruments/registry_test.go new file mode 100644 index 0000000..bcca5a9 --- /dev/null +++ b/internal/instruments/registry_test.go @@ -0,0 +1,38 @@ +package instruments + +import ( + "context" + "errors" + "testing" + + "github.com/shopspring/decimal" + + "overnight-trading-bot/internal/domain" + "overnight-trading-bot/internal/testutil" + "overnight-trading-bot/internal/tinvest" +) + +func TestSyncMetadataFailsWhenEnabledInstrumentCannotBeLoaded(t *testing.T) { + ctx := context.Background() + repo := testutil.NewMemoryRepository() + gateway := tinvest.NewFakeGateway() + instrument := domain.Instrument{ + InstrumentUID: "uid", + Ticker: "TRUR", + ClassCode: "TQTF", + Lot: 1, + MinPriceIncrement: decimal.NewFromInt(1), + Currency: "RUB", + Enabled: true, + } + if err := repo.UpsertInstrument(ctx, instrument); err != nil { + t.Fatal(err) + } + gateway.Instruments["uid"] = instrument + gateway.InstrumentErrors["uid"] = errors.New("metadata unavailable") + + err := NewRegistry(repo, gateway).SyncMetadata(ctx) + if err == nil { + t.Fatal("expected sync metadata error") + } +} diff --git a/internal/marketdata/loader.go b/internal/marketdata/loader.go index dc8912d..ed92e2c 100644 --- a/internal/marketdata/loader.go +++ b/internal/marketdata/loader.go @@ -30,7 +30,6 @@ func (l *Loader) SetClock(clock timeutil.Clock) { func (l Loader) BackfillDaily(ctx context.Context, instruments []domain.Instrument, from, to time.Time) error { eligible := 0 succeeded := 0 - var firstErr error for _, instrument := range instruments { if !instrument.Enabled || instrument.Quarantine { continue @@ -38,18 +37,15 @@ func (l Loader) BackfillDaily(ctx context.Context, instruments []domain.Instrume eligible++ candles, err := l.gateway.GetCandles(ctx, instrument.InstrumentUID, "day", from, to) if err != nil { - if firstErr == nil { - firstErr = fmt.Errorf("load candles %s: %w", instrument.Ticker, err) - } - continue + return fmt.Errorf("load daily candles %s: %w", instrument.Ticker, err) } if err := l.repo.UpsertDailyCandles(ctx, candles); err != nil { return fmt.Errorf("persist candles %s: %w", instrument.Ticker, err) } succeeded++ } - if eligible > 0 && succeeded == 0 && firstErr != nil { - return fmt.Errorf("all daily candle loads failed: %w", firstErr) + if eligible > 0 && succeeded == 0 { + return fmt.Errorf("no daily candles loaded for eligible instruments") } return nil } @@ -57,7 +53,6 @@ func (l Loader) BackfillDaily(ctx context.Context, instruments []domain.Instrume func (l Loader) BackfillMinute(ctx context.Context, instruments []domain.Instrument, from, to time.Time) error { eligible := 0 succeeded := 0 - var firstErr error for _, instrument := range instruments { if !instrument.Enabled || instrument.Quarantine { continue @@ -65,18 +60,15 @@ func (l Loader) BackfillMinute(ctx context.Context, instruments []domain.Instrum eligible++ candles, err := l.gateway.GetCandles(ctx, instrument.InstrumentUID, "minute", from, to) if err != nil { - if firstErr == nil { - firstErr = fmt.Errorf("load minute candles %s: %w", instrument.Ticker, err) - } - continue + return fmt.Errorf("load minute candles %s: %w", instrument.Ticker, err) } if err := l.repo.UpsertMinuteCandles(ctx, candles); err != nil { return fmt.Errorf("persist minute candles %s: %w", instrument.Ticker, err) } succeeded++ } - if eligible > 0 && succeeded == 0 && firstErr != nil { - return fmt.Errorf("all minute candle loads failed: %w", firstErr) + if eligible > 0 && succeeded == 0 { + return fmt.Errorf("no minute candles loaded for eligible instruments") } return nil } diff --git a/internal/marketdata/loader_test.go b/internal/marketdata/loader_test.go index a46360e..8f91476 100644 --- a/internal/marketdata/loader_test.go +++ b/internal/marketdata/loader_test.go @@ -2,6 +2,7 @@ package marketdata import ( "context" + "errors" "strings" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/shopspring/decimal" "overnight-trading-bot/internal/domain" + "overnight-trading-bot/internal/testutil" "overnight-trading-bot/internal/tinvest" ) @@ -41,3 +43,45 @@ func TestLatestQuoteUsesExchangeTimestampForFreshness(t *testing.T) { t.Fatalf("LatestQuote err=%v, want stale exchange timestamp rejection", err) } } + +func TestBackfillDailyFailsOnAnyEligibleInstrumentError(t *testing.T) { + ctx := context.Background() + gateway := tinvest.NewFakeGateway() + repo := testutil.NewMemoryRepository() + gateway.Candles["ok"] = []domain.Candle{{ + InstrumentUID: "ok", + TradeDate: time.Date(2026, 6, 8, 0, 0, 0, 0, time.UTC), + Close: decimal.NewFromInt(100), + }} + gateway.CandleErrors["bad"] = errors.New("candles unavailable") + loader := NewLoader(repo, gateway) + + err := loader.BackfillDaily(ctx, []domain.Instrument{ + {InstrumentUID: "ok", Ticker: "OK", Enabled: true}, + {InstrumentUID: "bad", Ticker: "BAD", Enabled: true}, + }, time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC), time.Date(2026, 6, 8, 0, 0, 0, 0, time.UTC)) + if err == nil { + t.Fatal("expected per-instrument backfill error") + } +} + +func TestBackfillMinuteFailsOnAnyEligibleInstrumentError(t *testing.T) { + ctx := context.Background() + gateway := tinvest.NewFakeGateway() + repo := testutil.NewMemoryRepository() + gateway.Candles["ok"] = []domain.Candle{{ + InstrumentUID: "ok", + TradeDate: time.Date(2026, 6, 8, 18, 10, 0, 0, time.UTC), + Close: decimal.NewFromInt(100), + }} + gateway.CandleErrors["bad"] = errors.New("minute candles unavailable") + loader := NewLoader(repo, gateway) + + err := loader.BackfillMinute(ctx, []domain.Instrument{ + {InstrumentUID: "ok", Ticker: "OK", Enabled: true}, + {InstrumentUID: "bad", Ticker: "BAD", Enabled: true}, + }, time.Date(2026, 6, 8, 18, 0, 0, 0, time.UTC), time.Date(2026, 6, 8, 19, 0, 0, 0, time.UTC)) + if err == nil { + t.Fatal("expected per-instrument minute backfill error") + } +} diff --git a/internal/reconciliation/engine_test.go b/internal/reconciliation/engine_test.go index 8a6be71..ea16078 100644 --- a/internal/reconciliation/engine_test.go +++ b/internal/reconciliation/engine_test.go @@ -142,6 +142,20 @@ func TestReconciliationQuarantinesOnNonZeroBrokerCommission(t *testing.T) { } } +func TestCompareOperationsFlagsNonZeroCommissionWithoutInstrument(t *testing.T) { + diffs := compareOperationsWithPolicy(nil, []domain.Operation{{ + Type: "OPERATION_TYPE_BROKER_FEE", + Commission: decimal.NewFromFloat(0.01), + ExecutedAt: time.Now().UTC(), + }}, true, decimal.Zero) + for _, diff := range diffs { + if diff.Kind == "actual_commission_nonzero" && diff.Critical { + return + } + } + t.Fatalf("expected critical nonzero commission diff, got %+v", diffs) +} + func TestReconciliationSkipsFreshInFlightLocalOrders(t *testing.T) { ctx := context.Background() repo := testutil.NewMemoryRepository() diff --git a/internal/report/daily.go b/internal/report/daily.go index d12a67c..d26746b 100644 --- a/internal/report/daily.go +++ b/internal/report/daily.go @@ -71,12 +71,102 @@ func ComposeDaily(input DailyInput) string { averageSpread = averageContextDecimal(input.Signals, "spread_bps") } fmt.Fprintf(&b, "Средний spread: %s bps\n", averageSpread.StringFixed(2)) - fmt.Fprintf(&b, "Среднее проскальзывание: %s bps\n", input.AverageSlipBps.StringFixed(2)) + averageSlip := input.AverageSlipBps + if averageSlip.IsZero() { + averageSlip = AverageAdverseSlippageBps(input.Orders, 0) + } + fmt.Fprintf(&b, "Среднее проскальзывание: %s bps\n", averageSlip.StringFixed(2)) writeExecutionErrors(&b, input.Orders) fmt.Fprintf(&b, "Risk: %s", input.RiskStatus) return b.String() } +func AverageAdverseSlippageBps(orders []domain.Order, limit int) decimal.Decimal { + if len(orders) == 0 { + return decimal.Zero + } + sorted := append([]domain.Order(nil), orders...) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].UpdatedAt.After(sorted[j].UpdatedAt) + }) + sum := decimal.Zero + weight := decimal.Zero + count := 0 + for _, order := range sorted { + slippage, ok := orderAdverseSlippageBps(order) + if !ok { + continue + } + lots := decimal.NewFromInt(order.FilledLots) + sum = sum.Add(slippage.Mul(lots)) + weight = weight.Add(lots) + count++ + if limit > 0 && count == limit { + break + } + } + if weight.IsZero() { + return decimal.Zero + } + return sum.Div(weight) +} + +func orderAdverseSlippageBps(order domain.Order) (decimal.Decimal, bool) { + if order.FilledLots <= 0 || !order.AvgFillPrice.IsPositive() { + return decimal.Zero, false + } + reference := orderReferencePrice(order) + if !reference.IsPositive() { + return decimal.Zero, false + } + var adverse decimal.Decimal + switch order.Side { + case domain.SideBuy: + adverse = order.AvgFillPrice.Sub(reference) + case domain.SideSell: + adverse = reference.Sub(order.AvgFillPrice) + default: + return decimal.Zero, false + } + if adverse.IsNegative() { + adverse = decimal.Zero + } + return adverse.Div(reference).Mul(decimal.NewFromInt(10_000)), true +} + +func orderReferencePrice(order domain.Order) decimal.Decimal { + if mid := rawMidPrice(order.RawStateJSON); mid.IsPositive() { + return mid + } + return order.LimitPrice +} + +func rawMidPrice(raw string) decimal.Decimal { + var root map[string]any + if err := json.Unmarshal([]byte(raw), &root); err != nil { + return decimal.Zero + } + if mid := midFromContainer(root); mid.IsPositive() { + return mid + } + if local, ok := root["local"].(map[string]any); ok { + return midFromContainer(local) + } + return decimal.Zero +} + +func midFromContainer(container map[string]any) decimal.Decimal { + quote, ok := container["local_quote"].(map[string]any) + if !ok { + return decimal.Zero + } + mid, ok := decimalFromAny(quote["mid"]) + if !ok { + return decimal.Zero + } + return mid +} + func groupedReasons(signals []domain.Signal) map[string]int { out := make(map[string]int) for _, sig := range signals { diff --git a/internal/report/daily_test.go b/internal/report/daily_test.go new file mode 100644 index 0000000..eef7aa6 --- /dev/null +++ b/internal/report/daily_test.go @@ -0,0 +1,62 @@ +package report + +import ( + "strings" + "testing" + "time" + + "github.com/shopspring/decimal" + + "overnight-trading-bot/internal/domain" +) + +func TestAverageAdverseSlippageBpsUsesLocalQuoteMid(t *testing.T) { + orders := []domain.Order{{ + InstrumentUID: "uid", + Side: domain.SideBuy, + LimitPrice: decimal.NewFromInt(100), + FilledLots: 2, + AvgFillPrice: decimal.NewFromFloat(100.5), + RawStateJSON: `{"local":{"local_quote":{"mid":"100"}}}`, + UpdatedAt: time.Now().UTC(), + }} + got := AverageAdverseSlippageBps(orders, 0) + if !got.Equal(decimal.NewFromInt(50)) { + t.Fatalf("slippage=%s, want 50", got) + } +} + +func TestAverageAdverseSlippageBpsFallsBackToLimit(t *testing.T) { + orders := []domain.Order{{ + InstrumentUID: "uid", + Side: domain.SideSell, + LimitPrice: decimal.NewFromInt(100), + FilledLots: 1, + AvgFillPrice: decimal.NewFromFloat(99.5), + RawStateJSON: `{}`, + UpdatedAt: time.Now().UTC(), + }} + got := AverageAdverseSlippageBps(orders, 0) + if !got.Equal(decimal.NewFromInt(50)) { + t.Fatalf("slippage=%s, want 50", got) + } +} + +func TestComposeDailyComputesSlippageWhenInputIsZero(t *testing.T) { + msg := ComposeDaily(DailyInput{ + Date: time.Date(2026, 6, 8, 0, 0, 0, 0, time.UTC), + Mode: domain.ModePaper, + Orders: []domain.Order{{ + Side: domain.SideBuy, + LimitPrice: decimal.NewFromInt(100), + FilledLots: 1, + AvgFillPrice: decimal.NewFromFloat(100.5), + RawStateJSON: `{"local":{"local_quote":{"mid":"100"}}}`, + UpdatedAt: time.Now().UTC(), + }}, + RiskStatus: "ok", + }) + if !strings.Contains(msg, "Среднее проскальзывание: 50.00 bps") { + t.Fatalf("report did not include computed slippage:\n%s", msg) + } +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 663e2f8..cd1c4c7 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -868,12 +868,13 @@ func (s *Scheduler) sendDailyReport(ctx context.Context, now time.Time, riskStat } } msg := report.ComposeDaily(report.DailyInput{ - Date: tradeDate, - Mode: s.cfg.Mode, - Signals: signals, - Positions: positionsList, - Orders: orders, - RiskStatus: riskStatus, + Date: tradeDate, + Mode: s.cfg.Mode, + Signals: signals, + Positions: positionsList, + Orders: orders, + AverageSlipBps: report.AverageAdverseSlippageBps(orders, 0), + RiskStatus: riskStatus, }) if err := s.svc.Notifier.Report(ctx, msg); err != nil { return err @@ -1328,7 +1329,6 @@ func (s Scheduler) riskMetrics(ctx context.Context, now time.Time, portfolio dom weekStart := today.AddDate(0, 0, -6) var metrics preTradeMetrics monthlyPnL := decimal.Zero - var closed []domain.Position for _, pos := range positionsList { if pos.Status != domain.PositionExitFilled { continue @@ -1347,64 +1347,18 @@ func (s Scheduler) riskMetrics(ctx context.Context, now time.Time, portfolio dom if !closeDate.Before(monthStart) { monthlyPnL = monthlyPnL.Add(pos.NetPnL) } - closed = append(closed, pos) } if monthlyPnL.IsNegative() && portfolio.Equity.IsPositive() { metrics.monthlyDrawdownPct = monthlyPnL.Neg().Div(portfolio.Equity) } - avg, err := s.averageAdverseSlippageBps(ctx, closed, 10) + orders, err := s.svc.Repo.ListOrders(ctx, s.svc.AccountIDHash, monthStart.AddDate(0, 0, -7), today) if err != nil { return preTradeMetrics{}, err } - metrics.avgSlippageBps10 = avg + metrics.avgSlippageBps10 = report.AverageAdverseSlippageBps(orders, 10) return metrics, nil } -func (s Scheduler) averageAdverseSlippageBps(ctx context.Context, positionsList []domain.Position, limit int) (decimal.Decimal, error) { - if limit <= 0 { - return decimal.Zero, nil - } - sort.Slice(positionsList, func(i, j int) bool { - return positionCloseTime(positionsList[i]).After(positionCloseTime(positionsList[j])) - }) - signalsByDate := make(map[string][]domain.Signal) - var values []decimal.Decimal - for _, pos := range positionsList { - key := tradingDate(pos.OpenTradeDate).Format("2006-01-02") - signals, ok := signalsByDate[key] - if !ok { - var err error - signals, err = s.svc.Repo.ListSignals(ctx, tradingDate(pos.OpenTradeDate)) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - return decimal.Zero, err - } - signalsByDate[key] = signals - } - for _, sig := range signals { - if sig.InstrumentUID != pos.InstrumentUID || sig.Decision != domain.DecisionEnter { - continue - } - adverse := sig.NetEdgeBps.Sub(pos.RealizedEdgeBps) - if adverse.IsNegative() { - adverse = decimal.Zero - } - values = append(values, adverse) - break - } - if len(values) == limit { - break - } - } - if len(values) == 0 { - return decimal.Zero, nil - } - sum := decimal.Zero - for _, value := range values { - sum = sum.Add(value) - } - return sum.Div(decimal.NewFromInt(int64(len(values)))), nil -} - func positionCloseTime(pos domain.Position) time.Time { if pos.ClosedAt != nil { return pos.ClosedAt.UTC() diff --git a/internal/tinvest/gateway.go b/internal/tinvest/gateway.go index bc66d2b..c8e2dc5 100644 --- a/internal/tinvest/gateway.go +++ b/internal/tinvest/gateway.go @@ -3,6 +3,7 @@ package tinvest import ( "context" "errors" + "fmt" "sync" "time" @@ -28,24 +29,28 @@ type Gateway interface { } type FakeGateway struct { - mu sync.Mutex - Instruments map[string]domain.Instrument - Candles map[string][]domain.Candle - OrderBooks map[string]domain.OrderBook - Statuses map[string]domain.TradingStatus - Orders map[string]domain.Order - Portfolio domain.Portfolio - Operations []domain.Operation - ServerTime time.Time + mu sync.Mutex + Instruments map[string]domain.Instrument + InstrumentErrors map[string]error + Candles map[string][]domain.Candle + CandleErrors map[string]error + OrderBooks map[string]domain.OrderBook + Statuses map[string]domain.TradingStatus + Orders map[string]domain.Order + Portfolio domain.Portfolio + Operations []domain.Operation + ServerTime time.Time } func NewFakeGateway() *FakeGateway { return &FakeGateway{ - Instruments: make(map[string]domain.Instrument), - Candles: make(map[string][]domain.Candle), - OrderBooks: make(map[string]domain.OrderBook), - Statuses: make(map[string]domain.TradingStatus), - Orders: make(map[string]domain.Order), + Instruments: make(map[string]domain.Instrument), + InstrumentErrors: make(map[string]error), + Candles: make(map[string][]domain.Candle), + CandleErrors: make(map[string]error), + OrderBooks: make(map[string]domain.OrderBook), + Statuses: make(map[string]domain.TradingStatus), + Orders: make(map[string]domain.Order), Portfolio: domain.Portfolio{ Equity: decimal.NewFromInt(100_000), Cash: decimal.NewFromInt(100_000), @@ -59,6 +64,9 @@ func (f *FakeGateway) GetInstrument(_ context.Context, ticker, classCode string) defer f.mu.Unlock() for _, instrument := range f.Instruments { if instrument.Ticker == ticker && instrument.ClassCode == classCode { + if err := f.InstrumentErrors[instrument.InstrumentUID]; err != nil { + return domain.Instrument{}, err + } return instrument, nil } } @@ -68,6 +76,9 @@ func (f *FakeGateway) GetInstrument(_ context.Context, ticker, classCode string) func (f *FakeGateway) GetCandles(_ context.Context, instrumentUID string, _ string, from, to time.Time) ([]domain.Candle, error) { f.mu.Lock() defer f.mu.Unlock() + if err := f.CandleErrors[instrumentUID]; err != nil { + return nil, err + } var out []domain.Candle for _, candle := range f.Candles[instrumentUID] { if !candle.TradeDate.Before(from) && !candle.TradeDate.After(to) { @@ -141,6 +152,40 @@ func (f *FakeGateway) GetOrderState(_ context.Context, _ string, orderID string) return order, nil } +func (f *FakeGateway) SimulateOrderBookFill(orderID string, book domain.OrderBook) (domain.Order, error) { + f.mu.Lock() + defer f.mu.Unlock() + order, ok := f.Orders[orderID] + if !ok { + return domain.Order{}, ErrNotFound + } + if isTerminalFakeOrder(order.Status) || order.FilledLots >= order.QuantityLots { + return order, nil + } + price, availableLots, ok := paperFillLevel(order, book) + if !ok || availableLots <= 0 { + return order, nil + } + remaining := order.QuantityLots - order.FilledLots + fillLots := minInt64(remaining, availableLots) + if fillLots <= 0 { + return order, nil + } + order.AvgFillPrice = paperWeightedAvg(order.AvgFillPrice, order.FilledLots, price, fillLots) + order.FilledLots += fillLots + if order.FilledLots >= order.QuantityLots { + order.Status = domain.OrderStatusFilled + } else { + order.Status = domain.OrderStatusPartiallyFilled + } + now := time.Now().UTC() + order.UpdatedAt = now + order.RawStateJSON = fmt.Sprintf(`{"paper_fill":true,"filled_lots":%d}`, order.FilledLots) + f.Orders[orderID] = order + f.recordPaperOperationLocked(order, fillLots, price, now) + return order, nil +} + func (f *FakeGateway) GetActiveOrders(_ context.Context, _ string) ([]domain.Order, error) { f.mu.Lock() defer f.mu.Unlock() @@ -180,3 +225,70 @@ func (f *FakeGateway) GetServerTime(context.Context) (time.Time, error) { } return f.ServerTime, nil } + +func isTerminalFakeOrder(status domain.OrderStatus) bool { + return status == domain.OrderStatusFilled || + status == domain.OrderStatusCancelled || + status == domain.OrderStatusRejected || + status == domain.OrderStatusExpired || + status == domain.OrderStatusFailed +} + +func paperFillLevel(order domain.Order, book domain.OrderBook) (decimal.Decimal, int64, bool) { + switch order.Side { + case domain.SideBuy: + if len(book.Asks) == 0 { + return decimal.Zero, 0, false + } + ask := book.Asks[0] + if ask.Price.IsPositive() && order.LimitPrice.GreaterThanOrEqual(ask.Price) { + return ask.Price, ask.QuantityLots, true + } + case domain.SideSell: + if len(book.Bids) == 0 { + return decimal.Zero, 0, false + } + bid := book.Bids[0] + if bid.Price.IsPositive() && order.LimitPrice.LessThanOrEqual(bid.Price) { + return bid.Price, bid.QuantityLots, true + } + } + return decimal.Zero, 0, false +} + +func paperWeightedAvg(currentAvg decimal.Decimal, currentLots int64, fillPrice decimal.Decimal, fillLots int64) decimal.Decimal { + if currentLots <= 0 { + return fillPrice + } + totalLots := currentLots + fillLots + if totalLots <= 0 { + return decimal.Zero + } + return currentAvg.Mul(decimal.NewFromInt(currentLots)). + Add(fillPrice.Mul(decimal.NewFromInt(fillLots))). + Div(decimal.NewFromInt(totalLots)) +} + +func (f *FakeGateway) recordPaperOperationLocked(order domain.Order, fillLots int64, price decimal.Decimal, ts time.Time) { + payment := price.Mul(decimal.NewFromInt(fillLots)) + opType := "OPERATION_TYPE_SELL" + if order.Side == domain.SideBuy { + payment = payment.Neg() + opType = "OPERATION_TYPE_BUY" + } + f.Operations = append(f.Operations, domain.Operation{ + ID: fmt.Sprintf("%s-%d", order.BrokerOrderID, len(f.Operations)+1), + InstrumentUID: order.InstrumentUID, + Type: opType, + Payment: payment, + Commission: decimal.Zero, + ExecutedAt: ts, + }) +} + +func minInt64(a, b int64) int64 { + if a < b { + return a + } + return b +} diff --git a/internal/tinvest/paper.go b/internal/tinvest/paper.go index bdff4fc..8d32dc8 100644 --- a/internal/tinvest/paper.go +++ b/internal/tinvest/paper.go @@ -62,7 +62,18 @@ func (g *PaperGateway) CancelOrder(ctx context.Context, accountID, orderID strin } func (g *PaperGateway) GetOrderState(ctx context.Context, accountID, orderID string) (domain.Order, error) { - return g.Fake().GetOrderState(ctx, accountID, orderID) + order, err := g.Fake().GetOrderState(ctx, accountID, orderID) + if err != nil { + return domain.Order{}, err + } + if !paperOrderCanFill(order) { + return order, nil + } + book, err := g.GetOrderBook(ctx, order.InstrumentUID, 20) + if err != nil { + return domain.Order{}, err + } + return g.Fake().SimulateOrderBookFill(orderID, book) } func (g *PaperGateway) GetActiveOrders(ctx context.Context, accountID string) ([]domain.Order, error) { @@ -83,3 +94,9 @@ func (g *PaperGateway) GetServerTime(ctx context.Context) (time.Time, error) { } return g.Fake().GetServerTime(ctx) } + +func paperOrderCanFill(order domain.Order) bool { + return order.Status == domain.OrderStatusSent || + order.Status == domain.OrderStatusPartiallyFilled || + order.Status == domain.OrderStatusNew +} diff --git a/internal/tinvest/real.go b/internal/tinvest/real.go index dbc0dd1..e9c6919 100644 --- a/internal/tinvest/real.go +++ b/internal/tinvest/real.go @@ -306,13 +306,19 @@ func (g *RealGateway) GetPortfolio(ctx context.Context, accountID string) (domai if err != nil { return domain.Portfolio{}, err } - return portfolioFromResponse(resp, g.lotForInstrument) + return portfolioFromResponse(resp, func(instrumentUID string) (int64, error) { + return g.resolveInstrumentLot(ctx, instrumentUID) + }) } func (g *RealGateway) GetOperations(ctx context.Context, accountID string, from, to time.Time) ([]domain.Operation, error) { if err := ctx.Err(); err != nil { return nil, err } + ops, err := g.getOperationsByCursor(ctx, accountID, from, to) + if err == nil { + return ops, nil + } resp, err := requestWithTimeout(ctx, g.requestTimeout, func(callCtx context.Context) (*pb.OperationsResponse, error) { return retryValue(callCtx, g.retryAttempts, g.retryBackoff, func() (*pb.OperationsResponse, error) { return g.operationsPB.GetOperations(callCtx, &pb.OperationsRequest{ @@ -328,30 +334,122 @@ func (g *RealGateway) GetOperations(ctx context.Context, accountID string, from, return operationsFromResponse(resp), nil } +func (g *RealGateway) getOperationsByCursor(ctx context.Context, accountID string, from, to time.Time) ([]domain.Operation, error) { + limit := int32(1000) + withoutCommissions := false + withoutTrades := true + withoutOvernights := false + state := pb.OperationState_OPERATION_STATE_EXECUTED + var cursor *string + var out []domain.Operation + for { + resp, err := requestWithTimeout(ctx, g.requestTimeout, func(callCtx context.Context) (*pb.GetOperationsByCursorResponse, error) { + return retryValue(callCtx, g.retryAttempts, g.retryBackoff, func() (*pb.GetOperationsByCursorResponse, error) { + return g.operationsPB.GetOperationsByCursor(callCtx, &pb.GetOperationsByCursorRequest{ + AccountId: accountID, + From: investgo.TimeToTimestamp(from), + To: investgo.TimeToTimestamp(to), + Cursor: cursor, + Limit: &limit, + State: &state, + WithoutCommissions: &withoutCommissions, + WithoutTrades: &withoutTrades, + WithoutOvernights: &withoutOvernights, + }) + }) + }) + if err != nil { + return nil, err + } + out = append(out, operationsFromCursorResponse(resp)...) + if !resp.GetHasNext() || resp.GetNextCursor() == "" { + return out, nil + } + next := resp.GetNextCursor() + cursor = &next + } +} + func operationsFromResponse(resp *pb.OperationsResponse) []domain.Operation { ops := resp.GetOperations() out := make([]domain.Operation, 0, len(ops)) for _, op := range ops { payment := money.MoneyValueToDecimal(op.GetPayment()) + instrumentUID := op.GetInstrumentUid() + commission := operationCommission(op.GetOperationType(), payment) + childCommission := decimal.Zero + for _, child := range op.GetChildOperations() { + childPayment := money.MoneyValueToDecimal(child.GetPayment()) + if instrumentUID == "" { + instrumentUID = child.GetInstrumentUid() + } + childCommission = childCommission.Add(operationCommission(op.GetOperationType(), childPayment)) + } + if commission.IsZero() { + commission = childCommission + } out = append(out, domain.Operation{ ID: op.GetId(), - InstrumentUID: op.GetInstrumentUid(), + InstrumentUID: instrumentUID, Type: op.GetOperationType().String(), Payment: payment, - Commission: operationCommission(op.GetOperationType(), payment), + Commission: commission, ExecutedAt: op.GetDate().AsTime().UTC(), }) } return out } -func portfolioFromResponse(resp *pb.PortfolioResponse, lotForInstrument func(string) int64) (domain.Portfolio, error) { +func operationsFromCursorResponse(resp *pb.GetOperationsByCursorResponse) []domain.Operation { + items := resp.GetItems() + out := make([]domain.Operation, 0, len(items)) + for _, item := range items { + payment := money.MoneyValueToDecimal(item.GetPayment()) + commission := money.Abs(money.MoneyValueToDecimal(item.GetCommission())) + instrumentUID := item.GetInstrumentUid() + childCommission := decimal.Zero + for _, child := range item.GetChildOperations() { + childPayment := money.Abs(money.MoneyValueToDecimal(child.GetPayment())) + if instrumentUID == "" { + instrumentUID = child.GetInstrumentUid() + } + if operationLooksLikeCommission(item.GetType(), childPayment) { + childCommission = childCommission.Add(childPayment) + } + } + if commission.IsZero() && operationLooksLikeCommission(item.GetType(), payment) { + commission = money.Abs(payment) + } + if commission.IsZero() { + commission = childCommission + } + out = append(out, domain.Operation{ + ID: item.GetId(), + InstrumentUID: instrumentUID, + Type: item.GetType().String(), + Payment: payment, + Commission: commission, + ExecutedAt: item.GetDate().AsTime().UTC(), + }) + } + return out +} + +func portfolioFromResponse(resp *pb.PortfolioResponse, lotForInstrument func(string) (int64, error)) (domain.Portfolio, error) { positions := resp.GetPositions() holdings := make([]domain.Holding, 0, len(positions)) for _, position := range positions { + if portfolioPositionIgnored(position) { + continue + } + lot, lotErr := portfolioPositionLot(position, lotForInstrument) + lots, err := portfolioQuantityLots(position, lot, lotErr) + if err != nil { + return domain.Portfolio{}, err + } holdings = append(holdings, domain.Holding{ InstrumentUID: position.GetInstrumentUid(), - QuantityLots: portfolioQuantityLots(position, portfolioPositionLot(position, lotForInstrument)), + QuantityLots: lots, AveragePrice: money.MoneyValueToDecimal(position.GetAveragePositionPrice()), MarketValue: money.MoneyValueToDecimal(position.GetCurrentPrice()).Mul(money.QuotationToDecimal(position.GetQuantity())), }) @@ -396,14 +494,22 @@ func (g *RealGateway) GetServerTime(ctx context.Context) (time.Time, error) { } func operationCommission(operationType pb.OperationType, payment decimal.Decimal) decimal.Decimal { - if operationType != pb.OperationType_OPERATION_TYPE_BROKER_FEE && - operationType != pb.OperationType_OPERATION_TYPE_SERVICE_FEE && - operationType != pb.OperationType_OPERATION_TYPE_SUCCESS_FEE { + if !operationTypeIsCommission(operationType) { return decimal.Zero } return money.Abs(payment) } +func operationTypeIsCommission(operationType pb.OperationType) bool { + return operationType == pb.OperationType_OPERATION_TYPE_BROKER_FEE || + operationType == pb.OperationType_OPERATION_TYPE_SERVICE_FEE || + operationType == pb.OperationType_OPERATION_TYPE_SUCCESS_FEE +} + +func operationLooksLikeCommission(operationType pb.OperationType, payment decimal.Decimal) bool { + return operationTypeIsCommission(operationType) && !payment.IsZero() +} + func rubMoneyValueToDecimal(value *pb.MoneyValue) (decimal.Decimal, error) { if value == nil { return decimal.Zero, nil @@ -414,25 +520,38 @@ func rubMoneyValueToDecimal(value *pb.MoneyValue) (decimal.Decimal, error) { return money.MoneyValueToDecimal(value), nil } -func portfolioPositionLot(position *pb.PortfolioPosition, lotForInstrument func(string) int64) int64 { +func portfolioPositionLot(position *pb.PortfolioPosition, lotForInstrument func(string) (int64, error)) (int64, error) { if position == nil || lotForInstrument == nil { - return 0 + return 0, nil } return lotForInstrument(position.GetInstrumentUid()) } -func portfolioQuantityLots(position *pb.PortfolioPosition, lot int64) int64 { +func portfolioPositionIgnored(position *pb.PortfolioPosition) bool { if position == nil { - return 0 + return true + } + if money.QuotationToDecimal(position.GetQuantity()).IsZero() { + return true + } + return strings.EqualFold(position.GetInstrumentType(), "currency") +} + +func portfolioQuantityLots(position *pb.PortfolioPosition, lot int64, lotErr error) (int64, error) { + if position == nil { + return 0, nil } if lots, ok := portfolioDeprecatedQuantityLots(position); ok { - return lots.IntPart() + return lots.IntPart(), nil + } + if lotErr != nil { + return 0, lotErr } quantity := money.QuotationToDecimal(position.GetQuantity()) if lot > 0 { - return quantity.Div(decimal.NewFromInt(lot)).IntPart() + return quantity.Div(decimal.NewFromInt(lot)).IntPart(), nil } - return quantity.IntPart() + return 0, fmt.Errorf("portfolio lot size is unknown for %s", position.GetInstrumentUid()) } func (g *RealGateway) storeInstrumentLot(instrument domain.Instrument) { @@ -457,6 +576,33 @@ func (g *RealGateway) lotForInstrument(instrumentUID string) int64 { return lot } +func (g *RealGateway) resolveInstrumentLot(ctx context.Context, instrumentUID string) (int64, error) { + if lot := g.lotForInstrument(instrumentUID); lot > 0 { + return lot, nil + } + if instrumentUID == "" { + return 0, errors.New("portfolio instrument uid is empty") + } + resp, err := requestWithTimeout(ctx, g.requestTimeout, func(callCtx context.Context) (*pb.InstrumentResponse, error) { + return retryValue(callCtx, g.retryAttempts, g.retryBackoff, func() (*pb.InstrumentResponse, error) { + return g.instrumentsPB.GetInstrumentBy(callCtx, &pb.InstrumentRequest{ + IdType: pb.InstrumentIdType_INSTRUMENT_ID_TYPE_UID, + Id: instrumentUID, + }) + }) + }) + if err != nil { + return 0, err + } + instrument := resp.GetInstrument() + if instrument == nil || instrument.GetLot() <= 0 { + return 0, fmt.Errorf("portfolio lot size is unavailable for %s", instrumentUID) + } + lot := int64(instrument.GetLot()) + g.instrumentLots.Store(instrumentUID, lot) + return lot, nil +} + func portfolioDeprecatedQuantityLots(position *pb.PortfolioPosition) (decimal.Decimal, bool) { message := position.ProtoReflect() field := message.Descriptor().Fields().ByName("quantity_lots") diff --git a/internal/tinvest/real_test.go b/internal/tinvest/real_test.go index 887d2e5..0958d6c 100644 --- a/internal/tinvest/real_test.go +++ b/internal/tinvest/real_test.go @@ -47,11 +47,11 @@ func TestPortfolioFromResponseConvertsUnitsToLots(t *testing.T) { CurrentPrice: &pb.MoneyValue{Currency: "rub", Units: 10}, }, }, - }, func(instrumentUID string) int64 { + }, func(instrumentUID string) (int64, error) { if instrumentUID == "uid" { - return 10 + return 10, nil } - return 0 + return 0, nil }) if err != nil { t.Fatal(err) @@ -63,3 +63,84 @@ func TestPortfolioFromResponseConvertsUnitsToLots(t *testing.T) { t.Fatalf("market value=%s, want 200", portfolio.Holdings[0].MarketValue) } } + +func TestPortfolioFromResponseRejectsUnknownLotWhenQuantityLotsMissing(t *testing.T) { + _, err := portfolioFromResponse(&pb.PortfolioResponse{ + Positions: []*pb.PortfolioPosition{ + { + InstrumentUid: "uid", + Quantity: &pb.Quotation{Units: 20}, + CurrentPrice: &pb.MoneyValue{Currency: "rub", Units: 10}, + }, + }, + }, func(string) (int64, error) { + return 0, nil + }) + if err == nil { + t.Fatal("expected unknown lot error") + } +} + +func TestPortfolioFromResponseIgnoresCurrencyPositions(t *testing.T) { + portfolio, err := portfolioFromResponse(&pb.PortfolioResponse{ + Positions: []*pb.PortfolioPosition{ + { + InstrumentUid: "rub", + InstrumentType: "currency", + Quantity: &pb.Quotation{Units: 1000}, + CurrentPrice: &pb.MoneyValue{Currency: "rub", Units: 1}, + QuantityLots: &pb.Quotation{Units: 1000}, + AveragePositionPrice: &pb.MoneyValue{Currency: "rub", Units: 1}, + }, + }, + }, func(string) (int64, error) { + return 0, nil + }) + if err != nil { + t.Fatal(err) + } + if len(portfolio.Holdings) != 0 { + t.Fatalf("currency position should be excluded from holdings: %+v", portfolio.Holdings) + } +} + +func TestOperationsFromResponseAttributesCommissionChildUID(t *testing.T) { + ops := operationsFromResponse(&pb.OperationsResponse{ + Operations: []*pb.Operation{{ + Id: "fee", + OperationType: pb.OperationType_OPERATION_TYPE_BROKER_FEE, + Payment: &pb.MoneyValue{Currency: "rub", Units: -1}, + ChildOperations: []*pb.ChildOperationItem{{ + InstrumentUid: "uid", + Payment: &pb.MoneyValue{Currency: "rub", Units: -1}, + }}, + }}, + }) + if len(ops) != 1 { + t.Fatalf("operations=%d, want 1", len(ops)) + } + if ops[0].InstrumentUID != "uid" { + t.Fatalf("instrument uid=%q, want uid", ops[0].InstrumentUID) + } + if !ops[0].Commission.Equal(decimal.NewFromInt(1)) { + t.Fatalf("commission=%s, want 1", ops[0].Commission) + } +} + +func TestOperationsFromCursorResponseUsesExplicitCommission(t *testing.T) { + ops := operationsFromCursorResponse(&pb.GetOperationsByCursorResponse{ + Items: []*pb.OperationItem{{ + Id: "trade", + Type: pb.OperationType_OPERATION_TYPE_BUY, + InstrumentUid: "uid", + Payment: &pb.MoneyValue{Currency: "rub", Units: -100}, + Commission: &pb.MoneyValue{Currency: "rub", Units: -1}, + }}, + }) + if len(ops) != 1 { + t.Fatalf("operations=%d, want 1", len(ops)) + } + if !ops[0].Commission.Equal(decimal.NewFromInt(1)) { + t.Fatalf("commission=%s, want 1", ops[0].Commission) + } +} diff --git a/internal/tinvest/sandbox.go b/internal/tinvest/sandbox.go index 4db9030..b0b8c2b 100644 --- a/internal/tinvest/sandbox.go +++ b/internal/tinvest/sandbox.go @@ -135,7 +135,9 @@ func (g *SandboxGateway) GetPortfolio(ctx context.Context, accountID string) (do if err != nil { return domain.Portfolio{}, err } - return portfolioFromResponse(resp, g.lotForInstrument) + return portfolioFromResponse(resp, func(instrumentUID string) (int64, error) { + return g.resolveInstrumentLot(ctx, instrumentUID) + }) } func (g *SandboxGateway) GetOperations(ctx context.Context, accountID string, from, to time.Time) ([]domain.Operation, error) {