From 3d7ed5450bfdfceafc7a0b149fb1f65b362e4b97 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Sat, 19 Apr 2025 19:36:26 -0400 Subject: [PATCH] Stream reader read twice for large muxcool packet --- transport/internet/quic/conn.go | 63 ++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go index ddfc1235..71c9eba2 100644 --- a/transport/internet/quic/conn.go +++ b/transport/internet/quic/conn.go @@ -7,7 +7,9 @@ import ( "github.com/quic-go/quic-go" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/mux" "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/serial" "github.com/xtls/xray-core/common/signal/done" ) @@ -70,18 +72,63 @@ func (c *interConn) acceptStreams() { continue } } - go func() { - for { - received := make([]byte, buf.Size) - i, e := stream.Read(received) - errors.LogInfo(c.ctx, "Read stream ", i) - c.readChannel <- readResult{buffer: received[:i], err: e} - } - }() + go c.readMuxCoolPacket(stream) c.streams = append(c.streams, stream) } } +func (c *interConn) readMuxCoolPacket(stream quic.Stream) { + for { + received := make([]byte, buf.Size) + i, e := stream.Read(received) + if e != nil { + errors.LogErrorInner(c.ctx, e, "Error read stream, drop this buffer ", i) + c.readChannel <- readResult{buffer: nil, err: e} + continue; + } + errors.LogInfo(c.ctx, "Read stream ", i) + + buffer := buf.New() + buffer.Write(received[:i]) + muxCoolReader := &buf.MultiBufferContainer{} + muxCoolReader.MultiBuffer = append(muxCoolReader.MultiBuffer, buffer) + var meta mux.FrameMetadata + err := meta.Unmarshal(muxCoolReader) + if err != nil { + errors.LogInfo(c.ctx, "Not a Mux Cool packet beginning, copy directly ", i) + buf.ReleaseMulti(muxCoolReader.MultiBuffer) + c.readChannel <- readResult{buffer: received[:i], err: e} + continue; + } + if !meta.Option.Has(mux.OptionData) { + errors.LogInfo(c.ctx, "No option data, copy directly ", i) + buf.ReleaseMulti(muxCoolReader.MultiBuffer) + c.readChannel <- readResult{buffer: received[:i], err: e} + continue; + } + size, err := serial.ReadUint16(muxCoolReader) + remaining := uint16(muxCoolReader.MultiBuffer.Len()) + errors.LogInfo(c.ctx, "Read stream ", i, " option size ", size, " remaining size ", remaining) + if err != nil || size <= remaining || size > remaining + 1500 { + errors.LogInfo(c.ctx, "do not wait for second part of UDP packet ", i) + buf.ReleaseMulti(muxCoolReader.MultiBuffer) + c.readChannel <- readResult{buffer: received[:i], err: e} + continue; + } + + i2, e := stream.Read(received[i:]) + if e != nil { + errors.LogErrorInner(c.ctx, e, "Error read stream, drop this buffer ", i2) + buf.ReleaseMulti(muxCoolReader.MultiBuffer) + c.readChannel <- readResult{buffer: nil, err: e} + continue; + } + errors.LogInfo(c.ctx, "Read stream i2 size ", i2) + buf.ReleaseMulti(muxCoolReader.MultiBuffer) + c.readChannel <- readResult{buffer: received[:(i + i2)], err: e} + } +} + func (c *interConn) Read(b []byte) (int, error) { if c.reader.MultiBuffer.Len() > 0 { return c.reader.Read(b)