Skip to content

Commit

Permalink
fix: add closed check, expose storage.ErrClosed
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Sep 4, 2023
1 parent e9e60af commit 7b8decb
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 11 deletions.
35 changes: 28 additions & 7 deletions v2/storage/deferred/deferredcarwriter.go
Expand Up @@ -29,6 +29,10 @@ var _ io.Closer = (*DeferredCarWriter)(nil)
// DeferredCarWriter is threadsafe, and can be used concurrently.
// Closing the writer will close, but not delete, the underlying file.
//
// DeferredCarWriter only implements the storage.WritableStorage interface and
// is not intended as a general purpose storage implementation. It only supports
// storage Put() and Get() operations.
//
// This utility is useful for cases where a CAR will be streamed but an error
// may occur before any content is written. In this case, the CAR file will not
// be created, and the output stream will not be written to. In the case of an
Expand All @@ -45,11 +49,12 @@ type DeferredCarWriter struct {
outPath string
outStream io.Writer

lk sync.Mutex
f *os.File
w carstorage.WritableCar
putCb []putCb
opts []carv2.Option
lk sync.Mutex
f *os.File
closed bool
w carstorage.WritableCar
putCb []putCb
opts []carv2.Option
}

// NewDeferredCarWriterForPath creates a DeferredCarWriter that will write to a
Expand Down Expand Up @@ -89,6 +94,10 @@ func (dcw *DeferredCarWriter) Has(ctx context.Context, key string) (bool, error)
dcw.lk.Lock()
defer dcw.lk.Unlock()

if dcw.closed {
return false, carstorage.ErrClosed
}

if dcw.w == nil { // shortcut, haven't written anything, don't even initialise
return false, nil
}
Expand All @@ -107,6 +116,10 @@ func (dcw *DeferredCarWriter) Put(ctx context.Context, key string, content []byt
dcw.lk.Lock()
defer dcw.lk.Unlock()

if dcw.closed {
return carstorage.ErrClosed
}

if dcw.putCb != nil {
// call all callbacks, remove those that were only needed once
for i := 0; i < len(dcw.putCb); i++ {
Expand Down Expand Up @@ -150,11 +163,18 @@ func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) {
}

// Close closes the underlying file, if one was created.
func (dcw *DeferredCarWriter) Close() error {
func (dcw *DeferredCarWriter) Close() (err error) {
dcw.lk.Lock()
defer dcw.lk.Unlock()

err := dcw.w.Finalize()
if dcw.closed {
return carstorage.ErrClosed
}
dcw.closed = true

if dcw.w != nil {
err = dcw.w.Finalize()
}

if dcw.f != nil {
defer func() { dcw.f = nil }()
Expand All @@ -163,6 +183,7 @@ func (dcw *DeferredCarWriter) Close() error {
err = err2
}
}

return err
}

Expand Down
33 changes: 33 additions & 0 deletions v2/storage/deferred/deferredcarwriter_test.go
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/storage"
deferred "github.com/ipld/go-car/v2/storage/deferred"
mh "github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -218,6 +219,38 @@ func TestDeferredCarWriterPutCb(t *testing.T) {
require.Equal(t, 1, pc3)
}

func TestDeferredCarWriterWriteAfterClose(t *testing.T) {
req := require.New(t)

ctx := context.Background()
testCid1, testData1 := randBlock()
testCid2, testData2 := randBlock()

var buf bytes.Buffer
cw := deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1})
// no writes
req.NoError(cw.Close())

req.ErrorIs(cw.Put(ctx, testCid1.KeyString(), testData1), storage.ErrClosed)
_, err := cw.Has(ctx, testCid1.KeyString())
req.ErrorIs(err, storage.ErrClosed)
req.ErrorIs(cw.Close(), storage.ErrClosed)

// with writes

buf = bytes.Buffer{}
cw = deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1})

req.NoError(cw.Put(ctx, testCid1.KeyString(), testData1))
req.NoError(cw.Put(ctx, testCid2.KeyString(), testData2))
req.NoError(cw.Close())

req.ErrorIs(cw.Put(ctx, testCid1.KeyString(), testData1), storage.ErrClosed)
_, err = cw.Has(ctx, testCid1.KeyString())
req.ErrorIs(err, storage.ErrClosed)
req.ErrorIs(cw.Close(), storage.ErrClosed)
}

func randBlock() (cid.Cid, []byte) {
data := make([]byte, 1024)
rngLk.Lock()
Expand Down
8 changes: 4 additions & 4 deletions v2/storage/storage.go
Expand Up @@ -18,7 +18,7 @@ import (
ipldstorage "github.com/ipld/go-ipld-prime/storage"
)

var errClosed = errors.New("cannot use a CARv2 storage after closing")
var ErrClosed = errors.New("cannot use a CAR storage after closing")

type ReaderAtWriterAt interface {
io.ReaderAt
Expand Down Expand Up @@ -314,7 +314,7 @@ func (sc *StorageCar) Put(ctx context.Context, keyStr string, data []byte) error
defer sc.mu.Unlock()

if sc.closed {
return errClosed
return ErrClosed
}

idx, ok := sc.idx.(*index.InsertionIndex)
Expand Down Expand Up @@ -361,7 +361,7 @@ func (sc *StorageCar) Has(ctx context.Context, keyStr string) (bool, error) {
defer sc.mu.RUnlock()

if sc.closed {
return false, errClosed
return false, ErrClosed
}

if idx, ok := sc.idx.(*index.InsertionIndex); ok && sc.writer != nil {
Expand Down Expand Up @@ -443,7 +443,7 @@ func (sc *StorageCar) GetStream(ctx context.Context, keyStr string) (io.ReadClos
defer sc.mu.RUnlock()

if sc.closed {
return nil, errClosed
return nil, ErrClosed
}

_, offset, size, err := store.FindCid(
Expand Down

0 comments on commit 7b8decb

Please sign in to comment.