Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Saturn L2 V0 #22

Merged
merged 13 commits into from Jul 21, 2022
54 changes: 17 additions & 37 deletions .circleci/config.yml
@@ -1,42 +1,22 @@
# Use the latest 2.1 version of CircleCI pipeline process engine.
# See: https://circleci.com/docs/2.0/configuration-reference
version: 2.1

# Define a job to be invoked later in a workflow.
# See: https://circleci.com/docs/2.0/configuration-reference/#jobs
orbs:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you hook this up to the standard PL git management setup rather than hand-maintaining this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@willscott What is this "standard PL git management setup" that thou refer to ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go: circleci/go@1.7.0
workflows:
main:
jobs:
- build
jobs:
build:
working_directory: ~/repo
# Specify the execution environment. You can specify an image from Dockerhub or use one of our Convenience Images from CircleCI's Developer Hub.
# See: https://circleci.com/docs/2.0/configuration-reference/#docker-machine-macos-windows-executor
docker:
- image: circleci/golang:1.15.8
# Add steps to the job
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
executor:
name: go/default
tag: '1.16'
steps:
- checkout
- restore_cache:
keys:
- go-mod-v4-{{ checksum "go.sum" }}
- run:
name: Install Dependencies
command: go mod download
- save_cache:
key: go-mod-v4-{{ checksum "go.sum" }}
paths:
- "/go/pkg/mod"
- run:
name: Run tests
command: |
mkdir -p /tmp/test-reports
gotestsum --junitfile /tmp/test-reports/unit-tests.xml
- store_test_results:
path: /tmp/test-reports

# Invoke jobs via workflows
# See: https://circleci.com/docs/2.0/configuration-reference/#workflows
workflows:
sample: # This is the name of the workflow, feel free to change it to better match your workflow.
# Inside the workflow, you define the jobs you want to run.
jobs:
- build
- go/load-cache
- go/mod-download
- go/save-cache
- go/test:
covermode: atomic
failfast: true
race: true
verbose: true
156 changes: 156 additions & 0 deletions carserver/server.go
@@ -0,0 +1,156 @@
package carserver

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"

"github.com/filecoin-project/saturn-l2/logs"

"github.com/filecoin-project/saturn-l2/carstore"

"github.com/pkg/errors"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p-core/peer"

gostream "github.com/libp2p/go-libp2p-gostream"

logging "github.com/ipfs/go-log/v2"

bstore "github.com/ipfs/go-ipfs-blockstore"

"github.com/libp2p/go-libp2p-core/host"

cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"

car "github.com/ipld/go-car/v2"
)

// CARTransferProtocol is the protocol on which the CAR file will be streamed
const CARTransferProtocol = "/saturn/l2/car/1.0"

var log = logging.Logger("libp2p-http-server")

var (
maxRequestSize = int64(1048576) // 1 MiB - max size of the CAR transfer request
)

// Libp2pHttpCARServer serves CAR files for a given root and selector over the libp2p-http CARTransferProtocol.
type Libp2pHttpCARServer struct {
ctx context.Context
cancel context.CancelFunc

h host.Host
server *http.Server
netListener net.Listener

cs *carstore.CarStore
logger *logs.SaturnLogger
}

func New(h host.Host, cs *carstore.CarStore, logger *logs.SaturnLogger) *Libp2pHttpCARServer {
return &Libp2pHttpCARServer{
h: h,
cs: cs,
logger: logger,
}
}

func (l *Libp2pHttpCARServer) Start(ctx context.Context) error {
l.ctx, l.cancel = context.WithCancel(ctx)

// Listen on HTTP over libp2p
listener, err := gostream.Listen(l.h, CARTransferProtocol)
if err != nil {
return fmt.Errorf("starting gostream listener: %w", err)
}

l.netListener = listener
handler := http.NewServeMux()
handler.HandleFunc("/", l.serveCARFile)
l.server = &http.Server{
Handler: handler,
// This context will be the parent of the context associated with all
// incoming requests
BaseContext: func(listener net.Listener) context.Context {
return l.ctx
},
}
go l.server.Serve(listener) //nolint:errcheck

return nil
}

