Skip to content

Commit

Permalink
feat: support multi_range
Browse files Browse the repository at this point in the history
fix:fix bug

fix: remove useless code

fix: lint code
  • Loading branch information
byene0923 committed Aug 1, 2022
1 parent 330ffeb commit a30bed6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 94 deletions.
133 changes: 44 additions & 89 deletions pkg/app/fs.go
Expand Up @@ -47,17 +47,6 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/cloudwego/hertz/internal/bytesconv"
"github.com/cloudwego/hertz/internal/bytestr"
"github.com/cloudwego/hertz/internal/nocopy"
"github.com/cloudwego/hertz/pkg/common/bytebufferpool"
"github.com/cloudwego/hertz/pkg/common/compress"
"github.com/cloudwego/hertz/pkg/common/errors"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/cloudwego/hertz/pkg/common/utils"
"github.com/cloudwego/hertz/pkg/network"
"github.com/cloudwego/hertz/pkg/protocol"
"github.com/cloudwego/hertz/pkg/protocol/consts"
"html"
"io"
"io/ioutil"
Expand All @@ -69,6 +58,18 @@ import (
"strings"
"sync"
"time"

"github.com/cloudwego/hertz/internal/bytesconv"
"github.com/cloudwego/hertz/internal/bytestr"
"github.com/cloudwego/hertz/internal/nocopy"
"github.com/cloudwego/hertz/pkg/common/bytebufferpool"
"github.com/cloudwego/hertz/pkg/common/compress"
"github.com/cloudwego/hertz/pkg/common/errors"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/cloudwego/hertz/pkg/common/utils"
"github.com/cloudwego/hertz/pkg/network"
"github.com/cloudwego/hertz/pkg/protocol"
"github.com/cloudwego/hertz/pkg/protocol/consts"
)

