This commit is contained in:
@@ -150,7 +150,7 @@ func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLi
|
||||
lock := e.lockFor(order.InstrumentUID)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
if e.mode != domain.ModePaper && !e.mode.AllowsBrokerOrders() {
|
||||
if !e.mode.AllowsBrokerOrders() && e.mode != domain.ModePaper {
|
||||
return order, ErrBrokerOrdersDisabled
|
||||
}
|
||||
if e.gateway == nil {
|
||||
@@ -287,7 +287,7 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit
|
||||
}
|
||||
lastPost := e.nowUTC()
|
||||
current := order
|
||||
aggregate := order
|
||||
aggregate := AggregatedOrderFill(order)
|
||||
seen := map[string]domain.Order{order.ClientOrderID: order}
|
||||
for {
|
||||
previous := seen[current.ClientOrderID]
|
||||
@@ -295,12 +295,19 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit
|
||||
if err != nil {
|
||||
return aggregate, err
|
||||
}
|
||||
if delta := fillDeltaLots(previous, refreshed); delta > 0 {
|
||||
refreshed.RawStateJSON = e.captureFillQuote(ctx, refreshed.RawStateJSON, refreshed, cfg, delta)
|
||||
}
|
||||
aggregate = mergeAggregateFill(aggregate, previous, refreshed)
|
||||
seen[current.ClientOrderID] = refreshed
|
||||
current = mergeOrderState(current, refreshed)
|
||||
aggregate.Status = current.Status
|
||||
aggregate.UpdatedAt = current.UpdatedAt
|
||||
current.RawStateJSON = withMonitorAggregate(current.RawStateJSON, aggregate)
|
||||
aggregate.RawStateJSON = current.RawStateJSON
|
||||
if err := e.persistOrderMonitorState(ctx, current); err != nil {
|
||||
return aggregate, err
|
||||
}
|
||||
if aggregate.FilledLots >= aggregate.QuantityLots {
|
||||
aggregate.Status = domain.OrderStatusFilled
|
||||
return aggregate, nil
|
||||
@@ -329,7 +336,15 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit
|
||||
result, err := e.repost(ctx, current, cfg, aggregate.QuantityLots-aggregate.FilledLots)
|
||||
if result.Cancelled.ClientOrderID != "" {
|
||||
previous := seen[result.Cancelled.ClientOrderID]
|
||||
if delta := fillDeltaLots(previous, result.Cancelled); delta > 0 {
|
||||
result.Cancelled.RawStateJSON = e.captureFillQuote(ctx, result.Cancelled.RawStateJSON, result.Cancelled, cfg, delta)
|
||||
}
|
||||
aggregate = mergeAggregateFill(aggregate, previous, result.Cancelled)
|
||||
result.Cancelled.RawStateJSON = withMonitorAggregate(result.Cancelled.RawStateJSON, aggregate)
|
||||
aggregate.RawStateJSON = result.Cancelled.RawStateJSON
|
||||
if persistErr := e.persistOrderMonitorState(ctx, result.Cancelled); persistErr != nil {
|
||||
return aggregate, persistErr
|
||||
}
|
||||
seen[result.Cancelled.ClientOrderID] = result.Cancelled
|
||||
if aggregate.FilledLots >= aggregate.QuantityLots {
|
||||
aggregate.Status = domain.OrderStatusFilled
|
||||
@@ -341,6 +356,11 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit
|
||||
}
|
||||
if result.Changed {
|
||||
current = result.Current
|
||||
current.RawStateJSON = carryFillQuotes(current.RawStateJSON, aggregate.RawStateJSON)
|
||||
current.RawStateJSON = withMonitorAggregate(current.RawStateJSON, aggregate)
|
||||
if persistErr := e.persistOrderMonitorState(ctx, current); persistErr != nil {
|
||||
return aggregate, persistErr
|
||||
}
|
||||
seen[current.ClientOrderID] = current
|
||||
aggregate.Status = current.Status
|
||||
aggregate.UpdatedAt = current.UpdatedAt
|
||||
@@ -362,16 +382,24 @@ func (e *Engine) MonitorOnce(ctx context.Context, order domain.Order, cfg Monito
|
||||
if cfg.MaxAttempts <= 0 {
|
||||
cfg.MaxAttempts = 1
|
||||
}
|
||||
aggregate := AggregatedOrderFill(order)
|
||||
previous := order
|
||||
refreshed, err := e.Refresh(ctx, order)
|
||||
if err != nil {
|
||||
return order, err
|
||||
}
|
||||
aggregate := mergeAggregateFill(order, previous, refreshed)
|
||||
if delta := fillDeltaLots(previous, refreshed); delta > 0 {
|
||||
refreshed.RawStateJSON = e.captureFillQuote(ctx, refreshed.RawStateJSON, refreshed, cfg, delta)
|
||||
}
|
||||
aggregate = mergeAggregateFill(aggregate, previous, refreshed)
|
||||
current := mergeOrderState(order, refreshed)
|
||||
aggregate.Status = current.Status
|
||||
aggregate.UpdatedAt = current.UpdatedAt
|
||||
current.RawStateJSON = withMonitorAggregate(current.RawStateJSON, aggregate)
|
||||
aggregate.RawStateJSON = current.RawStateJSON
|
||||
if err := e.persistOrderMonitorState(ctx, current); err != nil {
|
||||
return aggregate, err
|
||||
}
|
||||
if aggregate.FilledLots >= aggregate.QuantityLots {
|
||||
aggregate.Status = domain.OrderStatusFilled
|
||||
return aggregate, nil
|
||||
@@ -399,7 +427,15 @@ func (e *Engine) MonitorOnce(ctx context.Context, order domain.Order, cfg Monito
|
||||
if shouldRepost {
|
||||
result, err := e.repost(ctx, current, cfg, aggregate.QuantityLots-aggregate.FilledLots)
|
||||
if result.Cancelled.ClientOrderID != "" {
|
||||
if delta := fillDeltaLots(current, result.Cancelled); delta > 0 {
|
||||
result.Cancelled.RawStateJSON = e.captureFillQuote(ctx, result.Cancelled.RawStateJSON, result.Cancelled, cfg, delta)
|
||||
}
|
||||
aggregate = mergeAggregateFill(aggregate, current, result.Cancelled)
|
||||
result.Cancelled.RawStateJSON = withMonitorAggregate(result.Cancelled.RawStateJSON, aggregate)
|
||||
aggregate.RawStateJSON = result.Cancelled.RawStateJSON
|
||||
if persistErr := e.persistOrderMonitorState(ctx, result.Cancelled); persistErr != nil {
|
||||
return aggregate, persistErr
|
||||
}
|
||||
if aggregate.FilledLots >= aggregate.QuantityLots {
|
||||
aggregate.Status = domain.OrderStatusFilled
|
||||
return aggregate, nil
|
||||
@@ -412,8 +448,13 @@ func (e *Engine) MonitorOnce(ctx context.Context, order domain.Order, cfg Monito
|
||||
aggregate.BrokerOrderID = result.Current.BrokerOrderID
|
||||
aggregate.ClientOrderID = result.Current.ClientOrderID
|
||||
aggregate.Status = result.Current.Status
|
||||
aggregate.RawStateJSON = result.Current.RawStateJSON
|
||||
aggregate.UpdatedAt = result.Current.UpdatedAt
|
||||
result.Current.RawStateJSON = carryFillQuotes(result.Current.RawStateJSON, aggregate.RawStateJSON)
|
||||
result.Current.RawStateJSON = withMonitorAggregate(result.Current.RawStateJSON, aggregate)
|
||||
aggregate.RawStateJSON = result.Current.RawStateJSON
|
||||
if persistErr := e.persistOrderMonitorState(ctx, result.Current); persistErr != nil {
|
||||
return aggregate, persistErr
|
||||
}
|
||||
}
|
||||
}
|
||||
return aggregate, nil
|
||||
@@ -628,6 +669,198 @@ func localRawStateJSON(raw string) string {
|
||||
return raw
|
||||
}
|
||||
|
||||
func AggregatedOrderFill(order domain.Order) domain.Order {
|
||||
aggregate := order
|
||||
state, ok := monitorAggregateFromRaw(order.RawStateJSON)
|
||||
if !ok {
|
||||
return aggregate
|
||||
}
|
||||
if state.QuantityLots > 0 {
|
||||
aggregate.QuantityLots = state.QuantityLots
|
||||
}
|
||||
aggregate.FilledLots = state.FilledLots
|
||||
aggregate.AvgFillPrice = state.AvgFillPrice
|
||||
aggregate.Commission = state.Commission
|
||||
return aggregate
|
||||
}
|
||||
|
||||
type monitorAggregateState struct {
|
||||
QuantityLots int64
|
||||
FilledLots int64
|
||||
AvgFillPrice decimal.Decimal
|
||||
Commission decimal.Decimal
|
||||
}
|
||||
|
||||
func monitorAggregateFromRaw(raw string) (monitorAggregateState, bool) {
|
||||
var root map[string]any
|
||||
if err := json.Unmarshal([]byte(raw), &root); err != nil {
|
||||
return monitorAggregateState{}, false
|
||||
}
|
||||
if local, ok := root["local"].(map[string]any); ok {
|
||||
if state, ok := monitorAggregateFromContainer(local); ok {
|
||||
return state, true
|
||||
}
|
||||
}
|
||||
return monitorAggregateFromContainer(root)
|
||||
}
|
||||
|
||||
func monitorAggregateFromContainer(container map[string]any) (monitorAggregateState, bool) {
|
||||
raw, ok := container["monitor_aggregate"].(map[string]any)
|
||||
if !ok {
|
||||
return monitorAggregateState{}, false
|
||||
}
|
||||
quantityLots, quantityOK := int64FromAny(raw["quantity_lots"])
|
||||
filledLots, filledOK := int64FromAny(raw["filled_lots"])
|
||||
avgFillPrice, avgOK := decimalFromAny(raw["avg_fill_price"])
|
||||
commission, commissionOK := decimalFromAny(raw["commission"])
|
||||
if !quantityOK || !filledOK {
|
||||
return monitorAggregateState{}, false
|
||||
}
|
||||
if !avgOK {
|
||||
avgFillPrice = decimal.Zero
|
||||
}
|
||||
if !commissionOK {
|
||||
commission = decimal.Zero
|
||||
}
|
||||
return monitorAggregateState{
|
||||
QuantityLots: quantityLots,
|
||||
FilledLots: filledLots,
|
||||
AvgFillPrice: avgFillPrice,
|
||||
Commission: commission,
|
||||
}, true
|
||||
}
|
||||
|
||||
func withMonitorAggregate(raw string, aggregate domain.Order) string {
|
||||
root := rawStateObject(raw)
|
||||
local := localObjectForMutation(root)
|
||||
local["monitor_aggregate"] = map[string]any{
|
||||
"quantity_lots": aggregate.QuantityLots,
|
||||
"filled_lots": aggregate.FilledLots,
|
||||
"avg_fill_price": aggregate.AvgFillPrice.String(),
|
||||
"commission": aggregate.Commission.String(),
|
||||
}
|
||||
encoded, err := json.Marshal(root)
|
||||
if err != nil {
|
||||
return raw
|
||||
}
|
||||
return string(encoded)
|
||||
}
|
||||
|
||||
func carryFillQuotes(raw, sourceRaw string) string {
|
||||
source := rawStateObject(sourceRaw)
|
||||
sourceLocal := source
|
||||
if local, ok := source["local"].(map[string]any); ok {
|
||||
sourceLocal = local
|
||||
}
|
||||
quotes, ok := sourceLocal["fill_quotes"].([]any)
|
||||
if !ok || len(quotes) == 0 {
|
||||
return raw
|
||||
}
|
||||
root := rawStateObject(raw)
|
||||
local := localObjectForMutation(root)
|
||||
local["fill_quotes"] = quotes
|
||||
encoded, err := json.Marshal(root)
|
||||
if err != nil {
|
||||
return raw
|
||||
}
|
||||
return string(encoded)
|
||||
}
|
||||
|
||||
func (e *Engine) captureFillQuote(ctx context.Context, raw string, order domain.Order, cfg MonitorConfig, deltaLots int64) string {
|
||||
if deltaLots <= 0 || cfg.Quote == nil {
|
||||
return raw
|
||||
}
|
||||
book, err := cfg.Quote(ctx, order.InstrumentUID)
|
||||
if err != nil {
|
||||
return raw
|
||||
}
|
||||
bid, ask, err := bestBidAsk(book)
|
||||
if err != nil {
|
||||
return raw
|
||||
}
|
||||
root := rawStateObject(raw)
|
||||
local := localObjectForMutation(root)
|
||||
quotes, _ := local["fill_quotes"].([]any)
|
||||
fillQuote := map[string]any{
|
||||
"best_bid": bid.String(),
|
||||
"best_ask": ask.String(),
|
||||
"mid": bid.Add(ask).Div(decimal.NewFromInt(2)).String(),
|
||||
"recorded_at": e.nowUTC().Format(time.RFC3339Nano),
|
||||
"filled_lots_delta": deltaLots,
|
||||
}
|
||||
if ts := quoteTimestamp(book); !ts.IsZero() {
|
||||
fillQuote["quote_ts"] = ts.UTC().Format(time.RFC3339Nano)
|
||||
}
|
||||
local["fill_quotes"] = append(quotes, fillQuote)
|
||||
encoded, err := json.Marshal(root)
|
||||
if err != nil {
|
||||
return raw
|
||||
}
|
||||
return string(encoded)
|
||||
}
|
||||
|
||||
func rawStateObject(raw string) map[string]any {
|
||||
var root map[string]any
|
||||
if err := json.Unmarshal([]byte(raw), &root); err != nil || root == nil {
|
||||
return map[string]any{}
|
||||
}
|
||||
return root
|
||||
}
|
||||
|
||||
func localObjectForMutation(root map[string]any) map[string]any {
|
||||
if local, ok := root["local"].(map[string]any); ok {
|
||||
return local
|
||||
}
|
||||
if _, hasBroker := root["broker"]; hasBroker {
|
||||
local := map[string]any{}
|
||||
root["local"] = local
|
||||
return local
|
||||
}
|
||||
return root
|
||||
}
|
||||
|
||||
func decimalFromAny(value any) (decimal.Decimal, bool) {
|
||||
switch typed := value.(type) {
|
||||
case string:
|
||||
parsed, err := decimal.NewFromString(typed)
|
||||
return parsed, err == nil
|
||||
case float64:
|
||||
return decimal.NewFromFloat(typed), true
|
||||
default:
|
||||
return decimal.Zero, false
|
||||
}
|
||||
}
|
||||
|
||||
func int64FromAny(value any) (int64, bool) {
|
||||
switch typed := value.(type) {
|
||||
case float64:
|
||||
return int64(typed), true
|
||||
case string:
|
||||
parsed, err := decimal.NewFromString(typed)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
return parsed.IntPart(), true
|
||||
default:
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
|
||||
func fillDeltaLots(previous, current domain.Order) int64 {
|
||||
delta := current.FilledLots - previous.FilledLots
|
||||
if delta < 0 {
|
||||
return 0
|
||||
}
|
||||
return delta
|
||||
}
|
||||
|
||||
func (e *Engine) persistOrderMonitorState(ctx context.Context, order domain.Order) error {
|
||||
if e.store == nil {
|
||||
return nil
|
||||
}
|
||||
return e.store.UpdateOrderStatus(ctx, order.ClientOrderID, order.Status, order.FilledLots, order.RawStateJSON)
|
||||
}
|
||||
|
||||
func (e *Engine) lockFor(instrumentUID string) *sync.Mutex {
|
||||
value, _ := e.mu.LoadOrStore(instrumentUID, &sync.Mutex{})
|
||||
lock, ok := value.(*sync.Mutex)
|
||||
|
||||
@@ -352,6 +352,50 @@ func TestLiveReadonlyDoesNotPersistLocalOrder(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlaceLimitModePolicy(t *testing.T) {
|
||||
tests := []struct {
|
||||
mode domain.Mode
|
||||
allowed bool
|
||||
}{
|
||||
{mode: domain.ModeBacktest, allowed: false},
|
||||
{mode: domain.ModePaper, allowed: true},
|
||||
{mode: domain.ModeSandbox, allowed: true},
|
||||
{mode: domain.ModeLiveReadonly, allowed: false},
|
||||
{mode: domain.ModeLiveTrade, allowed: true},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(string(tt.mode), func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
repo := testutil.NewMemoryRepository()
|
||||
engine := NewEngine(tt.mode, "account", tinvest.NewFakeGateway(), repo)
|
||||
_, err := engine.PlaceLimit(ctx, domain.Order{
|
||||
ClientOrderID: "order-" + string(tt.mode),
|
||||
AccountIDHash: "hash",
|
||||
InstrumentUID: "uid",
|
||||
TradeDate: time.Date(2026, 6, 8, 0, 0, 0, 0, time.UTC),
|
||||
Side: domain.SideBuy,
|
||||
OrderType: domain.OrderTypeLimit,
|
||||
LimitPrice: decimal.NewFromInt(100),
|
||||
QuantityLots: 1,
|
||||
Status: domain.OrderStatusNew,
|
||||
AttemptNo: 1,
|
||||
})
|
||||
if tt.allowed && err != nil {
|
||||
t.Fatalf("PlaceLimit err=%v, want allowed", err)
|
||||
}
|
||||
if !tt.allowed && !errors.Is(err, ErrBrokerOrdersDisabled) {
|
||||
t.Fatalf("PlaceLimit err=%v, want ErrBrokerOrdersDisabled", err)
|
||||
}
|
||||
if tt.allowed && len(repo.Orders) != 1 {
|
||||
t.Fatalf("orders=%+v, want one persisted order", repo.Orders)
|
||||
}
|
||||
if !tt.allowed && len(repo.Orders) != 0 {
|
||||
t.Fatalf("orders=%+v, want no persisted order", repo.Orders)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorUntilRepostsAndExpiresAtDeadline(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
repo := testutil.NewMemoryRepository()
|
||||
@@ -496,6 +540,9 @@ func TestMonitorOnceRepostAccountsForFillsDuringCancel(t *testing.T) {
|
||||
if monitored.FilledLots != 2 {
|
||||
t.Fatalf("aggregate filled lots=%d, want cancel fill 2", monitored.FilledLots)
|
||||
}
|
||||
if !strings.Contains(monitored.RawStateJSON, "fill_quotes") {
|
||||
t.Fatalf("fill quote snapshot was not recorded: %s", monitored.RawStateJSON)
|
||||
}
|
||||
if got := len(gateway.posted); got != 2 {
|
||||
t.Fatalf("broker orders=%d, want initial+repost", got)
|
||||
}
|
||||
@@ -504,6 +551,78 @@ func TestMonitorOnceRepostAccountsForFillsDuringCancel(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorOnceAggregatesRepostsAcrossTicks(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
repo := testutil.NewMemoryRepository()
|
||||
gateway := newCancelFillGateway(2)
|
||||
engine := NewEngine(domain.ModeSandbox, "account", gateway, repo)
|
||||
instrument := domain.Instrument{
|
||||
InstrumentUID: "uid",
|
||||
Lot: 1,
|
||||
MinPriceIncrement: decimal.NewFromInt(1),
|
||||
FreeOrderLimitPerDay: -1,
|
||||
}
|
||||
book := domain.OrderBook{
|
||||
InstrumentUID: "uid",
|
||||
Bids: []domain.OrderBookLevel{{Price: decimal.NewFromInt(99), QuantityLots: 10}},
|
||||
Asks: []domain.OrderBookLevel{{Price: decimal.NewFromInt(101), QuantityLots: 10}},
|
||||
ReceivedAt: time.Now().UTC(),
|
||||
}
|
||||
tradeDate := time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC)
|
||||
order, err := engine.PlaceEntry(ctx, "hash", instrument, tradeDate, 5, book, 1, 1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
order.CreatedAt = time.Now().UTC().Add(-time.Minute)
|
||||
if err := repo.UpsertOrder(ctx, order); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cfg := MonitorConfig{
|
||||
Deadline: time.Now().Add(time.Minute),
|
||||
PollInterval: time.Millisecond,
|
||||
MaxAttempts: 3,
|
||||
RepostAfter: time.Second,
|
||||
Instrument: instrument,
|
||||
ImproveTicks: 1,
|
||||
Quote: func(context.Context, string) (domain.OrderBook, error) {
|
||||
book.ReceivedAt = time.Now().UTC()
|
||||
return book, nil
|
||||
},
|
||||
}
|
||||
first, err := engine.MonitorOnce(ctx, order, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if first.FilledLots != 2 {
|
||||
t.Fatalf("first aggregate filled lots=%d, want 2", first.FilledLots)
|
||||
}
|
||||
active, err := repo.ListActiveOrders(ctx, "hash")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(active) != 1 {
|
||||
t.Fatalf("active orders=%+v, want reposted order", active)
|
||||
}
|
||||
next := active[0]
|
||||
next.CreatedAt = time.Now().UTC().Add(-time.Minute)
|
||||
if err := repo.UpsertOrder(ctx, next); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
second, err := engine.MonitorOnce(ctx, next, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if second.FilledLots != 4 {
|
||||
t.Fatalf("second aggregate filled lots=%d, want 4 across reposts", second.FilledLots)
|
||||
}
|
||||
if got := len(gateway.posted); got != 3 {
|
||||
t.Fatalf("broker orders=%d, want initial+two reposts", got)
|
||||
}
|
||||
if got := gateway.posted[2].QuantityLots; got != 1 {
|
||||
t.Fatalf("second repost quantity lots=%d, want remaining 1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorOnceKeepsCancelFillWhenRepostPostFails(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
repo := testutil.NewMemoryRepository()
|
||||
|
||||
Reference in New Issue
Block a user