Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Saturn L2 V0 #22

Merged
merged 13 commits into from Jul 21, 2022
54 changes: 17 additions & 37 deletions .circleci/config.yml
@@ -1,42 +1,22 @@
# Use the latest 2.1 version of CircleCI pipeline process engine.
# See: https://circleci.com/docs/2.0/configuration-reference
version: 2.1

# Define a job to be invoked later in a workflow.
# See: https://circleci.com/docs/2.0/configuration-reference/#jobs
orbs:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you hook this up to the standard PL git management setup rather than hand-maintaining this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@willscott What is this "standard PL git management setup" that thou refer to ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go: circleci/go@1.7.0
workflows:
main:
jobs:
- build
jobs:
build:
working_directory: ~/repo
# Specify the execution environment. You can specify an image from Dockerhub or use one of our Convenience Images from CircleCI's Developer Hub.
# See: https://circleci.com/docs/2.0/configuration-reference/#docker-machine-macos-windows-executor
docker:
- image: circleci/golang:1.15.8
# Add steps to the job
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
executor:
name: go/default
tag: '1.16'
steps:
- checkout
- restore_cache:
keys:
- go-mod-v4-{{ checksum "go.sum" }}
- run:
name: Install Dependencies
command: go mod download
- save_cache:
key: go-mod-v4-{{ checksum "go.sum" }}
paths:
- "/go/pkg/mod"
- run:
name: Run tests
command: |
mkdir -p /tmp/test-reports
gotestsum --junitfile /tmp/test-reports/unit-tests.xml
- store_test_results:
path: /tmp/test-reports

# Invoke jobs via workflows
# See: https://circleci.com/docs/2.0/configuration-reference/#workflows
workflows:
sample: # This is the name of the workflow, feel free to change it to better match your workflow.
# Inside the workflow, you define the jobs you want to run.
jobs:
- build
- go/load-cache
- go/mod-download
- go/save-cache
- go/test:
covermode: atomic
failfast: true
race: true
verbose: true
149 changes: 149 additions & 0 deletions carserver/server.go
@@ -0,0 +1,149 @@
package carserver

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"

"github.com/filecoin-project/saturn-l2/carstore"

"github.com/pkg/errors"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p-core/peer"

gostream "github.com/libp2p/go-libp2p-gostream"

logging "github.com/ipfs/go-log/v2"

bstore "github.com/ipfs/go-ipfs-blockstore"

"github.com/libp2p/go-libp2p-core/host"

cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"

car "github.com/ipld/go-car/v2"
)

// CARTransferProtocol is the protocol on which the CAR file will be streamed
const CARTransferProtocol = "/saturn/l2/car/1.0"

var log = logging.Logger("libp2p-http-server")

var (
maxRequestSize = int64(1048576) // 1 MiB - max size of the CAR transfer request
)

// Libp2pHttpCARServer serves CAR files for a given root and selector over the libp2p-http CARTransferProtocol.
type Libp2pHttpCARServer struct {
ctx context.Context
cancel context.CancelFunc

h host.Host
server *http.Server
netListener net.Listener

cs *carstore.PersistentCache
}

func New(h host.Host, cs *carstore.PersistentCache) *Libp2pHttpCARServer {
return &Libp2pHttpCARServer{
h: h,
cs: cs,
}
}

func (l *Libp2pHttpCARServer) Start(ctx context.Context) error {
l.ctx, l.cancel = context.WithCancel(ctx)

// Listen on HTTP over libp2p
listener, err := gostream.Listen(l.h, CARTransferProtocol)
if err != nil {
return fmt.Errorf("starting gostream listener: %w", err)
}

l.netListener = listener
handler := http.NewServeMux()
handler.HandleFunc("/", l.serveCARFile)
l.server = &http.Server{
Handler: handler,
// This context will be the parent of the context associated with all
// incoming requests
BaseContext: func(listener net.Listener) context.Context {
return l.ctx
},
}
go l.server.Serve(listener) //nolint:errcheck

return nil
}

func (l *Libp2pHttpCARServer) Stop() error {
l.cancel()

lerr := l.netListener.Close()
serr := l.server.Close()

if lerr != nil {
return lerr
}
if serr != nil {
return serr
}
return nil
}

func (l *Libp2pHttpCARServer) serveCARFile(w http.ResponseWriter, r *http.Request) {
// decode the remote peer ID and protect the libp2p connection for the lifetime of the transfer
pid, err := peer.Decode(r.RemoteAddr)
if err != nil {
log.Infow("car transfer request failed: parsing remote address as peer ID",
"remote-addr", r.RemoteAddr, "err", err)
http.Error(w, "Failed to parse remote address '"+r.RemoteAddr+"' as peer ID", http.StatusBadRequest)
return
}
tag := uuid.New().String()
l.h.ConnManager().Protect(pid, tag)
defer l.h.ConnManager().Unprotect(pid, tag)

// read the json car transfer request
var req CARTransferRequest
r.Body = http.MaxBytesReader(w, r.Body, maxRequestSize)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, fmt.Sprintf("failed to parse request: %s", err), http.StatusBadRequest)
return
}
dr, err := carRequestToDAGRequest(&req)
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse request: %s", err), http.StatusBadRequest)
return
}

log.Infow("car transfer request", "req", req)

if err := l.cs.FetchAndWriteCAR(dr.root, func(ro bstore.Blockstore) error {
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: ro}
ls.SetReadStorage(&bsa)

_, err = car.TraverseV1(l.ctx, &ls, dr.root, dr.selector, w, car.WithSkipOffset(dr.skip))
if err != nil {
return fmt.Errorf("car traversal failed: %w", err)
}
return nil
}); err != nil {
w.WriteHeader(http.StatusInternalServerError)
if errors.Is(err, carstore.ErrNotFound) {
log.Debugw("car not found", "req", req)
} else {
log.Errorw("car transfer failed", "req", req, "err", err)
}
return
}
log.Debugw("car transfer successful", "req", req)

