Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #22 from filecoin-project/feat/v0-car-traversal
Browse files Browse the repository at this point in the history
Saturn L2 V0
  • Loading branch information
aarshkshah1992 committed Jul 21, 2022
2 parents 1e93c87 + 9fab846 commit d8f754e
Show file tree
Hide file tree
Showing 28 changed files with 2,291 additions and 592 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Expand Up @@ -14,5 +14,4 @@
# Dependency directories (remove the comment below to include it)
# vendor/

dist/
resources/webui
dist/
111 changes: 111 additions & 0 deletions carserver/server.go
@@ -0,0 +1,111 @@
package carserver

import (
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/filecoin-project/saturn-l2/types"

"github.com/filecoin-project/saturn-l2/station"

"github.com/filecoin-project/saturn-l2/logs"

"github.com/filecoin-project/saturn-l2/carstore"

"github.com/pkg/errors"

bstore "github.com/ipfs/go-ipfs-blockstore"

cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"

car "github.com/ipld/go-car/v2"
)

var (
maxRequestSize = int64(1048576) // 1 MiB - max size of the CAR transfer request
)

// HTTPCARServer serves CAR files for a given root and selector over http.
type HTTPCARServer struct {
cs *carstore.CarStore
logger *logs.SaturnLogger
spai station.StationAPI
}

func New(cs *carstore.CarStore, logger *logs.SaturnLogger, sapi station.StationAPI) *HTTPCARServer {
return &HTTPCARServer{
cs: cs,
logger: logger,
spai: sapi,
}
}

func (l *HTTPCARServer) ServeCARFile(w http.ResponseWriter, r *http.Request) {
// read the json car transfer request
var req types.CARTransferRequest
r.Body = http.MaxBytesReader(w, r.Body, maxRequestSize)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, fmt.Sprintf("failed to parse request: %s", err), http.StatusBadRequest)
return
}
dr, err := req.ToDAGRequest()
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse request: %s", err), http.StatusBadRequest)
return
}

// we have parsed the request successfully -> start logging and serving it
l.logger.Infow(dr.ReqId, "got car transfer request")

sw := &statWriter{w: w}

if err := l.cs.FetchAndWriteCAR(dr.ReqId, dr.Root, func(ro bstore.Blockstore) error {
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: ro}
ls.SetReadStorage(&bsa)

_, err = car.TraverseV1(r.Context(), &ls, dr.Root, dr.Selector, sw, car.WithSkipOffset(dr.Skip))
if err != nil {
if err := l.spai.RecordRetrievalServed(r.Context(), sw.n, 1); err != nil {
l.logger.LogError(dr.ReqId, "failed to record retrieval failure", err)
}

l.logger.LogError(dr.ReqId, "car transfer failed", err)
return fmt.Errorf("car traversal failed: %w", err)
}

if err := l.spai.RecordRetrievalServed(r.Context(), sw.n, 0); err != nil {
l.logger.LogError(dr.ReqId, "failed to record successful retrieval", err)
}
return nil
}); err != nil {
if err := l.spai.RecordRetrievalServed(r.Context(), sw.n, 1); err != nil {
l.logger.LogError(dr.ReqId, "failed to record retrieval failure", err)
}
l.logger.LogError(dr.ReqId, "failed to server car", err)

if errors.Is(err, carstore.ErrNotFound) {
http.Error(w, "car not found", http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}

l.logger.Infow(dr.ReqId, "car transfer successful")
// TODO: Talk to Log injestor here
}

type statWriter struct {
w io.Writer
n uint64
}

func (sw *statWriter) Write(p []byte) (n int, err error) {
n, err = sw.w.Write(p)
sw.n += uint64(n)
return
}
252 changes: 252 additions & 0 deletions carserver/server_test.go
@@ -0,0 +1,252 @@
package carserver

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/filecoin-project/saturn-l2/types"

"golang.org/x/sync/errgroup"

"github.com/filecoin-project/saturn-l2/station"

datastore "github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"

"github.com/filecoin-project/saturn-l2/logs"

cid "github.com/ipfs/go-cid"

"github.com/filecoin-project/saturn-l2/testutils"

"github.com/google/uuid"

"github.com/filecoin-project/saturn-l2/carstore"

"github.com/stretchr/testify/require"
)

func TestSimpleTransfer(t *testing.T) {
ctx := context.Background()
csh := buildHarness(t, ctx)
defer csh.Stop(t)

csh.assertStationStats(t, ctx, 0, 0, 0, 0, 0)

url := csh.carserver.URL
root := csh.root1
contents := csh.bz1

// send the request
reqBz := mkRequestWithoutSelector(t, root, 0)
resp := sendHttpReq(t, url, reqBz)

require.EqualValues(t, http.StatusNotFound, resp.StatusCode)

// second fetch should not work
resp = sendHttpReq(t, url, reqBz)
require.EqualValues(t, http.StatusNotFound, resp.StatusCode)

// wait till L2 has cached the data
require.Eventually(t, func() bool {
has, err := csh.store.IsIndexed(ctx, root)
return has && err == nil
}, 1*time.Second, 100*time.Millisecond)

// third fetch should work
resp = sendHttpReq(t, url, reqBz)
require.EqualValues(t, http.StatusOK, resp.StatusCode)

bz := readHTTPResponse(t, resp)
// ensure contents match
require.EqualValues(t, contents, bz)

csh.assertStationStats(t, ctx, len(contents), len(contents), 3, 2, len(contents))

// send request with the skip param
reqBz = mkRequestWithoutSelector(t, root, 101)
resp = sendHttpReq(t, url, reqBz)
require.EqualValues(t, http.StatusOK, resp.StatusCode)

bz = readHTTPResponse(t, resp)
require.EqualValues(t, contents[101:], bz)

csh.assertStationStats(t, ctx, len(contents)+len(contents)-101, len(contents), 4, 2, len(contents))
}