func (l *Libp2pHttpCARServer) Stop() error {
l.cancel()

lerr := l.netListener.Close()
serr := l.server.Close()

if lerr != nil {
return lerr
}
if serr != nil {
return serr
}
return nil
}

func (l *Libp2pHttpCARServer) serveCARFile(w http.ResponseWriter, r *http.Request) {
// decode the remote peer ID and protect the libp2p connection for the lifetime of the transfer
pid, err := peer.Decode(r.RemoteAddr)
if err != nil {
log.Infow("car transfer request failed: parsing remote address as peer ID",
"remote-addr", r.RemoteAddr, "err", err)
http.Error(w, "Failed to parse remote address '"+r.RemoteAddr+"' as peer ID", http.StatusBadRequest)
return
}
tag := uuid.New().String()
l.h.ConnManager().Protect(pid, tag)
defer l.h.ConnManager().Unprotect(pid, tag)

// read the json car transfer request
var req CARTransferRequest
r.Body = http.MaxBytesReader(w, r.Body, maxRequestSize)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, fmt.Sprintf("failed to parse request: %s", err), http.StatusBadRequest)
return
}
dr, err := carRequestToDAGRequest(&req)
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse request: %s", err), http.StatusBadRequest)
return
}

// we have parsed the request successfully -> start logging and serving it
l.logger.Infow(dr.reqId, "got car transfer request")

if err := l.cs.FetchAndWriteCAR(dr.reqId, dr.root, func(ro bstore.Blockstore) error {
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: ro}
ls.SetReadStorage(&bsa)

_, err = car.TraverseV1(l.ctx, &ls, dr.root, dr.selector, w, car.WithSkipOffset(dr.skip))
if err != nil {
l.logger.LogError(dr.reqId, "car transfer failed", err)
return fmt.Errorf("car traversal failed: %w", err)
}
return nil
}); err != nil {
if errors.Is(err, carstore.ErrNotFound) {
l.logger.Debugw(dr.reqId, "car not found")
w.WriteHeader(http.StatusNotFound)
} else {
l.logger.LogError(dr.reqId, "car transfer failed", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}

l.logger.Infow(dr.reqId, "car transfer successful")
// TODO record sent bytes and talk to log injestor
}
147 changes: 147 additions & 0 deletions carserver/server_test.go
@@ -0,0 +1,147 @@
package carserver

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"

"github.com/filecoin-project/saturn-l2/logs"

cid "github.com/ipfs/go-cid"

"github.com/libp2p/go-libp2p-core/host"

"github.com/filecoin-project/saturn-l2/testutils"

p2phttp "github.com/libp2p/go-libp2p-http"

"github.com/google/uuid"

"github.com/filecoin-project/saturn-l2/carstore"

"github.com/ipld/go-ipld-prime/codec/dagcbor"

selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"

"github.com/libp2p/go-libp2p-core/peer"

mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"

"github.com/stretchr/testify/require"
)