// TODO record sent bytes and talk to log injestor
}
139 changes: 139 additions & 0 deletions carserver/server_test.go
@@ -0,0 +1,139 @@
package carserver

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"

cid "github.com/ipfs/go-cid"

"github.com/libp2p/go-libp2p-core/host"

"github.com/filecoin-project/saturn-l2/testutils"

p2phttp "github.com/libp2p/go-libp2p-http"

"github.com/google/uuid"

"github.com/filecoin-project/saturn-l2/carstore"

"github.com/ipld/go-ipld-prime/codec/dagcbor"

selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"

"github.com/libp2p/go-libp2p-core/peer"

mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"

"github.com/stretchr/testify/require"
)

func TestSimpleTransfer(t *testing.T) {
path := "../testdata/files/sample-v1.car"
temp := t.TempDir()
ctx := context.Background()

// create the getway api with a test http server
root, contents, svc := testutils.GetTestServerFor(t, path)
defer svc.Close()
gwAPI := carstore.NewGatewayAPI(svc.URL)
cs, err := carstore.New(temp, gwAPI)
require.NoError(t, err)
require.NoError(t, cs.Start(ctx))

// create a mock libp2p network, two peers and a connection between them
p1, p2 := buildPeers(t, ctx)

// create and start the car server
carserver := New(p2, cs)
require.NoError(t, carserver.Start(ctx))

// send the request
client := libp2pHTTPClient(p1)
reqBz := mkRequest(t, root, 0)
u := fmt.Sprintf("libp2p://%s", p2.ID())
resp := sendHttpReq(t, client, u, reqBz)
require.EqualValues(t, http.StatusInternalServerError, resp.StatusCode)

// second fetch should work
resp = sendHttpReq(t, client, u, reqBz)
require.EqualValues(t, http.StatusInternalServerError, resp.StatusCode)

// third fetch should work
require.Eventually(t, func() bool {
b, err := cs.Has(root)
return b && err == nil
}, 10*time.Second, 200*time.Millisecond)

resp = sendHttpReq(t, client, u, reqBz)
require.EqualValues(t, http.StatusOK, resp.StatusCode)

bz := readHTTPResponse(t, resp)
// ensure contents match
require.EqualValues(t, contents, bz)

// send request with the skip param
reqBz = mkRequest(t, root, 101)
resp = sendHttpReq(t, client, u, reqBz)
require.EqualValues(t, http.StatusOK, resp.StatusCode)

bz = readHTTPResponse(t, resp)
require.EqualValues(t, contents[101:], bz)
}

func readHTTPResponse(t *testing.T, resp *http.Response) []byte {
bz, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.NotEmpty(t, resp)
require.NoError(t, resp.Body.Close())
return bz
}

func mkRequest(t *testing.T, root cid.Cid, offset uint64) []byte {
bf := bytes.Buffer{}
require.NoError(t, dagcbor.Encode(selectorparse.CommonSelector_ExploreAllRecursively, &bf))
req := CARTransferRequest{
Root: base64.StdEncoding.EncodeToString(root.Bytes()),
Selector: base64.StdEncoding.EncodeToString(bf.Bytes()),
UUID: uuid.New().String(),
SkipOffset: offset,
}
reqBz, err := json.Marshal(req)
require.NoError(t, err)
return reqBz
}

func buildPeers(t *testing.T, ctx context.Context) (client host.Host, server host.Host) {
mn := mocknet.New()
p1, err := mn.GenPeer()
require.NoError(t, err)
p2, err := mn.GenPeer()
require.NoError(t, err)
require.NoError(t, mn.LinkAll())
p1.Peerstore().AddAddrs(p2.ID(), p2.Addrs(), 1*time.Hour)
require.NoError(t, p1.Connect(ctx, peer.AddrInfo{ID: p2.ID()}))

return p1, p2
}

func libp2pHTTPClient(host host.Host) *http.Client {
tr := &http.Transport{}
p2ptr := p2phttp.NewTransport(host, p2phttp.ProtocolOption(CARTransferProtocol))
tr.RegisterProtocol("libp2p", p2ptr)
return &http.Client{Transport: tr}
}

func sendHttpReq(t *testing.T, client *http.Client, url string, body []byte) *http.Response {
hreq, err := http.NewRequest("GET", url, bytes.NewReader(body))
require.NoError(t, err)
resp, err := client.Do(hreq)
require.NoError(t, err)
require.NotEmpty(t, resp)
return resp
}
16 changes: 10 additions & 6 deletions libp2pcarserver/types.go → carserver/types.go
@@ -1,26 +1,29 @@
package libp2pcarserver
package carserver

import (
"bytes"
"encoding/base64"
"fmt"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
)

// CARTransferRequest is the request sent by the client to transfer a CAR file
// for the given root and selector
// for the given root and selector.
type CARTransferRequest struct {
Root string // base64 encoded byte array
Selector string // base 64 encoded byte array
UUID string
Root string // base64 encoded byte array
Selector string // base 64 encoded byte array
SkipOffset uint64
}

type dagTraversalRequest struct {
root cid.Cid
selector ipld.Node
skip uint64
}

func carRequestToDAGRequest(req *CARTransferRequest) (*dagTraversalRequest, error) {
Expand All @@ -45,6 +48,7 @@ func carRequestToDAGRequest(req *CARTransferRequest) (*dagTraversalRequest, erro
return &dagTraversalRequest{
root: rootcid,
selector: sel,
skip: req.SkipOffset,
}, nil
}

Expand Down