Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Saturn L2 V0 #22

Merged
merged 13 commits into from Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions .gitignore
Expand Up @@ -14,5 +14,4 @@
# Dependency directories (remove the comment below to include it)
# vendor/

dist/
resources/webui
dist/
111 changes: 111 additions & 0 deletions carserver/server.go
@@ -0,0 +1,111 @@
package carserver

import (
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/filecoin-project/saturn-l2/types"

"github.com/filecoin-project/saturn-l2/station"

"github.com/filecoin-project/saturn-l2/logs"

"github.com/filecoin-project/saturn-l2/carstore"

"github.com/pkg/errors"

bstore "github.com/ipfs/go-ipfs-blockstore"

cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"

car "github.com/ipld/go-car/v2"
)

var (
maxRequestSize = int64(1048576) // 1 MiB - max size of the CAR transfer request
)

// HTTPCARServer serves CAR files for a given root and selector over http.
type HTTPCARServer struct {
cs *carstore.CarStore
logger *logs.SaturnLogger
spai station.StationAPI
}

func New(cs *carstore.CarStore, logger *logs.SaturnLogger, sapi station.StationAPI) *HTTPCARServer {
return &HTTPCARServer{
cs: cs,
logger: logger,
spai: sapi,
}
}

func (l *HTTPCARServer) ServeCARFile(w http.ResponseWriter, r *http.Request) {
// read the json car transfer request
var req types.CARTransferRequest
r.Body = http.MaxBytesReader(w, r.Body, maxRequestSize)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, fmt.Sprintf("failed to parse request: %s", err), http.StatusBadRequest)
return
}
dr, err := req.ToDAGRequest()
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse request: %s", err), http.StatusBadRequest)
return
}

// we have parsed the request successfully -> start logging and serving it
l.logger.Infow(dr.ReqId, "got car transfer request")

sw := &statWriter{w: w}

if err := l.cs.FetchAndWriteCAR(dr.ReqId, dr.Root, func(ro bstore.Blockstore) error {
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: ro}
ls.SetReadStorage(&bsa)

_, err = car.TraverseV1(r.Context(), &ls, dr.Root, dr.Selector, sw, car.WithSkipOffset(dr.Skip))
if err != nil {
if err := l.spai.RecordRetrievalServed(r.Context(), sw.n, 1); err != nil {
l.logger.LogError(dr.ReqId, "failed to record retrieval failure", err)
}

l.logger.LogError(dr.ReqId, "car transfer failed", err)
return fmt.Errorf("car traversal failed: %w", err)
}

if err := l.spai.RecordRetrievalServed(r.Context(), sw.n, 0); err != nil {
l.logger.LogError(dr.ReqId, "failed to record successful retrieval", err)
}
return nil
}); err != nil {
if err := l.spai.RecordRetrievalServed(r.Context(), sw.n, 1); err != nil {
l.logger.LogError(dr.ReqId, "failed to record retrieval failure", err)
}
l.logger.LogError(dr.ReqId, "failed to server car", err)

if errors.Is(err, carstore.ErrNotFound) {
http.Error(w, "car not found", http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}

l.logger.Infow(dr.ReqId, "car transfer successful")
// TODO: Talk to Log injestor here
}

type statWriter struct {
w io.Writer
n uint64
}

func (sw *statWriter) Write(p []byte) (n int, err error) {
n, err = sw.w.Write(p)
sw.n += uint64(n)
return
}
252 changes: 252 additions & 0 deletions carserver/server_test.go
@@ -0,0 +1,252 @@
package carserver

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/filecoin-project/saturn-l2/types"

"golang.org/x/sync/errgroup"

"github.com/filecoin-project/saturn-l2/station"

datastore "github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"

"github.com/filecoin-project/saturn-l2/logs"

cid "github.com/ipfs/go-cid"

"github.com/filecoin-project/saturn-l2/testutils"

"github.com/google/uuid"

"github.com/filecoin-project/saturn-l2/carstore"

"github.com/stretchr/testify/require"
)

func TestSimpleTransfer(t *testing.T) {
ctx := context.Background()
csh := buildHarness(t, ctx)
defer csh.Stop(t)

csh.assertStationStats(t, ctx, 0, 0, 0, 0, 0)

url := csh.carserver.URL
root := csh.root1
contents := csh.bz1

// send the request
reqBz := mkRequestWithoutSelector(t, root, 0)
resp := sendHttpReq(t, url, reqBz)

require.EqualValues(t, http.StatusNotFound, resp.StatusCode)

// second fetch should not work
resp = sendHttpReq(t, url, reqBz)
require.EqualValues(t, http.StatusNotFound, resp.StatusCode)

// wait till L2 has cached the data
require.Eventually(t, func() bool {
has, err := csh.store.IsIndexed(ctx, root)
return has && err == nil
}, 1*time.Second, 100*time.Millisecond)

// third fetch should work
resp = sendHttpReq(t, url, reqBz)
require.EqualValues(t, http.StatusOK, resp.StatusCode)

bz := readHTTPResponse(t, resp)
// ensure contents match
require.EqualValues(t, contents, bz)

csh.assertStationStats(t, ctx, len(contents), len(contents), 3, 2, len(contents))

// send request with the skip param
reqBz = mkRequestWithoutSelector(t, root, 101)
resp = sendHttpReq(t, url, reqBz)
require.EqualValues(t, http.StatusOK, resp.StatusCode)

bz = readHTTPResponse(t, resp)
require.EqualValues(t, contents[101:], bz)

csh.assertStationStats(t, ctx, len(contents)+len(contents)-101, len(contents), 4, 2, len(contents))
}

