mirror of
https://github.com/XTLS/Xray-core.git
synced 2025-05-03 08:41:25 +03:00
Stream reader read twice for large muxcool packet
This commit is contained in:
parent
c29af68a2f
commit
3d7ed5450b
@ -7,7 +7,9 @@ import (
|
|||||||
"github.com/quic-go/quic-go"
|
"github.com/quic-go/quic-go"
|
||||||
"github.com/xtls/xray-core/common/buf"
|
"github.com/xtls/xray-core/common/buf"
|
||||||
"github.com/xtls/xray-core/common/errors"
|
"github.com/xtls/xray-core/common/errors"
|
||||||
|
"github.com/xtls/xray-core/common/mux"
|
||||||
"github.com/xtls/xray-core/common/net"
|
"github.com/xtls/xray-core/common/net"
|
||||||
|
"github.com/xtls/xray-core/common/serial"
|
||||||
"github.com/xtls/xray-core/common/signal/done"
|
"github.com/xtls/xray-core/common/signal/done"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -70,18 +72,63 @@ func (c *interConn) acceptStreams() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
go func() {
|
go c.readMuxCoolPacket(stream)
|
||||||
for {
|
|
||||||
received := make([]byte, buf.Size)
|
|
||||||
i, e := stream.Read(received)
|
|
||||||
errors.LogInfo(c.ctx, "Read stream ", i)
|
|
||||||
c.readChannel <- readResult{buffer: received[:i], err: e}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
c.streams = append(c.streams, stream)
|
c.streams = append(c.streams, stream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *interConn) readMuxCoolPacket(stream quic.Stream) {
|
||||||
|
for {
|
||||||
|
received := make([]byte, buf.Size)
|
||||||
|
i, e := stream.Read(received)
|
||||||
|
if e != nil {
|
||||||
|
errors.LogErrorInner(c.ctx, e, "Error read stream, drop this buffer ", i)
|
||||||
|
c.readChannel <- readResult{buffer: nil, err: e}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
errors.LogInfo(c.ctx, "Read stream ", i)
|
||||||
|
|
||||||
|
buffer := buf.New()
|
||||||
|
buffer.Write(received[:i])
|
||||||
|
muxCoolReader := &buf.MultiBufferContainer{}
|
||||||
|
muxCoolReader.MultiBuffer = append(muxCoolReader.MultiBuffer, buffer)
|
||||||
|
var meta mux.FrameMetadata
|
||||||
|
err := meta.Unmarshal(muxCoolReader)
|
||||||
|
if err != nil {
|
||||||
|
errors.LogInfo(c.ctx, "Not a Mux Cool packet beginning, copy directly ", i)
|
||||||
|
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
|
||||||
|
c.readChannel <- readResult{buffer: received[:i], err: e}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if !meta.Option.Has(mux.OptionData) {
|
||||||
|
errors.LogInfo(c.ctx, "No option data, copy directly ", i)
|
||||||
|
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
|
||||||
|
c.readChannel <- readResult{buffer: received[:i], err: e}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
size, err := serial.ReadUint16(muxCoolReader)
|
||||||
|
remaining := uint16(muxCoolReader.MultiBuffer.Len())
|
||||||
|
errors.LogInfo(c.ctx, "Read stream ", i, " option size ", size, " remaining size ", remaining)
|
||||||
|
if err != nil || size <= remaining || size > remaining + 1500 {
|
||||||
|
errors.LogInfo(c.ctx, "do not wait for second part of UDP packet ", i)
|
||||||
|
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
|
||||||
|
c.readChannel <- readResult{buffer: received[:i], err: e}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
i2, e := stream.Read(received[i:])
|
||||||
|
if e != nil {
|
||||||
|
errors.LogErrorInner(c.ctx, e, "Error read stream, drop this buffer ", i2)
|
||||||
|
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
|
||||||
|
c.readChannel <- readResult{buffer: nil, err: e}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
errors.LogInfo(c.ctx, "Read stream i2 size ", i2)
|
||||||
|
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
|
||||||
|
c.readChannel <- readResult{buffer: received[:(i + i2)], err: e}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *interConn) Read(b []byte) (int, error) {
|
func (c *interConn) Read(b []byte) (int, error) {
|
||||||
if c.reader.MultiBuffer.Len() > 0 {
|
if c.reader.MultiBuffer.Len() > 0 {
|
||||||
return c.reader.Read(b)
|
return c.reader.Read(b)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user