fifth version

This commit is contained in:
2026-06-08 09:03:37 +00:00
parent b9efa98758
commit 2d57c4ff1f
26 changed files with 896 additions and 159 deletions
+71 -47
View File
@@ -12,6 +12,7 @@ import (
"overnight-trading-bot/internal/domain"
"overnight-trading-bot/internal/repository"
"overnight-trading-bot/internal/risk"
"overnight-trading-bot/internal/timeutil"
)
var ErrBrokerOrdersDisabled = errors.New("broker orders are disabled for current mode")
@@ -35,6 +36,7 @@ type Engine struct {
store repository.Repository
maxQuoteAge time.Duration
freeOrderCountPolicy string
clock timeutil.Clock
mu sync.Map
}
@@ -50,13 +52,26 @@ type MonitorConfig struct {
}
func NewEngine(mode domain.Mode, accountID string, gateway Gateway, store repository.Repository) Engine {
return Engine{mode: mode, accountID: accountID, gateway: gateway, store: store, freeOrderCountPolicy: FreeOrderPolicySubmitted}
return Engine{
mode: mode,
accountID: accountID,
gateway: gateway,
store: store,
freeOrderCountPolicy: FreeOrderPolicySubmitted,
clock: timeutil.RealClock{},
}
}
func (e *Engine) SetMaxQuoteAge(maxQuoteAge time.Duration) {
e.maxQuoteAge = maxQuoteAge
}
func (e *Engine) SetClock(clock timeutil.Clock) {
if clock != nil {
e.clock = clock
}
}
func (e *Engine) SetFreeOrderCountPolicy(policy string) {
switch policy {
case FreeOrderPolicyCancelCounts:
@@ -78,7 +93,7 @@ func (e *Engine) PlaceEntry(ctx context.Context, accountIDHash string, instrumen
if err != nil {
return domain.Order{}, err
}
return e.PlaceLimit(ctx, domain.Order{
return e.placeLimit(ctx, domain.Order{
ClientOrderID: ClientOrderID(tradeDate, instrument.InstrumentUID, domain.SideBuy, attempt),
AccountIDHash: accountIDHash,
InstrumentUID: instrument.InstrumentUID,
@@ -90,7 +105,7 @@ func (e *Engine) PlaceEntry(ctx context.Context, accountIDHash string, instrumen
Status: domain.OrderStatusNew,
AttemptNo: attempt,
RawStateJSON: "{}",
})
}, instrument.FreeOrderLimitPerDay)
}
func (e *Engine) PlaceExit(ctx context.Context, accountIDHash string, instrument domain.Instrument, tradeDate time.Time, lots int64, book domain.OrderBook, improveTicks int, attempt int) (domain.Order, error) {
@@ -105,7 +120,7 @@ func (e *Engine) PlaceExit(ctx context.Context, accountIDHash string, instrument
if err != nil {
return domain.Order{}, err
}
return e.PlaceLimit(ctx, domain.Order{
return e.placeLimit(ctx, domain.Order{
ClientOrderID: ClientOrderID(tradeDate, instrument.InstrumentUID, domain.SideSell, attempt),
AccountIDHash: accountIDHash,
InstrumentUID: instrument.InstrumentUID,
@@ -117,10 +132,14 @@ func (e *Engine) PlaceExit(ctx context.Context, accountIDHash string, instrument
Status: domain.OrderStatusNew,
AttemptNo: attempt,
RawStateJSON: "{}",
})
}, instrument.FreeOrderLimitPerDay)
}
func (e *Engine) PlaceLimit(ctx context.Context, order domain.Order) (domain.Order, error) {
return e.placeLimit(ctx, order, 0)
}
func (e *Engine) placeLimit(ctx context.Context, order domain.Order, freeOrderLimit int) (domain.Order, error) {
lock := e.lockFor(order.InstrumentUID)
lock.Lock()
defer lock.Unlock()
@@ -134,7 +153,7 @@ func (e *Engine) PlaceLimit(ctx context.Context, order domain.Order) (domain.Ord
}
}
if e.mode == domain.ModePaper {
return e.placePaperLimit(ctx, order)
return e.placePaperLimit(ctx, order, freeOrderLimit)
}
if !e.mode.AllowsBrokerOrders() {
order.Status = domain.OrderStatusNew
@@ -147,7 +166,7 @@ func (e *Engine) PlaceLimit(ctx context.Context, order domain.Order) (domain.Ord
return domain.Order{}, errors.New("gateway is nil")
}
now := time.Now().UTC()
now := e.nowUTC()
draft := order
draft.Status = domain.OrderStatusSent
draft.CreatedAt = now
@@ -156,8 +175,13 @@ func (e *Engine) PlaceLimit(ctx context.Context, order domain.Order) (domain.Ord
draft.RawStateJSON = "{}"
}
if e.store != nil {
if err := e.store.UpsertOrder(ctx, draft); err != nil {
return domain.Order{}, fmt.Errorf("persist draft order: %w", err)
if err := e.store.RunInTx(ctx, func(ctx context.Context, repo repository.Repository) error {
if err := repo.UpsertOrder(ctx, draft); err != nil {
return fmt.Errorf("persist draft order: %w", err)
}
return repo.ReserveFreeOrders(ctx, order.TradeDate, order.InstrumentUID, 1, freeOrderLimit)
}); err != nil {
return domain.Order{}, err
}
}
posted, err := e.gateway.PostLimitOrder(ctx, e.accountID, order.InstrumentUID, order.Side, order.QuantityLots, order.LimitPrice, order.ClientOrderID)
@@ -180,20 +204,15 @@ func (e *Engine) PlaceLimit(ctx context.Context, order domain.Order) (domain.Ord
posted.CreatedAt = now
posted.UpdatedAt = posted.CreatedAt
if e.store != nil {
if err := e.store.RunInTx(ctx, func(ctx context.Context, repo repository.Repository) error {
if err := repo.UpsertOrder(ctx, posted); err != nil {
return fmt.Errorf("persist posted order: %w", err)
}
return repo.IncrementFreeOrders(ctx, order.TradeDate, order.InstrumentUID, 1)
}); err != nil {
if err := e.store.UpsertOrder(ctx, posted); err != nil {
return domain.Order{}, err
}
}
return posted, nil
}
func (e *Engine) placePaperLimit(ctx context.Context, order domain.Order) (domain.Order, error) {
now := time.Now().UTC()
func (e *Engine) placePaperLimit(ctx context.Context, order domain.Order, freeOrderLimit int) (domain.Order, error) {
now := e.nowUTC()
order.BrokerOrderID = "paper-" + order.ClientOrderID
order.FilledLots = order.QuantityLots
order.AvgFillPrice = order.LimitPrice
@@ -206,7 +225,7 @@ func (e *Engine) placePaperLimit(ctx context.Context, order domain.Order) (domai
if err := repo.UpsertOrder(ctx, order); err != nil {
return fmt.Errorf("persist paper order: %w", err)
}
return repo.IncrementFreeOrders(ctx, order.TradeDate, order.InstrumentUID, 1)
return repo.ReserveFreeOrders(ctx, order.TradeDate, order.InstrumentUID, 1, freeOrderLimit)
}); err != nil {
return domain.Order{}, err
}
@@ -286,12 +305,10 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit
if cfg.MaxAttempts <= 0 {
cfg.MaxAttempts = 1
}
lastPost := time.Now()
lastPost := e.nowUTC()
current := order
aggregate := order
seen := map[string]domain.Order{order.ClientOrderID: order}
ticker := time.NewTicker(cfg.PollInterval)
defer ticker.Stop()
for {
previous := seen[current.ClientOrderID]
refreshed, err := e.Refresh(ctx, current)
@@ -311,7 +328,7 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit
if isTerminal(current.Status) {
return aggregate, nil
}
if !cfg.Deadline.IsZero() && !time.Now().Before(cfg.Deadline) {
if !cfg.Deadline.IsZero() && !e.nowUTC().Before(cfg.Deadline) {
if err := e.Cancel(ctx, current); err != nil {
return aggregate, err
}
@@ -324,7 +341,7 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit
return aggregate, nil
}
shouldRepost := cfg.RepostAfter > 0 &&
time.Since(lastPost) >= cfg.RepostAfter &&
e.nowUTC().Sub(lastPost) >= cfg.RepostAfter &&
current.AttemptNo < cfg.MaxAttempts &&
aggregate.FilledLots < aggregate.QuantityLots &&
cfg.Quote != nil
@@ -337,13 +354,11 @@ func (e *Engine) MonitorUntil(ctx context.Context, order domain.Order, cfg Monit
current = next
seen[current.ClientOrderID] = current
}
lastPost = time.Now()
lastPost = e.nowUTC()
continue
}
select {
case <-ctx.Done():
if !e.sleep(ctx, cfg.PollInterval) {
return aggregate, ctx.Err()
case <-ticker.C:
}
}
}
@@ -372,7 +387,7 @@ func (e *Engine) MonitorOnce(ctx context.Context, order domain.Order, cfg Monito
if isTerminal(current.Status) {
return aggregate, nil
}
if !cfg.Deadline.IsZero() && !time.Now().Before(cfg.Deadline) {
if !cfg.Deadline.IsZero() && !e.nowUTC().Before(cfg.Deadline) {
if err := e.Cancel(ctx, current); err != nil {
return aggregate, err
}
@@ -385,7 +400,7 @@ func (e *Engine) MonitorOnce(ctx context.Context, order domain.Order, cfg Monito
return aggregate, nil
}
shouldRepost := cfg.RepostAfter > 0 &&
repostDue(current, cfg.RepostAfter) &&
e.repostDue(current, cfg.RepostAfter) &&
current.AttemptNo < cfg.MaxAttempts &&
aggregate.FilledLots < aggregate.QuantityLots &&
cfg.Quote != nil
@@ -409,7 +424,7 @@ func (e *Engine) repost(ctx context.Context, order domain.Order, cfg MonitorConf
if err := e.ensureRepostBudget(ctx, order, cfg.Instrument); err != nil {
return domain.Order{}, false, err
}
if !cfg.Deadline.IsZero() && !time.Now().Before(cfg.Deadline) {
if !cfg.Deadline.IsZero() && !e.nowUTC().Before(cfg.Deadline) {
return order, false, nil
}
book, err := cfg.Quote(ctx, order.InstrumentUID)
@@ -432,7 +447,7 @@ func (e *Engine) repost(ctx context.Context, order domain.Order, cfg MonitorConf
cancelled.Status = domain.OrderStatusFilled
return cancelled, true, nil
}
if !cfg.Deadline.IsZero() && !time.Now().Before(cfg.Deadline) {
if !cfg.Deadline.IsZero() && !e.nowUTC().Before(cfg.Deadline) {
return cancelled, true, nil
}
book, err = cfg.Quote(ctx, order.InstrumentUID)
@@ -440,7 +455,7 @@ func (e *Engine) repost(ctx context.Context, order domain.Order, cfg MonitorConf
return domain.Order{}, false, err
}
if cfg.RepostCheck != nil {
if err := cfg.RepostCheck(ctx, order, cfg.Instrument, book); err != nil {
if err := cfg.RepostCheck(ctx, cancelled, cfg.Instrument, book); err != nil {
return cancelled, true, nil
}
}
@@ -468,25 +483,16 @@ func (e *Engine) waitTerminal(ctx context.Context, order domain.Order, cfg Monit
if isTerminal(current.Status) {
return current, nil
}
if !cfg.Deadline.IsZero() && !time.Now().Before(cfg.Deadline) {
if !cfg.Deadline.IsZero() && !e.nowUTC().Before(cfg.Deadline) {
return current, nil
}
timer := time.NewTimer(cfg.PollInterval)
select {
case <-ctx.Done():
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
if !e.sleep(ctx, cfg.PollInterval) {
return domain.Order{}, ctx.Err()
case <-timer.C:
}
}
}
func repostDue(order domain.Order, after time.Duration) bool {
func (e *Engine) repostDue(order domain.Order, after time.Duration) bool {
if after <= 0 {
return false
}
@@ -497,7 +503,7 @@ func repostDue(order domain.Order, after time.Duration) bool {
if basis.IsZero() {
return true
}
return time.Since(basis) >= after
return e.nowUTC().Sub(basis) >= after
}
func (e *Engine) ensureRepostBudget(ctx context.Context, order domain.Order, instrument domain.Instrument) error {
@@ -530,7 +536,7 @@ func (e *Engine) checkQuoteFresh(book domain.OrderBook) error {
if book.ReceivedAt.IsZero() {
return fmt.Errorf("quote received timestamp is missing")
}
age := time.Since(book.ReceivedAt)
age := e.nowUTC().Sub(book.ReceivedAt)
if age > e.maxQuoteAge {
return fmt.Errorf("quote age %s exceeds %s", age, e.maxQuoteAge)
}
@@ -541,11 +547,29 @@ func (e *Engine) lockFor(instrumentUID string) *sync.Mutex {
value, _ := e.mu.LoadOrStore(instrumentUID, &sync.Mutex{})
lock, ok := value.(*sync.Mutex)
if !ok {
panic("execution lock has unexpected type")
lock = &sync.Mutex{}
e.mu.Store(instrumentUID, lock)
}
return lock
}
func (e *Engine) nowUTC() time.Time {
if e.clock == nil {
return time.Now().UTC()
}
return e.clock.Now().UTC()
}
func (e *Engine) sleep(ctx context.Context, d time.Duration) bool {
if d <= 0 {
return true
}
if e.clock == nil {
return timeutil.RealClock{}.Sleep(ctx.Done(), d)
}
return e.clock.Sleep(ctx.Done(), d)
}
func bestBidAsk(book domain.OrderBook) (decimal.Decimal, decimal.Decimal, error) {
bid, ok := book.BestBid()
if !ok {