func TestParallelTransfers(t *testing.T) {
ctx := context.Background()
csh := buildHarness(t, ctx)
defer csh.Stop(t)

csh.assertStationStats(t, ctx, 0, 0, 0, 0, 0)

url := csh.carserver.URL
root1 := csh.root1
root2 := csh.root2
contents1 := csh.bz1
contents2 := csh.bz2

count := 0

// send the requests so both get cached
require.Eventually(t, func() bool {
count++
reqBz := mkRequestWithoutSelector(t, root1, 0)
resp := sendHttpReq(t, url, reqBz)
if resp.StatusCode == http.StatusOK {
bz := readHTTPResponse(t, resp)
return bytes.Equal(contents1, bz)
}
return false
}, 5*time.Second, 100*time.Millisecond)

require.Eventually(t, func() bool {
count++
reqBz := mkRequestWithoutSelector(t, root2, 0)
resp := sendHttpReq(t, url, reqBz)
if resp.StatusCode == http.StatusOK {
bz := readHTTPResponse(t, resp)
return bytes.Equal(contents2, bz)
}

return false
}, 5*time.Second, 100*time.Millisecond)

l := len(contents1) + len(contents2)
csh.assertStationStats(t, ctx, l, l, count, count-2, l)

var errg errgroup.Group

// fetch 10 in parallel
for i := 0; i < 10; i++ {
i := i
errg.Go(func() error {
var root cid.Cid

if i%2 == 0 {
root = root2
} else {
root = root1
}

reqBz := mkRequestWithoutSelector(t, root, 0)
resp := sendHttpReq(t, url, reqBz)
if resp.StatusCode != http.StatusOK {
return errors.New("failed")
}
return nil
})

}
require.NoError(t, errg.Wait())

time.Sleep(1 * time.Second)

csh.assertStationStats(t, ctx, 6*l, l, count+10, count-2, l)
}

type carServerHarness struct {
store *carstore.CarStore
gwapi *httptest.Server
carserver *httptest.Server
sapi station.StationAPI
root1 cid.Cid
bz1 []byte
root2 cid.Cid
bz2 []byte
}

func (csh *carServerHarness) assertStationStats(t *testing.T, ctx context.Context, upload, download, reqs, errors, storage int) {
as, err := csh.sapi.AllStats(ctx)
require.NoError(t, err)
require.EqualValues(t, upload, as.TotalBytesUploaded)
require.EqualValues(t, reqs, as.NContentRequests)
require.EqualValues(t, errors, as.NContentReqErrors)
require.EqualValues(t, download, as.TotalBytesDownloaded)
require.EqualValues(t, storage, as.StorageStats.BytesCurrentlyStored)
}

func (csh *carServerHarness) Stop(t *testing.T) {
require.NoError(t, csh.store.Close())
csh.gwapi.Close()
csh.carserver.Close()
}

func buildHarness(t *testing.T, ctx context.Context) *carServerHarness {
carFile1 := "../testdata/files/sample-v1.car"
rootcid1, bz1 := testutils.ParseCar(t, ctx, carFile1)
carFile2 := "../testdata/files/sample-rw-bs-v2.car"
rootcid2, bz2 := testutils.ParseCar(t, ctx, carFile2)
out := make(map[string][]byte)
out[rootcid1.String()] = bz1
out[rootcid2.String()] = bz2

temp := t.TempDir()

mds := dss.MutexWrap(datastore.NewMapDatastore())
sapi := NewStationAPIImpl(mds, nil)

// create the getway api with a test http server
svc := testutils.GetTestServerForRoots(t, out)
gwAPI := carstore.NewGatewayAPI(svc.URL, sapi)
lg := logs.NewSaturnLogger()
cfg := carstore.Config{MaxCARFilesDiskSpace: 100000000}
cs, err := carstore.New(temp, gwAPI, cfg, lg)
require.NoError(t, err)
sapi.SetStorageStatsFetcher(cs)
require.NoError(t, cs.Start(ctx))

// create and start the car server
carserver := New(cs, lg, sapi)
csvc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
carserver.ServeCARFile(w, r)
}))

return &carServerHarness{
store: cs,
gwapi: svc,
carserver: csvc,
sapi: sapi,
root1: rootcid1,
root2: rootcid2,
bz1: bz1,
bz2: bz2,
}
}

func readHTTPResponse(t *testing.T, resp *http.Response) []byte {
bz, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.NotEmpty(t, resp)
require.NoError(t, resp.Body.Close())
return bz
}

func mkRequestWithoutSelector(t *testing.T, root cid.Cid, offset uint64) []byte {
req := types.CARTransferRequest{
Root: base64.StdEncoding.EncodeToString(root.Bytes()),
ReqId: uuid.New().String(),
SkipOffset: offset,
}
reqBz, err := json.Marshal(req)
require.NoError(t, err)
return reqBz
}

func sendHttpReq(t *testing.T, url string, body []byte) *http.Response {
hreq, err := http.NewRequest("GET", url, bytes.NewReader(body))
require.NoError(t, err)
resp, err := http.DefaultClient.Do(hreq)
require.NoError(t, err)
require.NotEmpty(t, resp)
return resp
}