func TestSimpleTransfer(t *testing.T) {
path := "../testdata/files/sample-v1.car"
temp := t.TempDir()
ctx := context.Background()

// create the getway api with a test http server
root, contents, svc := testutils.GetTestServerFor(t, path)
defer svc.Close()
gwAPI := carstore.NewGatewayAPI(svc.URL)
lg := logs.NewSaturnLogger()
cfg := carstore.Config{}
cs, err := carstore.New(temp, gwAPI, cfg, lg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When running inside Filecoin Station, we need Saturn L2 node to store all cache files in the directory provided by the Station. (Typically, this directory is OS-specific. See #36 and ipfs/ipfs-desktop#1656 for more details.)

I am proposing to introduce a new env var, e.g. CACHE_DIR or SATURN_CACHEDIR:

  • Filecoin Station sets CACHE_DIR env var, similarly to how it sets FIL_WALLET_ADDRESS
  • Saturn L2 Node uses this value as the root dir for all caches

To make it easier to run saturn-l2 on its own, we can introduce a sensible default when CACHE_DIR is not provided. I don't think t.TempDir() is a good one - IMHO, we should preserve cached data across node restarts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey I have introduced an ENV VAR called ROOT_DIR. This will be the root directory for the L2 Node where it will persist all it's state and cached data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't added a sensible default yet but we can add it down the line if need be. Should be easy.

require.NoError(t, err)
require.NoError(t, cs.Start(ctx))

// create a mock libp2p network, two peers and a connection between them
p1, p2 := buildPeers(t, ctx)

// create and start the car server
carserver := New(p2, cs, lg)
require.NoError(t, carserver.Start(ctx))

// send the request
client := libp2pHTTPClient(p1)
reqBz := mkRequest(t, root, 0)
u := fmt.Sprintf("libp2p://%s", p2.ID())
resp := sendHttpReq(t, client, u, reqBz)

require.EqualValues(t, http.StatusNotFound, resp.StatusCode)

// second fetch should not work
resp = sendHttpReq(t, client, u, reqBz)
require.EqualValues(t, http.StatusNotFound, resp.StatusCode)

// wait till L2 has cached the data
require.Eventually(t, func() bool {
has, err := cs.IsIndexed(ctx, root)
return has && err == nil
}, 1*time.Second, 100*time.Millisecond)

// third fetch should work
resp = sendHttpReq(t, client, u, reqBz)
require.EqualValues(t, http.StatusOK, resp.StatusCode)

bz := readHTTPResponse(t, resp)
// ensure contents match
require.EqualValues(t, contents, bz)

// send request with the skip param
reqBz = mkRequest(t, root, 101)
resp = sendHttpReq(t, client, u, reqBz)
require.EqualValues(t, http.StatusOK, resp.StatusCode)

bz = readHTTPResponse(t, resp)
require.EqualValues(t, contents[101:], bz)
}

// TODO -> Test Parallel Transfers

func readHTTPResponse(t *testing.T, resp *http.Response) []byte {
bz, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.NotEmpty(t, resp)
require.NoError(t, resp.Body.Close())
return bz
}

func mkRequest(t *testing.T, root cid.Cid, offset uint64) []byte {
bf := bytes.Buffer{}
require.NoError(t, dagcbor.Encode(selectorparse.CommonSelector_ExploreAllRecursively, &bf))
req := CARTransferRequest{
Root: base64.StdEncoding.EncodeToString(root.Bytes()),
Selector: base64.StdEncoding.EncodeToString(bf.Bytes()),
ReqId: uuid.New().String(),
SkipOffset: offset,
}
reqBz, err := json.Marshal(req)
require.NoError(t, err)
return reqBz
}

func buildPeers(t *testing.T, ctx context.Context) (client host.Host, server host.Host) {
mn := mocknet.New()
p1, err := mn.GenPeer()
require.NoError(t, err)
p2, err := mn.GenPeer()
require.NoError(t, err)
require.NoError(t, mn.LinkAll())
p1.Peerstore().AddAddrs(p2.ID(), p2.Addrs(), 1*time.Hour)
require.NoError(t, p1.Connect(ctx, peer.AddrInfo{ID: p2.ID()}))

return p1, p2
}

func libp2pHTTPClient(host host.Host) *http.Client {
tr := &http.Transport{}
p2ptr := p2phttp.NewTransport(host, p2phttp.ProtocolOption(CARTransferProtocol))
tr.RegisterProtocol("libp2p", p2ptr)
return &http.Client{Transport: tr}
}

func sendHttpReq(t *testing.T, client *http.Client, url string, body []byte) *http.Response {
hreq, err := http.NewRequest("GET", url, bytes.NewReader(body))
require.NoError(t, err)
resp, err := client.Do(hreq)
require.NoError(t, err)
require.NotEmpty(t, resp)
return resp
}