func TestParallelTransfers(t *testing.T) {
ctx := context.Background()
csh := buildHarness(t, ctx)
defer csh.Stop(t)

csh.assertStationStats(t, ctx, 0, 0, 0, 0, 0)

url := csh.carserver.URL
root1 := csh.root1
root2 := csh.root2
contents1 := csh.bz1
contents2 := csh.bz2

count := 0

// send the requests so both get cached
require.Eventually(t, func() bool {
count++
reqBz := mkRequestWithoutSelector(t, root1, 0)
resp := sendHttpReq(t, url, reqBz)
if resp.StatusCode == http.StatusOK {
bz := readHTTPResponse(t, resp)
return bytes.Equal(contents1, bz)
}
return false
}, 5*time.Second, 100*time.Millisecond)

require.Eventually(t, func() bool {
count++
reqBz := mkRequestWithoutSelector(t, root2, 0)
resp := sendHttpReq(t, url, reqBz)
if resp.StatusCode == http.StatusOK {
bz := readHTTPResponse(t, resp)
return bytes.Equal(contents2, bz)
}

return false
}, 5*time.Second, 100*time.Millisecond)

l := len(contents1) + len(contents2)
csh.assertStationStats(t, ctx, l, l, count, count-2, l)

var errg errgroup.Group

// fetch 10 in parallel
for i := 0; i < 10; i++ {
i := i
errg.Go(func() error {
var root cid.Cid

if i%2 == 0 {
root = root2
} else {
root = root1
}

reqBz := mkRequestWithoutSelector(t, root, 0)
resp := sendHttpReq(t, url, reqBz)
if resp.StatusCode != http.StatusOK {
return errors.New("failed")
}
return nil
})

}
require.NoError(t, errg.Wait())

time.Sleep(1 * time.Second)

csh.assertStationStats(t, ctx, 6*l, l, count+10, count-2, l)
}

type carServerHarness struct {
store *carstore.CarStore
gwapi *httptest.Server
carserver *httptest.Server
sapi station.StationAPI
root1 cid.Cid
bz1 []byte
root2 cid.Cid
bz2 []byte
}

func (csh *carServerHarness) assertStationStats(t *testing.T, ctx context.Context, upload, download, reqs, errors, storage int) {
as, err := csh.sapi.AllStats(ctx)
require.NoError(t, err)
require.EqualValues(t, upload, as.TotalBytesUploaded)
require.EqualValues(t, reqs, as.NContentRequests)
require.EqualValues(t, errors, as.NContentReqErrors)
require.EqualValues(t, download, as.TotalBytesDownloaded)
require.EqualValues(t, storage, as.StorageStats.BytesCurrentlyStored)
}

func (csh *carServerHarness) Stop(t *testing.T) {
require.NoError(t, csh.store.Close())
csh.gwapi.Close()
csh.carserver.Close()
}

func buildHarness(t *testing.T, ctx context.Context) *carServerHarness {
carFile1 := "../testdata/files/sample-v1.car"
rootcid1, bz1 := testutils.ParseCar(t, ctx, carFile1)
carFile2 := "../testdata/files/sample-rw-bs-v2.car"
rootcid2, bz2 := testutils.ParseCar(t, ctx, carFile2)
out := make(map[string][]byte)
out[rootcid1.String()] = bz1
out[rootcid2.String()] = bz2

temp := t.TempDir()

mds := dss.MutexWrap(datastore.NewMapDatastore())
sapi := NewStationAPIImpl(mds, nil)

// create the getway api with a test http server
svc := testutils.GetTestServerForRoots(t, out)
gwAPI := carstore.NewGatewayAPI(svc.URL, sapi)
lg := logs.NewSaturnLogger()
cfg := carstore.Config{MaxCARFilesDiskSpace: 100000000}
cs, err := carstore.New(temp, gwAPI, cfg, lg)
require.NoError(t, err)
sapi.SetStorageStatsFetcher(cs)
require.NoError(t, cs.Start(ctx))

// create and start the car server
carserver := New(cs, lg, sapi)
csvc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
carserver.ServeCARFile(w, r)
}))

return &carServerHarness{
store: cs,
gwapi: svc,
carserver: csvc,
sapi: sapi,
root1: rootcid1,
root2: rootcid2,
bz1: bz1,
bz2: bz2,
}
}

func readHTTPResponse(t *testing.T, resp *http.Response) []byte {
bz, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.NotEmpty(t, resp)
require.NoError(t, resp.Body.Close())
return bz
}

func mkRequestWithoutSelector(t *testing.T, root cid.Cid, offset uint64) []byte {
req := types.CARTransferRequest{
Root: base64.StdEncoding.EncodeToString(root.Bytes()),
ReqId: uuid.New().String(),
SkipOffset: offset,
}
reqBz, err := json.Marshal(req)
require.NoError(t, err)
return reqBz
}

func sendHttpReq(t *testing.T, url string, body []byte) *http.Response {
hreq, err := http.NewRequest("GET", url, bytes.NewReader(body))
require.NoError(t, err)
resp, err := http.DefaultClient.Do(hreq)
require.NoError(t, err)
require.NotEmpty(t, resp)
return resp
}

0 comments on commit d8f754e

Please sign in to comment.