var (
Expand Down Expand Up @@ -784,6 +785,7 @@ func (ff *fsFile) smallRangeReader() io.Reader {
r.ff = ff
r.sta, r.end, r.buf = r.sta[:0], r.end[:0], r.buf[:0]
r.hf = true
r.he = false
r.hasCurRangeBodyHeader = false
r.cRange = 0
r.boundary = randomBoundary()
Expand Down Expand Up @@ -1342,69 +1344,11 @@ func (r *smallRangeReader) Close() error {
}

func (r *smallRangeReader) Read(p []byte) (int, error) {

ff := r.ff
var err error
cPos, cLen, n := 0, 0, 0
if ff.f == nil {

if r.IsMultiRange() {
cLen = len(p[cPos:])
n = copy(p[cPos:], r.buf)
if len(r.buf) > cLen {
r.buf = r.buf[n:]
return n, nil
}
cPos += n
r.buf = r.buf[:0]
}

for i := r.cRange; i < len(r.sta); i++ {
if r.sta[i] >= r.end[i] {
continue
}

if r.IsMultiRange() && !r.hasCurRangeBodyHeader {
r.hasCurRangeBodyHeader = true
multiRangeBodyHeader(&r.buf, r.sta[i], r.end[i], r.ff.contentLength, r.ff.contentType, r.boundary, r.hf)
r.hf = false
cLen = len(p[cPos:])
n = copy(p[cPos:], r.buf)
cPos += n
if len(r.buf) > cLen {
r.buf = r.buf[n:]
return cPos, nil
}
r.buf = r.buf[:0]
}

cLen = len(p[cPos:])
n = copy(p[cPos:], ff.dirIndex[r.sta[i]:r.end[i]])
if r.end[i]-r.sta[i] > cLen {
r.sta[i] += n
return cPos + n, nil
}
cPos += n
r.cRange = i + 1
}

if r.IsMultiRange() {
multiRangeBodyEnd(&r.buf, r.boundary)
r.he = true
cLen = len(p[cPos:])
n = copy(p[cPos:], r.buf)
if len(r.buf) > len(p[cPos:]) {
r.buf = r.buf[n:]
return cPos + n, nil
}
cPos += n
r.buf = r.buf[:0]
}

return cPos, io.EOF
}

if r.cRange >= len(r.sta) && len(r.buf) == 0 {
if r.cRange >= len(r.sta) && len(r.buf) == 0 && r.he {
return 0, io.EOF
}

Expand Down Expand Up @@ -1439,28 +1383,39 @@ func (r *smallRangeReader) Read(p []byte) (int, error) {
}

cLen = len(p[cPos:])
if r.end[i]-r.sta[i] > cLen {
n, err = ff.f.ReadAt(p[cPos:], int64(r.sta[i]))
r.sta[i] += n
return cPos + n, err
}

// todo use pool?
cBody := make([]byte, r.end[i]-r.sta[i])
n, err = ff.f.ReadAt(cBody, int64(r.sta[i]))
if err != nil && err != io.EOF {
return cPos + n, err
// handle file
if ff.f != nil {
if r.end[i]-r.sta[i] > cLen {
n, err = ff.f.ReadAt(p[cPos:], int64(r.sta[i]))
r.sta[i] += n
return cPos + n, err
}

// todo use pool?
cBody := make([]byte, r.end[i]-r.sta[i])
n, err = ff.f.ReadAt(cBody, int64(r.sta[i]))
if err != nil && err != io.EOF {
return cPos + n, err
}
n = copy(p[cPos:], cBody)
} else {
// handle dir
n = copy(p[cPos:], ff.dirIndex[r.sta[i]:r.end[i]])
if r.end[i]-r.sta[i] > cLen {
r.sta[i] += n
return cPos + n, nil
}
}
n = copy(p[cPos:], cBody)

cPos += n
r.cRange = i + 1
r.hasCurRangeBodyHeader = false
}

if r.IsMultiRange() {
if r.IsMultiRange() && !r.he {
multiRangeBodyEnd(&r.buf, r.boundary)
r.he = true
cLen = len(p[cPos:])
n = copy(p[cPos:], r.buf)
if len(r.buf) > len(p[cPos:]) {
r.buf = r.buf[n:]
Expand Down Expand Up @@ -1698,10 +1653,9 @@ func (r *bigRangeReader) Read(p []byte) (int, error) {
r.hasCurRangeBodyHeader = false
}

if r.IsMultiRange() {
if r.IsMultiRange() && !r.he {
multiRangeBodyEnd(&r.buf, r.boundary)
r.he = true
cLen = len(p[cPos:])
n = copy(p[cPos:], r.buf)
if len(r.buf) > len(p[cPos:]) {
r.buf = r.buf[n:]
Expand Down Expand Up @@ -1804,17 +1758,17 @@ func multiRangeLength(sta, end []int, cl int, ct, bd string) int {

func multiRangeBodyHeader(b *[]byte, sta, end, size int, ct, boundary string, hf bool) {
if !hf {
*b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...)
*b = append(*b, bytesconv.S2b("\r\n")...)
}
*b = append(*b, bytesconv.S2b(fmt.Sprintf("--%s\r\n", boundary))...)
*b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\r\n", consts.HeaderContentRange,
fmt.Sprintf("bytes %d-%d/%d", sta, end-1, size)))...)
*b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\n", consts.HeaderContentType, ct))...)
*b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n"))...)
*b = append(*b, bytesconv.S2b(fmt.Sprintf("%s: %s\r\n", consts.HeaderContentType, ct))...)
*b = append(*b, bytesconv.S2b("\r\n")...)
}

func multiRangeBodyEnd(b *[]byte, boundary string) {
*b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n--%s--", boundary))...)
*b = append(*b, bytesconv.S2b(fmt.Sprintf("\r\n--%s--\r\n", boundary))...)
}

func randomBoundary() string {
Expand All @@ -1823,5 +1777,6 @@ func randomBoundary() string {
if err != nil {
panic(err)
}
// todo use bytesconv.b2S ?
return fmt.Sprintf("%x", buf[:])
}
8 changes: 3 additions & 5 deletions pkg/app/fs_test.go
Expand Up @@ -45,7 +45,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/cloudwego/hertz/internal/bytesconv"
"io"
"io/ioutil"
"math/rand"
Expand All @@ -55,6 +54,7 @@ import (
"testing"
"time"

"github.com/cloudwego/hertz/internal/bytesconv"
"github.com/cloudwego/hertz/pkg/common/test/mock"
"github.com/cloudwego/hertz/pkg/protocol"
"github.com/cloudwego/hertz/pkg/protocol/consts"
Expand Down Expand Up @@ -414,7 +414,6 @@ func testFSSingleByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string
body1, bytesconv.B2s(expectedBody[start:end+1]), filePath, startPos, endPos)
}
}

}

func testFSSingleByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) {
Expand Down Expand Up @@ -495,7 +494,7 @@ func TestFSMultiByteRangeConcurrent(t *testing.T) {

for i := 0; i < concurrency; i++ {
select {
case <-time.After(5 * time.Second):
case <-time.After(time.Second):
t.Fatalf("timeout")
case <-ch:
}
Expand Down Expand Up @@ -631,7 +630,6 @@ func testFSMultiByteRangeOfWriteTo(t *testing.T, h HandlerFunc, filePath string)
if body != string(singleBodys) {
t.Fatalf("multipart ranges content is invalid")
}

}

func testFSMultiByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) {
Expand Down Expand Up @@ -675,7 +673,7 @@ func testFSMultiByteRangeOfRead(t *testing.T, h HandlerFunc, filePath string) {
}

ct := r.Header.Peek(consts.HeaderContentType)
expectedCT := fmt.Sprintf("multipart/byteranges; boundary=")
expectedCT := "multipart/byteranges; boundary="
if !strings.HasPrefix(string(ct), expectedCT) {
t.Fatalf("unexpected content-type %q. Expecting prefix %q. filePath=%q", ct, expectedCT, filePath)
}
Expand Down

0 comments on commit a30bed6

Please sign in to comment.