From 3045e8e8a9d13f2aac488c66ec0ee3d0db77de76 Mon Sep 17 00:00:00 2001 From: byene0923 Date: Sat, 23 Jul 2022 03:19:32 +0800 Subject: [PATCH 1/6] fix: bigFileReader and fsSmallFileReader don't support single range --- pkg/app/fs.go | 82 +++++++++++++++------------------------------------ 1 file changed, 23 insertions(+), 59 deletions(-) diff --git a/pkg/app/fs.go b/pkg/app/fs.go index 5a8bbde77..8d25bf2b5 100644 --- a/pkg/app/fs.go +++ b/pkg/app/fs.go @@ -175,60 +175,46 @@ type FS struct { h HandlerFunc } -type byteRangeUpdater interface { - UpdateByteRange(startPos, endPos int) error -} - type fsSmallFileReader struct { - ff *fsFile - startPos int - endPos int + ff *fsFile + offset int64 } func (r *fsSmallFileReader) Close() error { ff := r.ff ff.decReadersCount() r.ff = nil - r.startPos = 0 - r.endPos = 0 + r.offset = 0 ff.h.smallFileReaderPool.Put(r) return nil } -func (r *fsSmallFileReader) UpdateByteRange(startPos, endPos int) error { - r.startPos = startPos - r.endPos = endPos + 1 - return nil -} - func (r *fsSmallFileReader) Read(p []byte) (int, error) { - tailLen := r.endPos - r.startPos - if tailLen <= 0 { - return 0, io.EOF - } - if len(p) > tailLen { - p = p[:tailLen] - } - ff := r.ff + if ff.f != nil { - n, err := ff.f.ReadAt(p, int64(r.startPos)) - r.startPos += n + n, err := ff.f.ReadAt(p, r.offset) + r.offset += int64(n) return n, err } - n := copy(p, ff.dirIndex[r.startPos:]) - r.startPos += n + n := copy(p, ff.dirIndex[r.offset:]) + r.offset += int64(n) return n, nil } func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) { + if r.offset != 0 { + panic("BUG: no-zero offset! Read() mustn't be called before WriteTo()") + } + ff := r.ff var n int var err error + if ff.f == nil { - n, err = w.Write(ff.dirIndex[r.startPos:r.endPos]) + n, err = w.Write(ff.dirIndex) return int64(n), err } @@ -236,20 +222,13 @@ func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) { return rf.ReadFrom(r) } - curPos := r.startPos bufv := utils.CopyBufPool.Get() buf := bufv.([]byte) + for err == nil { - tailLen := r.endPos - curPos - if tailLen <= 0 { - break - } - if len(buf) > tailLen { - buf = buf[:tailLen] - } - n, err = ff.f.ReadAt(buf, int64(curPos)) + n, err = ff.f.ReadAt(buf, r.offset) nw, errw := w.Write(buf[:n]) - curPos += nw + r.offset += int64(nw) if errw == nil && nw != n { panic("BUG: Write(p) returned (n, nil), where n != len(p)") } @@ -262,7 +241,7 @@ func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) { if err == io.EOF { err = nil } - return int64(curPos - r.startPos), err + return r.offset, err } // ServeFile returns HTTP response containing compressed file contents @@ -379,36 +358,23 @@ type fsHandler struct { type bigFileReader struct { f *os.File ff *fsFile - r io.Reader - lr io.LimitedReader -} - -func (r *bigFileReader) UpdateByteRange(startPos, endPos int) error { - if _, err := r.f.Seek(int64(startPos), 0); err != nil { - return err - } - r.r = &r.lr - r.lr.R = r.f - r.lr.N = int64(endPos - startPos + 1) - return nil } func (r *bigFileReader) Read(p []byte) (int, error) { - return r.r.Read(p) + return r.f.Read(p) } func (r *bigFileReader) WriteTo(w io.Writer) (int64, error) { if rf, ok := w.(io.ReaderFrom); ok { // fast path. 
Sendfile must be triggered - return rf.ReadFrom(r.r) + return rf.ReadFrom(r.f) } zw := network.NewWriter(w) // slow pathw - return utils.CopyZeroAlloc(zw, r.r) + return utils.CopyZeroAlloc(zw, r.f) } func (r *bigFileReader) Close() error { - r.r = r.f n, err := r.f.Seek(0, 0) if err == nil { if n != 0 { @@ -766,7 +732,6 @@ func (ff *fsFile) bigFileReader() (io.Reader, error) { return &bigFileReader{ f: f, ff: ff, - r: f, }, nil } @@ -788,9 +753,8 @@ func (ff *fsFile) smallFileReader() io.Reader { } r := v.(*fsSmallFileReader) r.ff = ff - r.endPos = ff.contentLength - if r.startPos > 0 { - panic("BUG: fsSmallFileReader with non-nil startPos found in the pool") + if r.offset > 0 { + panic("BUG: fsSmallFileReader with non-nil offset found in the pool") } return r } From ec201f14e4a57af7b0f969190c589ddf12d36a0e Mon Sep 17 00:00:00 2001 From: byene0923 Date: Fri, 29 Jul 2022 01:15:28 +0800 Subject: [PATCH 2/6] feat: add first version code --- pkg/app/fs.go | 735 ++++++++++++++++++++++++++++++++++++++--- pkg/app/fs_test.go | 49 +-- pkg/protocol/header.go | 24 +- 3 files changed, 728 insertions(+), 80 deletions(-) diff --git a/pkg/app/fs.go b/pkg/app/fs.go index 8d25bf2b5..07c264c04 100644 --- a/pkg/app/fs.go +++ b/pkg/app/fs.go @@ -45,6 +45,7 @@ import ( "bytes" "compress/gzip" "context" + "crypto/rand" "fmt" "html" "io" @@ -57,6 +58,7 @@ import ( "strings" "sync" "time" + "unsafe" "github.com/cloudwego/hertz/internal/bytesconv" "github.com/cloudwego/hertz/internal/bytestr" @@ -350,7 +352,8 @@ type fsHandler struct { compressedCache map[string]*fsFile cacheLock sync.Mutex - smallFileReaderPool sync.Pool + smallFileReaderPool sync.Pool + smallRangeReaderPool sync.Pool } // bigFileReader attempts to trigger sendfile @@ -759,6 +762,72 @@ func (ff *fsFile) smallFileReader() io.Reader { return r } +func (ff *fsFile) NewRangeReader() (io.Reader, error) { + if ff.isBig() { + r, err := ff.bigRangeReader() + if err != nil { + ff.decReadersCount() + } + return r, err + } + return ff.smallRangeReader(), nil +} + +func (ff *fsFile) smallRangeReader() io.Reader { + v := ff.h.smallRangeReaderPool.Get() + if v == nil { + v = &smallRangeReader{ + sta: make([]int, 64), + end: make([]int, 64), + buf: make([]byte, 4096), + } + } + r := v.(*smallRangeReader) + r.ff = ff + r.sta, r.end, r.buf = r.sta[:0], r.end[:0], r.buf[:0] + r.first = true + r.hasCurRangeBodyHeader = false + r.curRange = 0 + r.boundary = randomBoundary() + return r +} + +func (ff *fsFile) bigRangeReader() (io.Reader, error) { + if ff.f == nil { + panic("BUG: ff.f must be non-nil in bigRangeReader") + } + var r io.Reader + + ff.rangeBigFilesLock.Lock() + n := len(ff.rangeBigFiles) + if n > 0 { + r = ff.rangeBigFiles[n-1] + ff.rangeBigFiles = ff.rangeBigFiles[:n-1] + } + ff.rangeBigFilesLock.Unlock() + + if r != nil { + return r, nil + } + f, err := os.Open(ff.f.Name()) + if err != nil { + return nil, fmt.Errorf("cannot open already opened file: %s", err) + } + rr := &bigRangeReader{ + ff: ff, + f: f, + sta: make([]int, 64), + end: make([]int, 64), + buf: make([]byte, 4096), + first: true, + hasCurRangeBodyHeader: false, + curRange: 0, + boundary: randomBoundary(), + } + rr.sta, rr.end, rr.buf = rr.sta[:0], rr.end[:0], rr.buf[:0] + return rr, nil +} + func (h *fsHandler) handleRequest(c context.Context, ctx *RequestContext) { var path []byte if h.pathRewrite != nil { @@ -854,7 +923,14 @@ func (h *fsHandler) handleRequest(c context.Context, ctx *RequestContext) { return } - r, err := ff.NewReader() + var r io.Reader + var err error + if 
h.acceptByteRange && len(byteRange) > 0 { + r, err = ff.NewRangeReader() + } else { + r, err = ff.NewReader() + } + if err != nil { hlog.Errorf("HERTZ: Cannot obtain file reader for path=%q, error=%s", path, err) ctx.AbortWithMsg("Internal Server Error", consts.StatusInternalServerError) @@ -871,7 +947,7 @@ func (h *fsHandler) handleRequest(c context.Context, ctx *RequestContext) { if h.acceptByteRange { hdr.SetCanonical(bytestr.StrAcceptRanges, bytestr.StrBytes) if len(byteRange) > 0 { - startPos, endPos, err := ParseByteRange(byteRange, contentLength) + staList, endList, err := ParseByteRanges(byteRange, contentLength) if err != nil { r.(io.Closer).Close() hlog.Errorf("HERTZ: Cannot parse byte range %q for path=%q,error=%s", byteRange, path, err) @@ -879,15 +955,19 @@ func (h *fsHandler) handleRequest(c context.Context, ctx *RequestContext) { return } - if err = r.(byteRangeUpdater).UpdateByteRange(startPos, endPos); err != nil { + if err = r.(byteRangeHandler).ByteRangeUpdate(staList, endList); err != nil { r.(io.Closer).Close() hlog.Errorf("HERTZ: Cannot seek byte range %q for path=%q, error=%s", byteRange, path, err) ctx.AbortWithMsg("Internal Server Error", consts.StatusInternalServerError) return } - - hdr.SetContentRange(startPos, endPos, contentLength) - contentLength = endPos - startPos + 1 + switch { + case len(staList) == 1: + hdr.SetContentRange(staList[0], endList[0], contentLength) + case len(staList) > 1: + hdr.SetContentType(fmt.Sprintf("multipart/byteranges; boundary=%s", r.(byteRangeHandler).Boundary())) + } + contentLength = r.(byteRangeHandler).ByteRangeLength() statusCode = consts.StatusPartialContent } } @@ -930,6 +1010,9 @@ type fsFile struct { bigFiles []*bigFileReader bigFilesLock sync.Mutex + + rangeBigFiles []*bigRangeReader + rangeBigFilesLock sync.Mutex } func (ff *fsFile) Release() { @@ -942,6 +1025,12 @@ func (ff *fsFile) Release() { r.f.Close() } ff.bigFilesLock.Unlock() + + ff.rangeBigFilesLock.Lock() + for _, r := range ff.rangeBigFiles { + r.f.Close() + } + ff.rangeBigFilesLock.Unlock() } } } @@ -1057,58 +1146,60 @@ func fsModTime(t time.Time) time.Time { return t.In(time.UTC).Truncate(time.Second) } -// ParseByteRange parses 'Range: bytes=...' header value. -// -// It follows https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 . -func ParseByteRange(byteRange []byte, contentLength int) (startPos, endPos int, err error) { +func ParseByteRanges(byteRange []byte, contentLength int) (startPos, endPos []int, err error) { b := byteRange if !bytes.HasPrefix(b, bytestr.StrBytes) { - return 0, 0, fmt.Errorf("unsupported range units: %q. Expecting %q", byteRange, bytestr.StrBytes) + return startPos, endPos, fmt.Errorf("unsupported range units: %q. 
Expecting %q", byteRange, bytestr.StrBytes) } - b = b[len(bytestr.StrBytes):] if len(b) == 0 || b[0] != '=' { - return 0, 0, fmt.Errorf("missing byte range in %q", byteRange) + return startPos, endPos, fmt.Errorf("missing byte range in %q", byteRange) } b = b[1:] - n := bytes.IndexByte(b, '-') - if n < 0 { - return 0, 0, fmt.Errorf("missing the end position of byte range in %q", byteRange) - } + var n, sta, end, v int - if n == 0 { - v, err := bytesconv.ParseUint(b[n+1:]) - if err != nil { - return 0, 0, err + for _, ra := range bytes.Split(b, bytesconv.S2b(",")) { + n = bytes.IndexByte(ra, '-') + if n < 0 { + return startPos, endPos, fmt.Errorf("missing the end position of byte range in %q", byteRange) } - startPos := contentLength - v - if startPos < 0 { - startPos = 0 + if n == 0 { + v, err = bytesconv.ParseUint(ra[n+1:]) + if err != nil { + return startPos, endPos, err + } + sta = contentLength - v + if sta < 0 { + sta = 0 + } + startPos = append(startPos, sta) + endPos = append(endPos, contentLength-1) + continue } - return startPos, contentLength - 1, nil - } - - if startPos, err = bytesconv.ParseUint(b[:n]); err != nil { - return 0, 0, err - } - if startPos >= contentLength { - return 0, 0, fmt.Errorf("the start position of byte range cannot exceed %d. byte range %q", contentLength-1, byteRange) - } - - b = b[n+1:] - if len(b) == 0 { - return startPos, contentLength - 1, nil - } - - if endPos, err = bytesconv.ParseUint(b); err != nil { - return 0, 0, err - } - if endPos >= contentLength { - endPos = contentLength - 1 - } - if endPos < startPos { - return 0, 0, fmt.Errorf("the start position of byte range cannot exceed the end position. byte range %q", byteRange) + if sta, err = bytesconv.ParseUint(ra[:n]); err != nil { + return startPos, endPos, err + } + if sta >= contentLength { + return startPos, endPos, fmt.Errorf("the start position of byte range cannot exceed %d. byte range %q", contentLength-1, byteRange) + } + ra = ra[n+1:] + if len(ra) == 0 { + startPos = append(startPos, sta) + endPos = append(endPos, contentLength-1) + continue + } + if end, err = bytesconv.ParseUint(ra); err != nil { + return startPos, endPos, err + } + if end >= contentLength { + end = contentLength - 1 + } + if end < sta { + return startPos, endPos, fmt.Errorf("the start position of byte range cannot exceed the end position. 
byte range %q", byteRange) + } + startPos = append(startPos, sta) + endPos = append(endPos, end) } return startPos, endPos, nil } @@ -1226,3 +1317,551 @@ func NewPathSlashesStripper(slashesCount int) PathRewriteFunc { return stripLeadingSlashes(ctx.Path(), slashesCount) } } + +type byteRangeHandler interface { + ByteRangeUpdate(sta, end []int) error + ByteRangeLength() int + Boundary() string + IsMultiRange() bool +} + +type smallRangeReader struct { + ff *fsFile + sta []int + end []int + + // handle multi range + buf []byte + first bool + hasCurRangeBodyHeader bool + curRange int + boundary string +} + +func (r *smallRangeReader) ByteRangeUpdate(sta, end []int) error { + for i := range sta { + r.sta = append(r.sta, sta[i]) + r.end = append(r.end, end[i]+1) + } + return nil +} + +func (r *smallRangeReader) IsMultiRange() bool { + return len(r.sta) > 1 +} + +func (r *smallRangeReader) Boundary() string { + return r.boundary +} + +func (r *smallRangeReader) ByteRangeLength() int { + if !r.IsMultiRange() { + return r.end[0] - r.sta[0] + } + sum := 0 + first := true + bufv := utils.CopyBufPool.Get() + buf := bufv.([]byte) + for i := range r.sta { + buf = buf[:0] + if i > 0 { + first = false + } + multiRangeBodyHeader(&buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, first) + sum += len(buf) + } + buf = buf[:0] + multiRangeBodyEnd(&buf, r.boundary) + sum += len(buf) + utils.CopyBufPool.Put(bufv) + return sum +} + +func (r *smallRangeReader) Close() error { + ff := r.ff + ff.decReadersCount() + r.ff = nil + + r.sta, r.end, r.buf = r.sta[:0], r.end[:0], r.buf[:0] + r.curRange = 0 + r.first = true + r.hasCurRangeBodyHeader = false + r.boundary = "" + ff.h.smallRangeReaderPool.Put(r) + return nil +} + +func (r *smallRangeReader) Read(p []byte) (int, error) { + + ff := r.ff + var err error + cPos, cLen, n := 0, 0, 0 + if ff.f == nil { + + if r.IsMultiRange() { + cLen = len(p[cPos:]) + n = copy(p[cPos:], r.buf) + if len(r.buf) > cLen { + r.buf = r.buf[n:] + return n, nil + } + cPos += n + r.buf = r.buf[:0] + } + + for i := r.curRange; i < len(r.sta); i++ { + if r.sta[i] >= r.end[i] { + continue + } + + if r.IsMultiRange() && !r.hasCurRangeBodyHeader { + r.hasCurRangeBodyHeader = true + multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.first) + r.first = false + cLen = len(p[cPos:]) + n = copy(p[cPos:], r.buf) + cPos += n + if len(r.buf) > cLen { + r.buf = r.buf[n:] + return cPos, nil + } + r.buf = r.buf[:0] + } + + cLen = len(p[cPos:]) + n = copy(p[cPos:], ff.dirIndex[r.sta[i]:r.end[i]]) + if r.end[i]-r.sta[i] > cLen { + r.sta[i] += n + return cPos + n, nil + } + cPos += n + r.curRange = i + 1 + } + + if r.IsMultiRange() { + multiRangeBodyEnd(&r.buf, r.boundary) + cLen = len(p[cPos:]) + n = copy(p[cPos:], r.buf) + if len(r.buf) > len(p[cPos:]) { + r.buf = r.buf[n:] + return cPos + n, nil + } + cPos += n + r.buf = r.buf[:0] + } + + return cPos, io.EOF + } + + if r.curRange >= len(r.sta) && len(r.buf) == 0 { + return 0, io.EOF + } + + if r.IsMultiRange() { + cLen = len(p[cPos:]) + n = copy(p[cPos:], r.buf) + if len(r.buf) > cLen { + r.buf = r.buf[n:] + return n, nil + } + cPos += n + r.buf = r.buf[:0] + } + + for i := r.curRange; i < len(r.sta); i++ { + if r.sta[i] >= r.end[i] { + continue + } + + if r.IsMultiRange() && !r.hasCurRangeBodyHeader { + r.hasCurRangeBodyHeader = true + multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.first) + r.first = false + cLen = len(p[cPos:]) 
+ n = copy(p[cPos:], r.buf) + cPos += n + if len(r.buf) > cLen { + r.buf = r.buf[n:] + return cPos, nil + } + r.buf = r.buf[:0] + } + + cLen = len(p[cPos:]) + if r.end[i]-r.sta[i] > cLen { + n, err = ff.f.ReadAt(p[cPos:], int64(r.sta[i])) + r.sta[i] += n + return cPos + n, err + } + + // todo use pool? + cBody := make([]byte, r.end[i]-r.sta[i]) + n, err = ff.f.ReadAt(cBody, int64(r.sta[i])) + if err != nil && err != io.EOF { + return cPos + n, err + } + n = copy(p[cPos:], cBody) + cPos += n + r.curRange = i + 1 + r.hasCurRangeBodyHeader = false + } + + if r.IsMultiRange() { + multiRangeBodyEnd(&r.buf, r.boundary) + cLen = len(p[cPos:]) + n = copy(p[cPos:], r.buf) + if len(r.buf) > len(p[cPos:]) { + r.buf = r.buf[n:] + return cPos + n, nil + } + cPos += n + r.buf = r.buf[:0] + } + return cPos, io.EOF +} + +func (r *smallRangeReader) WriteTo(w io.Writer) (int64, error) { + ff := r.ff + var err error + cPos, cLen, n, sum := 0, 0, 0, 0 + bufv := utils.CopyBufPool.Get() + buf := bufv.([]byte) + first := true + if ff.f == nil { + for i := range r.sta { + + if r.IsMultiRange() { + buf = buf[:0] + if i > 0 { + first = false + } + multiRangeBodyHeader(&buf, r.sta[i], r.end[i], ff.contentLength, ff.contentType, r.boundary, first) + nw, errw := w.Write(buf) + if errw == nil && nw != len(buf) { + panic("BUG: buf returned(n, nil),where n != len(buf)") + } + sum += nw + if errw != nil { + return int64(sum), errw + } + } + + n, err = w.Write(ff.dirIndex[r.sta[i]:r.end[i]]) + sum += n + if err != nil { + return int64(sum), err + } + } + if r.IsMultiRange() { + buf = buf[:0] + multiRangeBodyEnd(&buf, r.boundary) + nw, errw := w.Write(buf) + if errw == nil && nw != len(buf) { + panic("BUG: buf returned (n, nil), where n != len(buf)") + } + sum += nw + err = errw + } + return int64(sum), err + } + + if rf, ok := w.(io.ReaderFrom); ok { + return rf.ReadFrom(r) + } + + for i := range r.sta { + + if r.IsMultiRange() { + buf = buf[:0] + if i > 0 { + first = false + } + multiRangeBodyHeader(&buf, r.sta[i], r.end[i], ff.contentLength, ff.contentType, r.boundary, first) + nw, errw := w.Write(buf) + if errw == nil && nw != len(buf) { + panic("BUG: buf returned(n, nil),where n != len(buf)") + } + sum += nw + if errw != nil { + return int64(sum), errw + } + } + + cPos = r.sta[i] + buf = buf[:4096] + for err == nil { + cLen = r.end[i] - cPos + if cLen <= 0 { + break + } + if len(buf) > cLen { + buf = buf[:cLen] + } + n, err = ff.f.ReadAt(buf, int64(cPos)) + nw, errw := w.Write(buf[:n]) + cPos += nw + sum += nw + if errw == nil && nw != n { + panic("BUG: Write(p) returned (n, nil), where n != len(p)") + } + if err == nil { + err = errw + } + } + } + if err == io.EOF { + err = nil + } + if r.IsMultiRange() { + buf = buf[:0] + multiRangeBodyEnd(&buf, r.boundary) + nw, errw := w.Write(buf) + if errw == nil && nw != len(buf) { + panic("BUG: buf returned (n, nil), where n != len(buf)") + } + sum += nw + if err == nil { + err = errw + } + } + return int64(sum), err +} + +func multiRangeBodyHeader(b *[]byte, sta, end, size int, ct, boundary string, first bool) { + if first { + *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...) + } + *b = append(*b, bytesconv.S2b(fmt.Sprintf("--%s\r\n", boundary))...) + *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\r\n", consts.HeaderContentRange, + fmt.Sprintf("bytes %d-%d/%d", sta, end-1, size)))...) + *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\n", consts.HeaderContentType, ct))...) + *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...) 
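	// Illustrative sketch with hypothetical values: for boundary "BOUNDARY" and a range
	// covering bytes 500-999 of a 2000-byte text/plain file, the part header built above
	// looks roughly like:
	//   --BOUNDARY
	//   Content-Range: bytes 500-999/2000
	//   Content-Type: text/plain
	// followed by a blank line and then the raw bytes of that range;
	// multiRangeBodyEnd appends the closing "--BOUNDARY--" delimiter after the last range.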
+} + +func multiRangeBodyEnd(b *[]byte, boundary string) { + *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n--%s--", boundary))...) +} + +func randomBoundary() string { + var buf [30]byte + _, err := io.ReadFull(rand.Reader, buf[:]) + if err != nil { + panic(err) + } + return *(*string)(unsafe.Pointer(&buf)) +} + +type bigRangeReader struct { + ff *fsFile + f *os.File + sta []int + end []int + + // handle multi range + buf []byte + first bool + hasCurRangeBodyHeader bool + curRange int + boundary string +} + +func (r *bigRangeReader) ByteRangeUpdate(sta, end []int) error { + for i := range sta { + r.sta = append(r.sta, sta[i]) + r.end = append(r.end, end[i]+1) + } + return nil +} + +func (r *bigRangeReader) IsMultiRange() bool { + return len(r.sta) > 1 +} + +func (r *bigRangeReader) Boundary() string { + return r.boundary +} + +func (r *bigRangeReader) ByteRangeLength() int { + if !r.IsMultiRange() { + return r.end[0] - r.sta[0] + } + sum := 0 + first := true + bufv := utils.CopyBufPool.Get() + buf := bufv.([]byte) + for i := range r.sta { + buf = buf[:0] + if i > 0 { + first = false + } + multiRangeBodyHeader(&buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, first) + sum += len(buf) + } + buf = buf[:0] + multiRangeBodyEnd(&buf, r.boundary) + sum += len(buf) + utils.CopyBufPool.Put(bufv) + return sum +} + +func (r *bigRangeReader) Close() error { + n, err := r.f.Seek(0, 0) + if err == nil { + if n != 0 { + panic("BUG: File.Seek(0,0) returned (non-zero, nil)") + } + ff := r.ff + ff.rangeBigFilesLock.Lock() + r.end, r.sta, r.buf = r.end[:0], r.sta[:0], r.buf[:0] + r.curRange = 0 + r.first = true + r.hasCurRangeBodyHeader = false + + ff.rangeBigFiles = append(ff.rangeBigFiles, r) + ff.rangeBigFilesLock.Unlock() + } else { + r.f.Close() + } + r.ff.decReadersCount() + return err +} + +func (r *bigRangeReader) Read(p []byte) (int, error) { + if r.curRange >= len(r.sta) && len(r.buf) == 0 { + return 0, io.EOF + } + + ff := r.ff + var err error + cPos, cLen, n := 0, 0, 0 + + if r.IsMultiRange() { + cLen = len(p[cPos:]) + n = copy(p[cPos:], r.buf) + if len(r.buf) > cLen { + r.buf = r.buf[n:] + return n, nil + } + cPos += n + r.buf = r.buf[:0] + } + + for i := r.curRange; i < len(r.sta); i++ { + if r.sta[i] >= r.end[i] { + continue + } + + if r.IsMultiRange() && !r.hasCurRangeBodyHeader { + r.hasCurRangeBodyHeader = true + multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.first) + r.first = false + cLen = len(p[cPos:]) + n = copy(p[cPos:], r.buf) + cPos += n + if len(r.buf) > cLen { + r.buf = r.buf[n:] + return cPos, nil + } + r.buf = r.buf[:0] + } + + cLen = len(p[cPos:]) + if r.end[i]-r.sta[i] > cLen { + n, err = r.f.ReadAt(p[cPos:], int64(r.sta[i])) + r.sta[i] += n + return cPos + n, err + } + + // todo use pool? 
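			// the rest of this range fits into p: read it via a temporary buffer,
			// copy it in, then move on to the next range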
+ cBody := make([]byte, r.end[i]-r.sta[i]) + n, err = ff.f.ReadAt(cBody, int64(r.sta[i])) + if err != nil && err != io.EOF { + return cPos + n, err + } + n = copy(p[cPos:], cBody) + cPos += n + r.curRange = i + 1 + r.hasCurRangeBodyHeader = false + } + + if r.IsMultiRange() { + multiRangeBodyEnd(&r.buf, r.boundary) + cLen = len(p[cPos:]) + n = copy(p[cPos:], r.buf) + if len(r.buf) > len(p[cPos:]) { + r.buf = r.buf[n:] + return cPos + n, nil + } + cPos += n + r.buf = r.buf[:0] + } + return cPos, io.EOF +} + +func (r *bigRangeReader) WriteTo(w io.Writer) (int64, error) { + if rf, ok := w.(io.ReaderFrom); ok { + return rf.ReadFrom(r) + } + + var err error + cPos, cLen, n, sum := 0, 0, 0, 0 + first := true + + bufv := utils.CopyBufPool.Get() + buf := bufv.([]byte) + + for i := range r.sta { + + if r.IsMultiRange() { + buf = buf[:0] + if i > 0 { + first = false + } + multiRangeBodyHeader(&buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, first) + nw, errw := w.Write(buf) + if errw == nil && nw != len(buf) { + panic("BUG: buf returned(n, nil),where n != len(buf)") + } + sum += nw + if errw != nil { + return int64(sum), errw + } + } + + cPos = r.sta[i] + buf = buf[:4096] + for err == nil { + cLen = r.end[i] - cPos + if cLen <= 0 { + break + } + if len(buf) > cLen { + buf = buf[:cLen] + } + n, err = r.f.ReadAt(buf, int64(cPos)) + nw, errw := w.Write(buf[:n]) + cPos += nw + sum += nw + if errw == nil && nw != n { + panic("BUG: Write(p) returned (n, nil), where n != len(p)") + } + if err == nil { + err = errw + } + } + } + if err == io.EOF { + err = nil + } + if r.IsMultiRange() { + buf = buf[:0] + multiRangeBodyEnd(&buf, r.boundary) + nw, errw := w.Write(buf) + if err == nil && nw != len(buf) { + panic("BUG: buf returned (n, nil), where n != len(buf)") + } + sum += nw + err = errw + } + return int64(sum), err +} diff --git a/pkg/app/fs_test.go b/pkg/app/fs_test.go index 084e516d2..dd20c76aa 100644 --- a/pkg/app/fs_test.go +++ b/pkg/app/fs_test.go @@ -355,11 +355,14 @@ func testFSByteRange(t *testing.T, h HandlerFunc, filePath string) { } fileSize := len(expectedBody) - startPos := rand.Intn(fileSize) - endPos := rand.Intn(fileSize) - if endPos < startPos { - startPos, endPos = endPos, startPos + startPos, endPos := make([]int, 0), make([]int, 0) + start := rand.Intn(fileSize) + end := rand.Intn(fileSize) + if end < start { + start, end = end, start } + startPos = append(startPos, start) + endPos = append(endPos, end) ctx.Request.SetRequestURI(filePath) ctx.Request.Header.SetByteRange(startPos, endPos) @@ -381,13 +384,13 @@ func testFSByteRange(t *testing.T, h HandlerFunc, filePath string) { t.Fatalf("unexpected content-range %q. Expecting %q. filePath=%q", cr, expectedCR, filePath) } body := r.Body() - bodySize := endPos - startPos + 1 + bodySize := end - start + 1 if len(body) != bodySize { t.Fatalf("unexpected body size %d. Expecting %d. filePath=%q, startPos=%d, endPos=%d", len(body), bodySize, filePath, startPos, endPos) } - expectedBody = expectedBody[startPos : endPos+1] + expectedBody = expectedBody[start : end+1] if !bytes.Equal(body, expectedBody) { t.Fatalf("unexpected body %q. Expecting %q. 
filePath=%q, startPos=%d, endPos=%d", body, expectedBody, filePath, startPos, endPos) @@ -407,33 +410,35 @@ func getFileContents(path string) ([]byte, error) { func TestParseByteRangeSuccess(t *testing.T) { t.Parallel() - testParseByteRangeSuccess(t, "bytes=0-0", 1, 0, 0) - testParseByteRangeSuccess(t, "bytes=1234-6789", 6790, 1234, 6789) + testParseByteRangeSuccess(t, "bytes=0-0", 1, []int{0}, []int{0}) + testParseByteRangeSuccess(t, "bytes=1234-6789", 6790, []int{1234}, []int{6789}) - testParseByteRangeSuccess(t, "bytes=123-", 456, 123, 455) - testParseByteRangeSuccess(t, "bytes=-1", 1, 0, 0) - testParseByteRangeSuccess(t, "bytes=-123", 456, 333, 455) + testParseByteRangeSuccess(t, "bytes=123-", 456, []int{123}, []int{455}) + testParseByteRangeSuccess(t, "bytes=-1", 1, []int{0}, []int{0}) + testParseByteRangeSuccess(t, "bytes=-123", 456, []int{333}, []int{455}) // End position exceeding content-length. It should be updated to content-length-1. // See https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 - testParseByteRangeSuccess(t, "bytes=1-2345", 234, 1, 233) - testParseByteRangeSuccess(t, "bytes=0-2345", 2345, 0, 2344) + testParseByteRangeSuccess(t, "bytes=1-2345", 234, []int{1}, []int{233}) + testParseByteRangeSuccess(t, "bytes=0-2345", 2345, []int{0}, []int{2344}) // Start position overflow. Whole range must be returned. // See https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 - testParseByteRangeSuccess(t, "bytes=-567", 56, 0, 55) + testParseByteRangeSuccess(t, "bytes=-567", 56, []int{0}, []int{55}) } -func testParseByteRangeSuccess(t *testing.T, v string, contentLength, startPos, endPos int) { - startPos1, endPos1, err := ParseByteRange([]byte(v), contentLength) +func testParseByteRangeSuccess(t *testing.T, v string, contentLength int, startPos, endPos []int) { + startPos1, endPos1, err := ParseByteRanges([]byte(v), contentLength) if err != nil { t.Fatalf("unexpected error: %s. v=%q, contentLength=%d", err, v, contentLength) } - if startPos1 != startPos { - t.Fatalf("unexpected startPos=%d. Expecting %d. v=%q, contentLength=%d", startPos1, startPos, v, contentLength) - } - if endPos1 != endPos { - t.Fatalf("unexpected endPos=%d. Expectind %d. v=%q, contentLength=%d", endPos1, endPos, v, contentLength) + for i := range startPos1 { + if startPos1[i] != startPos[i] { + t.Fatalf("unexpected startPos=%d. Expecting %d. v=%q, contentLength=%d", startPos1[i], startPos[i], v, contentLength) + } + if endPos1[i] != endPos[i] { + t.Fatalf("unexpected endPos=%d. Expectind %d. v=%q, contentLength=%d", endPos1[i], endPos[i], v, contentLength) + } } } @@ -465,7 +470,7 @@ func TestParseByteRangeError(t *testing.T) { } func testParseByteRangeError(t *testing.T, v string, contentLength int) { - _, _, err := ParseByteRange([]byte(v), contentLength) + _, _, err := ParseByteRanges([]byte(v), contentLength) if err == nil { t.Fatalf("expecting error when parsing byte range %q", v) } diff --git a/pkg/protocol/header.go b/pkg/protocol/header.go index 48fb0199f..2d56cb84a 100644 --- a/pkg/protocol/header.go +++ b/pkg/protocol/header.go @@ -862,21 +862,25 @@ func (h *RequestHeader) Reset() { // // * If startPos is negative, then 'bytes=-startPos' value is set. // * If endPos is negative, then 'bytes=startPos-' value is set. -func (h *RequestHeader) SetByteRange(startPos, endPos int) { +func (h *RequestHeader) SetByteRange(startPos, endPos []int) { b := h.bufKV.value[:0] b = append(b, bytestr.StrBytes...) 
- b = append(b, '=') - if startPos >= 0 { - b = bytesconv.AppendUint(b, startPos) - } else { - endPos = -startPos - } b = append(b, '-') - if endPos >= 0 { - b = bytesconv.AppendUint(b, endPos) + for i := range startPos { + if i > 0 { + b = append(b, ',') + } + if startPos[i] >= 0 { + b = bytesconv.AppendUint(b, startPos[i]) + } else { + endPos[i] = -startPos[i] + } + b = append(b, '-') + if endPos[i] >= 0 { + b = bytesconv.AppendUint(b, endPos[i]) + } } h.bufKV.value = b - h.SetCanonical(bytestr.StrRange, h.bufKV.value) } From 4fd76946d3df68a9b37316e42b3c918599136971 Mon Sep 17 00:00:00 2001 From: byene0923 Date: Sun, 31 Jul 2022 21:10:01 +0800 Subject: [PATCH 3/6] fix:add test --- pkg/app/fs.go | 205 +++++++++++----------- pkg/app/fs_test.go | 385 +++++++++++++++++++++++++++++++++++++++-- pkg/protocol/header.go | 2 +- 3 files changed, 476 insertions(+), 116 deletions(-) diff --git a/pkg/app/fs.go b/pkg/app/fs.go index 07c264c04..4cb1a933b 100644 --- a/pkg/app/fs.go +++ b/pkg/app/fs.go @@ -47,19 +47,6 @@ import ( "context" "crypto/rand" "fmt" - "html" - "io" - "io/ioutil" - "mime" - "net/http" - "os" - "path/filepath" - "sort" - "strings" - "sync" - "time" - "unsafe" - "github.com/cloudwego/hertz/internal/bytesconv" "github.com/cloudwego/hertz/internal/bytestr" "github.com/cloudwego/hertz/internal/nocopy" @@ -71,6 +58,17 @@ import ( "github.com/cloudwego/hertz/pkg/network" "github.com/cloudwego/hertz/pkg/protocol" "github.com/cloudwego/hertz/pkg/protocol/consts" + "html" + "io" + "io/ioutil" + "mime" + "net/http" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" ) var ( @@ -785,9 +783,9 @@ func (ff *fsFile) smallRangeReader() io.Reader { r := v.(*smallRangeReader) r.ff = ff r.sta, r.end, r.buf = r.sta[:0], r.end[:0], r.buf[:0] - r.first = true + r.hf = true r.hasCurRangeBodyHeader = false - r.curRange = 0 + r.cRange = 0 r.boundary = randomBoundary() return r } @@ -819,9 +817,10 @@ func (ff *fsFile) bigRangeReader() (io.Reader, error) { sta: make([]int, 64), end: make([]int, 64), buf: make([]byte, 4096), - first: true, + hf: true, + he: false, hasCurRangeBodyHeader: false, - curRange: 0, + cRange: 0, boundary: randomBoundary(), } rr.sta, rr.end, rr.buf = rr.sta[:0], rr.end[:0], rr.buf[:0] @@ -1332,9 +1331,10 @@ type smallRangeReader struct { // handle multi range buf []byte - first bool + hf bool + he bool hasCurRangeBodyHeader bool - curRange int + cRange int boundary string } @@ -1358,23 +1358,7 @@ func (r *smallRangeReader) ByteRangeLength() int { if !r.IsMultiRange() { return r.end[0] - r.sta[0] } - sum := 0 - first := true - bufv := utils.CopyBufPool.Get() - buf := bufv.([]byte) - for i := range r.sta { - buf = buf[:0] - if i > 0 { - first = false - } - multiRangeBodyHeader(&buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, first) - sum += len(buf) - } - buf = buf[:0] - multiRangeBodyEnd(&buf, r.boundary) - sum += len(buf) - utils.CopyBufPool.Put(bufv) - return sum + return multiRangeLength(r.sta, r.end, r.ff.contentLength, r.ff.contentType, r.boundary) } func (r *smallRangeReader) Close() error { @@ -1383,8 +1367,9 @@ func (r *smallRangeReader) Close() error { r.ff = nil r.sta, r.end, r.buf = r.sta[:0], r.end[:0], r.buf[:0] - r.curRange = 0 - r.first = true + r.cRange = 0 + r.hf = true + r.he = false r.hasCurRangeBodyHeader = false r.boundary = "" ff.h.smallRangeReaderPool.Put(r) @@ -1409,15 +1394,15 @@ func (r *smallRangeReader) Read(p []byte) (int, error) { r.buf = r.buf[:0] } - for i := r.curRange; i < len(r.sta); i++ { + for i := r.cRange; i 
< len(r.sta); i++ { if r.sta[i] >= r.end[i] { continue } if r.IsMultiRange() && !r.hasCurRangeBodyHeader { r.hasCurRangeBodyHeader = true - multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.first) - r.first = false + multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.hf) + r.hf = false cLen = len(p[cPos:]) n = copy(p[cPos:], r.buf) cPos += n @@ -1435,11 +1420,12 @@ func (r *smallRangeReader) Read(p []byte) (int, error) { return cPos + n, nil } cPos += n - r.curRange = i + 1 + r.cRange = i + 1 } if r.IsMultiRange() { multiRangeBodyEnd(&r.buf, r.boundary) + r.he = true cLen = len(p[cPos:]) n = copy(p[cPos:], r.buf) if len(r.buf) > len(p[cPos:]) { @@ -1453,7 +1439,7 @@ func (r *smallRangeReader) Read(p []byte) (int, error) { return cPos, io.EOF } - if r.curRange >= len(r.sta) && len(r.buf) == 0 { + if r.cRange >= len(r.sta) && len(r.buf) == 0 { return 0, io.EOF } @@ -1468,15 +1454,15 @@ func (r *smallRangeReader) Read(p []byte) (int, error) { r.buf = r.buf[:0] } - for i := r.curRange; i < len(r.sta); i++ { + for i := r.cRange; i < len(r.sta); i++ { if r.sta[i] >= r.end[i] { continue } if r.IsMultiRange() && !r.hasCurRangeBodyHeader { r.hasCurRangeBodyHeader = true - multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.first) - r.first = false + multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.hf) + r.hf = false cLen = len(p[cPos:]) n = copy(p[cPos:], r.buf) cPos += n @@ -1502,12 +1488,13 @@ func (r *smallRangeReader) Read(p []byte) (int, error) { } n = copy(p[cPos:], cBody) cPos += n - r.curRange = i + 1 + r.cRange = i + 1 r.hasCurRangeBodyHeader = false } if r.IsMultiRange() { multiRangeBodyEnd(&r.buf, r.boundary) + r.he = true cLen = len(p[cPos:]) n = copy(p[cPos:], r.buf) if len(r.buf) > len(p[cPos:]) { @@ -1526,16 +1513,16 @@ func (r *smallRangeReader) WriteTo(w io.Writer) (int64, error) { cPos, cLen, n, sum := 0, 0, 0, 0 bufv := utils.CopyBufPool.Get() buf := bufv.([]byte) - first := true + hf := true if ff.f == nil { for i := range r.sta { if r.IsMultiRange() { buf = buf[:0] if i > 0 { - first = false + hf = false } - multiRangeBodyHeader(&buf, r.sta[i], r.end[i], ff.contentLength, ff.contentType, r.boundary, first) + multiRangeBodyHeader(&buf, r.sta[i], r.end[i], ff.contentLength, ff.contentType, r.boundary, hf) nw, errw := w.Write(buf) if errw == nil && nw != len(buf) { panic("BUG: buf returned(n, nil),where n != len(buf)") @@ -1574,9 +1561,9 @@ func (r *smallRangeReader) WriteTo(w io.Writer) (int64, error) { if r.IsMultiRange() { buf = buf[:0] if i > 0 { - first = false + hf = false } - multiRangeBodyHeader(&buf, r.sta[i], r.end[i], ff.contentLength, ff.contentType, r.boundary, first) + multiRangeBodyHeader(&buf, r.sta[i], r.end[i], ff.contentLength, ff.contentType, r.boundary, hf) nw, errw := w.Write(buf) if errw == nil && nw != len(buf) { panic("BUG: buf returned(n, nil),where n != len(buf)") @@ -1627,30 +1614,6 @@ func (r *smallRangeReader) WriteTo(w io.Writer) (int64, error) { return int64(sum), err } -func multiRangeBodyHeader(b *[]byte, sta, end, size int, ct, boundary string, first bool) { - if first { - *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...) - } - *b = append(*b, bytesconv.S2b(fmt.Sprintf("--%s\r\n", boundary))...) - *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\r\n", consts.HeaderContentRange, - fmt.Sprintf("bytes %d-%d/%d", sta, end-1, size)))...) 
- *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\n", consts.HeaderContentType, ct))...) - *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...) -} - -func multiRangeBodyEnd(b *[]byte, boundary string) { - *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n--%s--", boundary))...) -} - -func randomBoundary() string { - var buf [30]byte - _, err := io.ReadFull(rand.Reader, buf[:]) - if err != nil { - panic(err) - } - return *(*string)(unsafe.Pointer(&buf)) -} - type bigRangeReader struct { ff *fsFile f *os.File @@ -1659,9 +1622,10 @@ type bigRangeReader struct { // handle multi range buf []byte - first bool + hf bool + he bool hasCurRangeBodyHeader bool - curRange int + cRange int boundary string } @@ -1685,23 +1649,7 @@ func (r *bigRangeReader) ByteRangeLength() int { if !r.IsMultiRange() { return r.end[0] - r.sta[0] } - sum := 0 - first := true - bufv := utils.CopyBufPool.Get() - buf := bufv.([]byte) - for i := range r.sta { - buf = buf[:0] - if i > 0 { - first = false - } - multiRangeBodyHeader(&buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, first) - sum += len(buf) - } - buf = buf[:0] - multiRangeBodyEnd(&buf, r.boundary) - sum += len(buf) - utils.CopyBufPool.Put(bufv) - return sum + return multiRangeLength(r.sta, r.end, r.ff.contentLength, r.ff.contentType, r.boundary) } func (r *bigRangeReader) Close() error { @@ -1713,8 +1661,9 @@ func (r *bigRangeReader) Close() error { ff := r.ff ff.rangeBigFilesLock.Lock() r.end, r.sta, r.buf = r.end[:0], r.sta[:0], r.buf[:0] - r.curRange = 0 - r.first = true + r.cRange = 0 + r.hf = true + r.he = false r.hasCurRangeBodyHeader = false ff.rangeBigFiles = append(ff.rangeBigFiles, r) @@ -1727,7 +1676,7 @@ func (r *bigRangeReader) Close() error { } func (r *bigRangeReader) Read(p []byte) (int, error) { - if r.curRange >= len(r.sta) && len(r.buf) == 0 { + if r.cRange >= len(r.sta) && len(r.buf) == 0 && r.he { return 0, io.EOF } @@ -1746,15 +1695,15 @@ func (r *bigRangeReader) Read(p []byte) (int, error) { r.buf = r.buf[:0] } - for i := r.curRange; i < len(r.sta); i++ { + for i := r.cRange; i < len(r.sta); i++ { if r.sta[i] >= r.end[i] { continue } if r.IsMultiRange() && !r.hasCurRangeBodyHeader { r.hasCurRangeBodyHeader = true - multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.first) - r.first = false + multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.hf) + r.hf = false cLen = len(p[cPos:]) n = copy(p[cPos:], r.buf) cPos += n @@ -1780,12 +1729,13 @@ func (r *bigRangeReader) Read(p []byte) (int, error) { } n = copy(p[cPos:], cBody) cPos += n - r.curRange = i + 1 + r.cRange = i + 1 r.hasCurRangeBodyHeader = false } if r.IsMultiRange() { multiRangeBodyEnd(&r.buf, r.boundary) + r.he = true cLen = len(p[cPos:]) n = copy(p[cPos:], r.buf) if len(r.buf) > len(p[cPos:]) { @@ -1805,7 +1755,7 @@ func (r *bigRangeReader) WriteTo(w io.Writer) (int64, error) { var err error cPos, cLen, n, sum := 0, 0, 0, 0 - first := true + hf := true bufv := utils.CopyBufPool.Get() buf := bufv.([]byte) @@ -1815,9 +1765,9 @@ func (r *bigRangeReader) WriteTo(w io.Writer) (int64, error) { if r.IsMultiRange() { buf = buf[:0] if i > 0 { - first = false + hf = false } - multiRangeBodyHeader(&buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, first) + multiRangeBodyHeader(&buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, hf) nw, errw := w.Write(buf) if errw == nil && nw != len(buf) { panic("BUG: buf returned(n, 
nil),where n != len(buf)") @@ -1865,3 +1815,48 @@ func (r *bigRangeReader) WriteTo(w io.Writer) (int64, error) { } return int64(sum), err } + +func multiRangeLength(sta, end []int, cl int, ct, bd string) int { + sum := 0 + hf := true + bufv := utils.CopyBufPool.Get() + buf := bufv.([]byte) + for i := range sta { + buf = buf[:0] + if i > 0 { + hf = false + } + multiRangeBodyHeader(&buf, sta[i], end[i], cl, ct, bd, hf) + sum += len(buf) + sum += end[i] - sta[i] + } + buf = buf[:0] + multiRangeBodyEnd(&buf, bd) + sum += len(buf) + utils.CopyBufPool.Put(bufv) + return sum +} + +func multiRangeBodyHeader(b *[]byte, sta, end, size int, ct, boundary string, hf bool) { + if !hf { + *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...) + } + *b = append(*b, bytesconv.S2b(fmt.Sprintf("--%s\r\n", boundary))...) + *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\r\n", consts.HeaderContentRange, + fmt.Sprintf("bytes %d-%d/%d", sta, end-1, size)))...) + *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\n", consts.HeaderContentType, ct))...) + *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...) +} + +func multiRangeBodyEnd(b *[]byte, boundary string) { + *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n--%s--", boundary))...) +} + +func randomBoundary() string { + var buf [30]byte + _, err := io.ReadFull(rand.Reader, buf[:]) + if err != nil { + panic(err) + } + return fmt.Sprintf("%x", buf[:]) +} diff --git a/pkg/app/fs_test.go b/pkg/app/fs_test.go index dd20c76aa..6c4ba144f 100644 --- a/pkg/app/fs_test.go +++ b/pkg/app/fs_test.go @@ -45,12 +45,14 @@ import ( "bytes" "context" "fmt" + "github.com/cloudwego/hertz/internal/bytesconv" "io" "io/ioutil" "math/rand" "os" "path" "sort" + "strings" "testing" "time" @@ -303,7 +305,7 @@ func TestServeFileUncompressed(t *testing.T) { } } -func TestFSByteRangeConcurrent(t *testing.T) { +func TestFSSingleByteRangeConcurrent(t *testing.T) { t.Parallel() fs := &FS{ @@ -317,7 +319,8 @@ func TestFSByteRangeConcurrent(t *testing.T) { for i := 0; i < concurrency; i++ { go func() { for j := 0; j < 5; j++ { - testFSByteRange(t, h, "/fs.go") + testFSSingleByteRangeOfRead(t, h, "/fs.go") + testFSSingleByteRangeOfWriteTo(t, h, "/fs.go") } ch <- struct{}{} }() @@ -332,7 +335,7 @@ func TestFSByteRangeConcurrent(t *testing.T) { } } -func TestFSByteRangeSingleThread(t *testing.T) { +func TestFSSingleByteRangeSingleThread(t *testing.T) { t.Parallel() fs := &FS{ @@ -341,10 +344,81 @@ func TestFSByteRangeSingleThread(t *testing.T) { } h := fs.NewRequestHandler() - testFSByteRange(t, h, "/fs.go") + testFSSingleByteRangeOfRead(t, h, "/fs.go") + testFSSingleByteRangeOfWriteTo(t, h, "/fs.go") } -func testFSByteRange(t *testing.T, h HandlerFunc, filePath string) { +func testFSSingleByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string) { + var ctx RequestContext + req := &protocol.Request{} + req.CopyTo(&ctx.Request) + + expectedBody, err := getFileContents(filePath) + if err != nil { + t.Fatalf("cannot read file %q: %s", filePath, err) + } + + fileSize := len(expectedBody) + startPos, endPos := make([]int, 0), make([]int, 0) + start := rand.Intn(fileSize) + end := rand.Intn(fileSize) + if end < start { + start, end = end, start + } + startPos = append(startPos, start) + endPos = append(endPos, end) + + ctx.Request.SetRequestURI(filePath) + ctx.Request.Header.SetByteRange(startPos, endPos) + h(context.Background(), &ctx) + + bodySize := end - start + 1 + + // todo 代码优化 + // test WriteTo(w io.Writer) + if fileSize > consts.MaxSmallFileSize { + reader, ok := 
ctx.Response.BodyStream().(*bigRangeReader) + if !ok { + t.Fatal("expected bigRangeReader") + } + buf := bytes.NewBuffer(nil) + + n, err := reader.WriteTo(pureWriter{buf}) + if err != nil { + t.Fatal(err) + } + if n != int64(bodySize) { + t.Fatalf("expected %d bytes, got %d bytes", bodySize, n) + } + body1 := buf.String() + if body1 != bytesconv.B2s(expectedBody[start:end+1]) { + t.Fatalf("unexpected body %q. Expecting %q. filePath=%q, startPos=%d, endPos=%d", + body1, bytesconv.B2s(expectedBody[start:end+1]), filePath, startPos, endPos) + } + } else { + reader, ok := ctx.Response.BodyStream().(*smallRangeReader) + if !ok { + t.Fatal("expected smallRangeReader") + } + buf := bytes.NewBuffer(nil) + + n, err := reader.WriteTo(pureWriter{buf}) + if err != nil { + t.Fatal(err) + } + if n != int64(bodySize) { + t.Fatalf("expected %d bytes, got %d bytes", bodySize, n) + } + body1 := buf.String() + if body1 != bytesconv.B2s(expectedBody[start:end+1]) { + t.Fatalf("unexpected body %q. Expecting %q. filePath=%q, startPos=%d, endPos=%d", + body1, bytesconv.B2s(expectedBody[start:end+1]), filePath, startPos, endPos) + } + } + +} + +func testFSSingleByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { var ctx RequestContext req := &protocol.Request{} req.CopyTo(&ctx.Request) @@ -370,6 +444,7 @@ func testFSByteRange(t *testing.T, h HandlerFunc, filePath string) { var r protocol.Response s := resp.GetHTTP1Response(&ctx.Response).String() + zr := mock.NewZeroCopyReader(s) if err := resp.Read(&r, zr); err != nil { t.Fatalf("unexpected error: %s. filePath=%q", err, filePath) @@ -377,9 +452,10 @@ func testFSByteRange(t *testing.T, h HandlerFunc, filePath string) { if r.StatusCode() != consts.StatusPartialContent { t.Fatalf("unexpected status code: %d. Expecting %d. filePath=%q", r.StatusCode(), consts.StatusPartialContent, filePath) } + cr := r.Header.Peek(consts.HeaderContentRange) - expectedCR := fmt.Sprintf("bytes %d-%d/%d", startPos, endPos, fileSize) + expectedCR := fmt.Sprintf("bytes %d-%d/%d", start, end, fileSize) if string(cr) != expectedCR { t.Fatalf("unexpected content-range %q. Expecting %q. 
filePath=%q", cr, expectedCR, filePath) } @@ -397,6 +473,281 @@ func testFSByteRange(t *testing.T, h HandlerFunc, filePath string) { } } +func TestFSMultiByteRangeConcurrent(t *testing.T) { + t.Parallel() + + fs := &FS{ + Root: ".", + AcceptByteRange: true, + } + h := fs.NewRequestHandler() + + concurrency := 10 + ch := make(chan struct{}, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + for j := 0; j < 50; j++ { + testFSMultiByteRangeOfRead(t, h, "/fs.go") + testFSMultiByteRangeOfWriteTo(t, h, "/fs.go") + } + ch <- struct{}{} + }() + } + + for i := 0; i < concurrency; i++ { + select { + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + case <-ch: + } + } +} + +func TestFSMultiByteRangeSingleThread(t *testing.T) { + t.Parallel() + + fs := &FS{ + Root: ".", + AcceptByteRange: true, + } + h := fs.NewRequestHandler() + + testFSMultiByteRangeOfRead(t, h, "/fs.go") + testFSMultiByteRangeOfWriteTo(t, h, "/fs.go") +} + +func testFSMultiByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string) { + var ctx RequestContext + req := &protocol.Request{} + req.CopyTo(&ctx.Request) + + expectedBody, err := getFileContents(filePath) + if err != nil { + t.Fatalf("cannot read file %q: %s", filePath, err) + } + + num := rand.Intn(50) + 2 + + fileSize := len(expectedBody) + startPos, endPos := make([]int, 0), make([]int, 0) + + for i := 0; i < num; i++ { + start := rand.Intn(fileSize) + end := rand.Intn(fileSize) + if end < start { + start, end = end, start + } + startPos = append(startPos, start) + endPos = append(endPos, end) + } + + ctx.Request.SetRequestURI(filePath) + ctx.Request.Header.SetByteRange(startPos, endPos) + h(context.Background(), &ctx) + + var body string + var boundary string + + if fileSize > consts.MaxSmallFileSize { + reader, ok := ctx.Response.BodyStream().(*bigRangeReader) + boundary = reader.Boundary() + if !ok { + t.Fatal("expected bigRangeReader") + } + buf := bytes.NewBuffer(nil) + + _, err := reader.WriteTo(pureWriter{buf}) + if err != nil { + t.Fatal(err) + } + body = buf.String() + } else { + reader, ok := ctx.Response.BodyStream().(*smallRangeReader) + boundary = reader.Boundary() + if !ok { + t.Fatal("expected smallRangeReader") + } + buf := bytes.NewBuffer(nil) + + _, err := reader.WriteTo(pureWriter{buf}) + if err != nil { + t.Fatal(err) + } + body = buf.String() + } + + singleBodys := make([]byte, 0) + + // compare with single range + for i := 0; i < num; i++ { + var ctx1 RequestContext + req1 := &protocol.Request{} + req1.CopyTo(&ctx1.Request) + ctx1.Request.SetRequestURI(filePath) + ctx1.Request.Header.SetByteRange([]int{startPos[i]}, []int{endPos[i]}) + h(context.Background(), &ctx1) + + var r1 protocol.Response + s1 := resp.GetHTTP1Response(&ctx1.Response).String() + + zr1 := mock.NewZeroCopyReader(s1) + if err1 := resp.Read(&r1, zr1); err1 != nil { + t.Fatalf("unexpected error: %s. filePath=%q", err1, filePath) + } + if r1.StatusCode() != consts.StatusPartialContent { + t.Fatalf("unexpected status code: %d. Expecting %d. filePath=%q", r1.StatusCode(), consts.StatusPartialContent, filePath) + } + + cr1 := r1.Header.Peek(consts.HeaderContentRange) + expectedCR1 := fmt.Sprintf("bytes %d-%d/%d", startPos[i], endPos[i], fileSize) + if string(cr1) != expectedCR1 { + t.Fatalf("unexpected content-range %q. Expecting %q. filePath=%q", cr1, expectedCR1, filePath) + } + + body1 := r1.Body() + bodySize := endPos[i] - startPos[i] + 1 + if len(body1) != bodySize { + t.Fatalf("unexpected body size %d. Expecting %d. 
filePath=%q, startPos=%d, endPos=%d", + len(body), bodySize, filePath, startPos[i], endPos[i]) + } + + expectedBody1 := expectedBody[startPos[i] : endPos[i]+1] + if !bytes.Equal(body1, expectedBody1) { + t.Fatalf("unexpected body %q. Expecting %q. filePath=%q, startPos=%d, endPos=%d", + body1, expectedBody1, filePath, startPos[i], endPos[i]) + } + buf := make([]byte, 0) + first := true + if i > 0 { + first = false + } + ct1 := r1.Header.Peek(consts.HeaderContentType) + multiRangeBodyHeader(&buf, startPos[i], endPos[i]+1, fileSize, string(ct1), boundary, first) + singleBodys = append(singleBodys, buf...) + singleBodys = append(singleBodys, body1...) + } + buf := make([]byte, 0) + multiRangeBodyEnd(&buf, boundary) + singleBodys = append(singleBodys, buf...) + if body != string(singleBodys) { + t.Fatalf("multipart ranges content is invalid") + } + +} + +func testFSMultiByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { + var ctx RequestContext + req := &protocol.Request{} + req.CopyTo(&ctx.Request) + + expectedBody, err := getFileContents(filePath) + if err != nil { + t.Fatalf("cannot read file %q: %s", filePath, err) + } + + num := rand.Intn(50) + 2 + + fileSize := len(expectedBody) + startPos, endPos := make([]int, 0), make([]int, 0) + + for i := 0; i < num; i++ { + start := rand.Intn(fileSize) + end := rand.Intn(fileSize) + if end < start { + start, end = end, start + } + startPos = append(startPos, start) + endPos = append(endPos, end) + } + + ctx.Request.SetRequestURI(filePath) + ctx.Request.Header.SetByteRange(startPos, endPos) + h(context.Background(), &ctx) + + var r protocol.Response + s := resp.GetHTTP1Response(&ctx.Response).String() + + zr := mock.NewZeroCopyReader(s) + if err := resp.Read(&r, zr); err != nil { + t.Fatalf("unexpected error: %s. filePath=%q", err, filePath) + } + if r.StatusCode() != consts.StatusPartialContent { + t.Fatalf("unexpected status code: %d. Expecting %d. filePath=%q", r.StatusCode(), consts.StatusPartialContent, filePath) + } + + ct := r.Header.Peek(consts.HeaderContentType) + expectedCT := fmt.Sprintf("multipart/byteranges; boundary=") + if !strings.HasPrefix(string(ct), expectedCT) { + t.Fatalf("unexpected content-type %q. Expecting prefix %q. filePath=%q", ct, expectedCT, filePath) + } + + cl := r.Header.Peek(consts.HeaderContentLength) + + body := r.Body() + if fmt.Sprintf("%d", len(body)) != bytesconv.B2s(cl) { + t.Fatalf("error") + } + + boundary := string(ct)[len(expectedCT):] + + singleBodys := make([]byte, 0) + + // compare with single range + for i := 0; i < num; i++ { + var ctx1 RequestContext + req1 := &protocol.Request{} + req1.CopyTo(&ctx1.Request) + ctx1.Request.SetRequestURI(filePath) + ctx1.Request.Header.SetByteRange([]int{startPos[i]}, []int{endPos[i]}) + h(context.Background(), &ctx1) + + var r1 protocol.Response + s1 := resp.GetHTTP1Response(&ctx1.Response).String() + + zr1 := mock.NewZeroCopyReader(s1) + if err1 := resp.Read(&r1, zr1); err1 != nil { + t.Fatalf("unexpected error: %s. filePath=%q", err1, filePath) + } + if r1.StatusCode() != consts.StatusPartialContent { + t.Fatalf("unexpected status code: %d. Expecting %d. filePath=%q", r1.StatusCode(), consts.StatusPartialContent, filePath) + } + + cr1 := r1.Header.Peek(consts.HeaderContentRange) + expectedCR1 := fmt.Sprintf("bytes %d-%d/%d", startPos[i], endPos[i], fileSize) + if string(cr1) != expectedCR1 { + t.Fatalf("unexpected content-range %q. Expecting %q. 
filePath=%q", cr1, expectedCR1, filePath) + } + + body1 := r1.Body() + bodySize := endPos[i] - startPos[i] + 1 + if len(body1) != bodySize { + t.Fatalf("unexpected body size %d. Expecting %d. filePath=%q, startPos=%d, endPos=%d", + len(body), bodySize, filePath, startPos[i], endPos[i]) + } + + expectedBody1 := expectedBody[startPos[i] : endPos[i]+1] + if !bytes.Equal(body1, expectedBody1) { + t.Fatalf("unexpected body %q. Expecting %q. filePath=%q, startPos=%d, endPos=%d", + body1, expectedBody1, filePath, startPos[i], endPos[i]) + } + buf := make([]byte, 0) + first := true + if i > 0 { + first = false + } + ct1 := r1.Header.Peek(consts.HeaderContentType) + multiRangeBodyHeader(&buf, startPos[i], endPos[i]+1, fileSize, string(ct1), boundary, first) + singleBodys = append(singleBodys, buf...) + singleBodys = append(singleBodys, body1...) + } + buf := make([]byte, 0) + multiRangeBodyEnd(&buf, boundary) + singleBodys = append(singleBodys, buf...) + if string(body) != string(singleBodys) { + t.Fatalf("multipart ranges content is invalid") + } +} + func getFileContents(path string) ([]byte, error) { path = "." + path f, err := os.Open(path) @@ -407,7 +758,7 @@ func getFileContents(path string) ([]byte, error) { return ioutil.ReadAll(f) } -func TestParseByteRangeSuccess(t *testing.T) { +func TestParseByteSingleRangeSuccess(t *testing.T) { t.Parallel() testParseByteRangeSuccess(t, "bytes=0-0", 1, []int{0}, []int{0}) @@ -427,6 +778,23 @@ func TestParseByteRangeSuccess(t *testing.T) { testParseByteRangeSuccess(t, "bytes=-567", 56, []int{0}, []int{55}) } +func TestParseByteMultiRangeSuccess(t *testing.T) { + t.Parallel() + + testParseByteRangeSuccess(t, "bytes=1234-6789,23-342", 6790, []int{1234, 23}, []int{6789, 342}) + testParseByteRangeSuccess(t, "bytes=123-,-123", 456, []int{123, 333}, []int{455, 455}) + + // End position exceeding content-length. It should be updated to content-length-1. + // See https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 + testParseByteRangeSuccess(t, "bytes=1-2345,1-345", 234, []int{1, 1}, []int{233, 233}) + + testParseByteRangeSuccess(t, "bytes=0-2345,23-1234", 2345, []int{0, 23}, []int{2344, 1234}) + + // Start position overflow. Whole range must be returned. + // See https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 + testParseByteRangeSuccess(t, "bytes=-567,-765", 56, []int{0, 0}, []int{55, 55}) +} + func testParseByteRangeSuccess(t *testing.T, v string, contentLength int, startPos, endPos []int) { startPos1, endPos1, err := ParseByteRanges([]byte(v), contentLength) if err != nil { @@ -459,9 +827,6 @@ func TestParseByteRangeError(t *testing.T) { testParseByteRangeError(t, "bytes=1-foobar", 123) testParseByteRangeError(t, "bytes=df-344", 545) - // multiple byte ranges - testParseByteRangeError(t, "bytes=1-2,4-6", 123) - // byte range exceeding contentLength testParseByteRangeError(t, "bytes=123-", 12) diff --git a/pkg/protocol/header.go b/pkg/protocol/header.go index 2d56cb84a..29c7bee2a 100644 --- a/pkg/protocol/header.go +++ b/pkg/protocol/header.go @@ -865,7 +865,7 @@ func (h *RequestHeader) Reset() { func (h *RequestHeader) SetByteRange(startPos, endPos []int) { b := h.bufKV.value[:0] b = append(b, bytestr.StrBytes...) 
- b = append(b, '-') + b = append(b, '=') for i := range startPos { if i > 0 { b = append(b, ',') From a30bed6017ee258a1e902d9938f8810f163e918d Mon Sep 17 00:00:00 2001 From: byene0923 Date: Mon, 1 Aug 2022 00:01:29 +0800 Subject: [PATCH 4/6] feat: support multi_range fix:fix bug fix: remove useless code fix: lint code --- pkg/app/fs.go | 133 +++++++++++++++------------------------------ pkg/app/fs_test.go | 8 +-- 2 files changed, 47 insertions(+), 94 deletions(-) diff --git a/pkg/app/fs.go b/pkg/app/fs.go index a02de1ed8..dd9e1f40e 100644 --- a/pkg/app/fs.go +++ b/pkg/app/fs.go @@ -47,17 +47,6 @@ import ( "context" "crypto/rand" "fmt" - "github.com/cloudwego/hertz/internal/bytesconv" - "github.com/cloudwego/hertz/internal/bytestr" - "github.com/cloudwego/hertz/internal/nocopy" - "github.com/cloudwego/hertz/pkg/common/bytebufferpool" - "github.com/cloudwego/hertz/pkg/common/compress" - "github.com/cloudwego/hertz/pkg/common/errors" - "github.com/cloudwego/hertz/pkg/common/hlog" - "github.com/cloudwego/hertz/pkg/common/utils" - "github.com/cloudwego/hertz/pkg/network" - "github.com/cloudwego/hertz/pkg/protocol" - "github.com/cloudwego/hertz/pkg/protocol/consts" "html" "io" "io/ioutil" @@ -69,6 +58,18 @@ import ( "strings" "sync" "time" + + "github.com/cloudwego/hertz/internal/bytesconv" + "github.com/cloudwego/hertz/internal/bytestr" + "github.com/cloudwego/hertz/internal/nocopy" + "github.com/cloudwego/hertz/pkg/common/bytebufferpool" + "github.com/cloudwego/hertz/pkg/common/compress" + "github.com/cloudwego/hertz/pkg/common/errors" + "github.com/cloudwego/hertz/pkg/common/hlog" + "github.com/cloudwego/hertz/pkg/common/utils" + "github.com/cloudwego/hertz/pkg/network" + "github.com/cloudwego/hertz/pkg/protocol" + "github.com/cloudwego/hertz/pkg/protocol/consts" ) var ( @@ -784,6 +785,7 @@ func (ff *fsFile) smallRangeReader() io.Reader { r.ff = ff r.sta, r.end, r.buf = r.sta[:0], r.end[:0], r.buf[:0] r.hf = true + r.he = false r.hasCurRangeBodyHeader = false r.cRange = 0 r.boundary = randomBoundary() @@ -1342,69 +1344,11 @@ func (r *smallRangeReader) Close() error { } func (r *smallRangeReader) Read(p []byte) (int, error) { - ff := r.ff var err error cPos, cLen, n := 0, 0, 0 - if ff.f == nil { - - if r.IsMultiRange() { - cLen = len(p[cPos:]) - n = copy(p[cPos:], r.buf) - if len(r.buf) > cLen { - r.buf = r.buf[n:] - return n, nil - } - cPos += n - r.buf = r.buf[:0] - } - - for i := r.cRange; i < len(r.sta); i++ { - if r.sta[i] >= r.end[i] { - continue - } - if r.IsMultiRange() && !r.hasCurRangeBodyHeader { - r.hasCurRangeBodyHeader = true - multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.hf) - r.hf = false - cLen = len(p[cPos:]) - n = copy(p[cPos:], r.buf) - cPos += n - if len(r.buf) > cLen { - r.buf = r.buf[n:] - return cPos, nil - } - r.buf = r.buf[:0] - } - - cLen = len(p[cPos:]) - n = copy(p[cPos:], ff.dirIndex[r.sta[i]:r.end[i]]) - if r.end[i]-r.sta[i] > cLen { - r.sta[i] += n - return cPos + n, nil - } - cPos += n - r.cRange = i + 1 - } - - if r.IsMultiRange() { - multiRangeBodyEnd(&r.buf, r.boundary) - r.he = true - cLen = len(p[cPos:]) - n = copy(p[cPos:], r.buf) - if len(r.buf) > len(p[cPos:]) { - r.buf = r.buf[n:] - return cPos + n, nil - } - cPos += n - r.buf = r.buf[:0] - } - - return cPos, io.EOF - } - - if r.cRange >= len(r.sta) && len(r.buf) == 0 { + if r.cRange >= len(r.sta) && len(r.buf) == 0 && r.he { return 0, io.EOF } @@ -1439,28 +1383,39 @@ func (r *smallRangeReader) Read(p []byte) (int, error) { } cLen = len(p[cPos:]) 
- if r.end[i]-r.sta[i] > cLen { - n, err = ff.f.ReadAt(p[cPos:], int64(r.sta[i])) - r.sta[i] += n - return cPos + n, err - } - // todo use pool? - cBody := make([]byte, r.end[i]-r.sta[i]) - n, err = ff.f.ReadAt(cBody, int64(r.sta[i])) - if err != nil && err != io.EOF { - return cPos + n, err + // handle file + if ff.f != nil { + if r.end[i]-r.sta[i] > cLen { + n, err = ff.f.ReadAt(p[cPos:], int64(r.sta[i])) + r.sta[i] += n + return cPos + n, err + } + + // todo use pool? + cBody := make([]byte, r.end[i]-r.sta[i]) + n, err = ff.f.ReadAt(cBody, int64(r.sta[i])) + if err != nil && err != io.EOF { + return cPos + n, err + } + n = copy(p[cPos:], cBody) + } else { + // handle dir + n = copy(p[cPos:], ff.dirIndex[r.sta[i]:r.end[i]]) + if r.end[i]-r.sta[i] > cLen { + r.sta[i] += n + return cPos + n, nil + } } - n = copy(p[cPos:], cBody) + cPos += n r.cRange = i + 1 r.hasCurRangeBodyHeader = false } - if r.IsMultiRange() { + if r.IsMultiRange() && !r.he { multiRangeBodyEnd(&r.buf, r.boundary) r.he = true - cLen = len(p[cPos:]) n = copy(p[cPos:], r.buf) if len(r.buf) > len(p[cPos:]) { r.buf = r.buf[n:] @@ -1698,10 +1653,9 @@ func (r *bigRangeReader) Read(p []byte) (int, error) { r.hasCurRangeBodyHeader = false } - if r.IsMultiRange() { + if r.IsMultiRange() && !r.he { multiRangeBodyEnd(&r.buf, r.boundary) r.he = true - cLen = len(p[cPos:]) n = copy(p[cPos:], r.buf) if len(r.buf) > len(p[cPos:]) { r.buf = r.buf[n:] @@ -1804,17 +1758,17 @@ func multiRangeLength(sta, end []int, cl int, ct, bd string) int { func multiRangeBodyHeader(b *[]byte, sta, end, size int, ct, boundary string, hf bool) { if !hf { - *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...) + *b = append(*b, bytesconv.S2b("\r\n")...) } *b = append(*b, bytesconv.S2b(fmt.Sprintf("--%s\r\n", boundary))...) *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\r\n", consts.HeaderContentRange, fmt.Sprintf("bytes %d-%d/%d", sta, end-1, size)))...) - *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\n", consts.HeaderContentType, ct))...) - *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...) + *b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\r\n", consts.HeaderContentType, ct))...) + *b = append(*b, bytesconv.S2b("\r\n")...) } func multiRangeBodyEnd(b *[]byte, boundary string) { - *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n--%s--", boundary))...) + *b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n--%s--\r\n", boundary))...) } func randomBoundary() string { @@ -1823,5 +1777,6 @@ func randomBoundary() string { if err != nil { panic(err) } + // todo use bytesconv.b2S ? 
return fmt.Sprintf("%x", buf[:]) } diff --git a/pkg/app/fs_test.go b/pkg/app/fs_test.go index a1ace6c43..628ed8281 100644 --- a/pkg/app/fs_test.go +++ b/pkg/app/fs_test.go @@ -45,7 +45,6 @@ import ( "bytes" "context" "fmt" - "github.com/cloudwego/hertz/internal/bytesconv" "io" "io/ioutil" "math/rand" @@ -55,6 +54,7 @@ import ( "testing" "time" + "github.com/cloudwego/hertz/internal/bytesconv" "github.com/cloudwego/hertz/pkg/common/test/mock" "github.com/cloudwego/hertz/pkg/protocol" "github.com/cloudwego/hertz/pkg/protocol/consts" @@ -414,7 +414,6 @@ func testFSSingleByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string body1, bytesconv.B2s(expectedBody[start:end+1]), filePath, startPos, endPos) } } - } func testFSSingleByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { @@ -495,7 +494,7 @@ func TestFSMultiByteRangeConcurrent(t *testing.T) { for i := 0; i < concurrency; i++ { select { - case <-time.After(5 * time.Second): + case <-time.After(time.Second): t.Fatalf("timeout") case <-ch: } @@ -631,7 +630,6 @@ func testFSMultiByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string) if body != string(singleBodys) { t.Fatalf("multipart ranges content is invalid") } - } func testFSMultiByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { @@ -675,7 +673,7 @@ func testFSMultiByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { } ct := r.Header.Peek(consts.HeaderContentType) - expectedCT := fmt.Sprintf("multipart/byteranges; boundary=") + expectedCT := "multipart/byteranges; boundary=" if !strings.HasPrefix(string(ct), expectedCT) { t.Fatalf("unexpected content-type %q. Expecting prefix %q. filePath=%q", ct, expectedCT, filePath) } From 62e58e0925b03e8a20fa50e7e4ae4d60f3d562b2 Mon Sep 17 00:00:00 2001 From: byene0923 Date: Tue, 2 Aug 2022 00:41:06 +0800 Subject: [PATCH 5/6] fix: fix bug of smallFileReader --- pkg/app/fs.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/app/fs.go b/pkg/app/fs.go index dd9e1f40e..417e47cc0 100644 --- a/pkg/app/fs.go +++ b/pkg/app/fs.go @@ -198,7 +198,9 @@ func (r *fsSmallFileReader) Read(p []byte) (int, error) { r.offset += int64(n) return n, err } - + if r.offset == int64(len(ff.dirIndex)) { + return 0, io.EOF + } n := copy(p, ff.dirIndex[r.offset:]) r.offset += int64(n) return n, nil From b595bb3091ace03d70996760253c0546a0128196 Mon Sep 17 00:00:00 2001 From: byene0923 Date: Sat, 12 Nov 2022 01:49:50 +0800 Subject: [PATCH 6/6] fix: keep SetByteRange function --- pkg/app/fs_test.go | 17 ++++++++--------- pkg/protocol/header.go | 6 +++++- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/app/fs_test.go b/pkg/app/fs_test.go index 628ed8281..fda46ff80 100644 --- a/pkg/app/fs_test.go +++ b/pkg/app/fs_test.go @@ -368,12 +368,11 @@ func testFSSingleByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string endPos = append(endPos, end) ctx.Request.SetRequestURI(filePath) - ctx.Request.Header.SetByteRange(startPos, endPos) + ctx.Request.Header.SetByteRanges(startPos, endPos) h(context.Background(), &ctx) bodySize := end - start + 1 - // todo 代码优化 // test WriteTo(w io.Writer) if fileSize > consts.MaxSmallFileSize { reader, ok := ctx.Response.BodyStream().(*bigRangeReader) @@ -437,7 +436,7 @@ func testFSSingleByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { endPos = append(endPos, end) ctx.Request.SetRequestURI(filePath) - ctx.Request.Header.SetByteRange(startPos, endPos) + ctx.Request.Header.SetByteRanges(startPos, endPos) h(context.Background(), &ctx) var r 
protocol.Response @@ -524,7 +523,7 @@ func testFSMultiByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string) t.Fatalf("cannot read file %q: %s", filePath, err) } - num := rand.Intn(20) + 2 + num := rand.Intn(5) + 2 fileSize := len(expectedBody) startPos, endPos := make([]int, 0), make([]int, 0) @@ -540,7 +539,7 @@ func testFSMultiByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string) } ctx.Request.SetRequestURI(filePath) - ctx.Request.Header.SetByteRange(startPos, endPos) + ctx.Request.Header.SetByteRanges(startPos, endPos) h(context.Background(), &ctx) var body string @@ -582,7 +581,7 @@ func testFSMultiByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string) req1 := &protocol.Request{} req1.CopyTo(&ctx1.Request) ctx1.Request.SetRequestURI(filePath) - ctx1.Request.Header.SetByteRange([]int{startPos[i]}, []int{endPos[i]}) + ctx1.Request.Header.SetByteRanges([]int{startPos[i]}, []int{endPos[i]}) h(context.Background(), &ctx1) var r1 protocol.Response @@ -642,7 +641,7 @@ func testFSMultiByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { t.Fatalf("cannot read file %q: %s", filePath, err) } - num := rand.Intn(50) + 2 + num := rand.Intn(5) + 2 fileSize := len(expectedBody) startPos, endPos := make([]int, 0), make([]int, 0) @@ -658,7 +657,7 @@ func testFSMultiByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { } ctx.Request.SetRequestURI(filePath) - ctx.Request.Header.SetByteRange(startPos, endPos) + ctx.Request.Header.SetByteRanges(startPos, endPos) h(context.Background(), &ctx) var r protocol.Response @@ -695,7 +694,7 @@ func testFSMultiByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) { req1 := &protocol.Request{} req1.CopyTo(&ctx1.Request) ctx1.Request.SetRequestURI(filePath) - ctx1.Request.Header.SetByteRange([]int{startPos[i]}, []int{endPos[i]}) + ctx1.Request.Header.SetByteRanges([]int{startPos[i]}, []int{endPos[i]}) h(context.Background(), &ctx1) var r1 protocol.Response diff --git a/pkg/protocol/header.go b/pkg/protocol/header.go index dbf9298a9..31c4351c6 100644 --- a/pkg/protocol/header.go +++ b/pkg/protocol/header.go @@ -875,11 +875,15 @@ func (h *RequestHeader) Reset() { h.ResetSkipNormalize() } +func (h *RequestHeader) SetByteRange(startPos, endPos int) { + h.SetByteRanges([]int{startPos}, []int{endPos}) +} + // SetByteRange sets 'Range: bytes=startPos-endPos' header. // // * If startPos is negative, then 'bytes=-startPos' value is set. // * If endPos is negative, then 'bytes=startPos-' value is set. -func (h *RequestHeader) SetByteRange(startPos, endPos []int) { +func (h *RequestHeader) SetByteRanges(startPos, endPos []int) { b := h.bufKV.value[:0] b = append(b, bytestr.StrBytes...) b = append(b, '=')
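// Note on the CRLF fixes in multiRangeBodyHeader/multiRangeBodyEnd above: a
// multipart/byteranges body opens each part with "--boundary", follows it with
// Content-Range and Content-Type headers, separates those headers from the part
// body with a blank line, and closes the whole body with "--boundary--" plus a
// trailing CRLF. The sketch below is a minimal standalone illustration of that
// framing under the same half-open [sta, end) convention as the range readers;
// buildMultiRangeBody is a hypothetical helper, not the Hertz implementation.
package main

import (
	"fmt"
	"strings"
)

func buildMultiRangeBody(content []byte, sta, end []int, contentType, boundary string) string {
	var b strings.Builder
	for i := range sta {
		if i > 0 {
			// blank line between the previous part body and the next boundary
			b.WriteString("\r\n")
		}
		fmt.Fprintf(&b, "--%s\r\n", boundary)
		fmt.Fprintf(&b, "Content-Range: bytes %d-%d/%d\r\n", sta[i], end[i]-1, len(content))
		fmt.Fprintf(&b, "Content-Type: %s\r\n", contentType)
		b.WriteString("\r\n") // headers/body separator inside the part
		b.Write(content[sta[i]:end[i]])
	}
	// closing boundary, CRLF-terminated as in the patched multiRangeBodyEnd
	fmt.Fprintf(&b, "\r\n--%s--\r\n", boundary)
	return b.String()
}

func main() {
	body := buildMultiRangeBody([]byte("hello, byte ranges"), []int{0, 7}, []int{5, 11}, "text/plain", "3d6b6a416f9b5")
	fmt.Printf("%q\n", body)
}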
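// Note on patch 5: once fsSmallFileReader has consumed all of ff.dirIndex,
// copy() keeps returning 0 with a nil error, so without the explicit io.EOF
// return the reader violates the io.Reader contract and can spin callers such
// as io.Copy forever. Below is a minimal standalone reader with the same guard;
// byteReader is a hypothetical stand-in for illustration, not the Hertz type.
package main

import (
	"io"
	"log"
	"os"
)

// byteReader serves an in-memory byte slice through io.Reader, mirroring the
// dirIndex branch of fsSmallFileReader.Read.
type byteReader struct {
	data   []byte
	offset int64
}

func (r *byteReader) Read(p []byte) (int, error) {
	// Without this guard, Read would return (0, nil) on every call after the
	// slice is exhausted, and io.Copy below would never terminate.
	if r.offset == int64(len(r.data)) {
		return 0, io.EOF
	}
	n := copy(p, r.data[r.offset:])
	r.offset += int64(n)
	return n, nil
}

func main() {
	r := &byteReader{data: []byte("directory listing\n")}
	if _, err := io.Copy(os.Stdout, r); err != nil {
		log.Fatal(err)
	}
}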
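// Note on patch 6: the single-range SetByteRange(startPos, endPos int) is kept
// as a thin wrapper over the new SetByteRanges(startPos, endPos []int), so
// existing call sites keep compiling. A rough usage sketch, assuming the
// patched API lands with these signatures; the header values in the comments
// are assumptions based on the slice-joining code in SetByteRanges, and each
// call overwrites the previously set Range header.
package main

import "github.com/cloudwego/hertz/pkg/protocol"

func main() {
	var req protocol.Request
	req.SetRequestURI("http://example.com/big.bin")

	// Single range, unchanged call site: "Range: bytes=0-499"
	req.Header.SetByteRange(0, 499)

	// Two ranges via the new API, expected to yield something like
	// "Range: bytes=0-499,1000-1499"
	req.Header.SetByteRanges([]int{0, 1000}, []int{499, 1499})
}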