diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 8409e36a548..c383f5c1220 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -260,7 +260,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin p.logger.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur) return } - top, _, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur) + top, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur) if err != nil { p.metrics.HistWorkerErrCounter.Inc() p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) @@ -307,7 +307,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin default: } - top, _, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) + top, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) if err != nil { p.metrics.LiveWorkerErrCounter.Inc() p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top) diff --git a/pkg/pullsync/mock/pullsync.go b/pkg/pullsync/mock/pullsync.go index 2ddc56331a6..c246675a2f3 100644 --- a/pkg/pullsync/mock/pullsync.go +++ b/pkg/pullsync/mock/pullsync.go @@ -113,7 +113,7 @@ func NewPullSync(opts ...Option) *PullSyncMock { return s } -func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, ruid uint32, err error) { +func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, err error) { isLive := to == math.MaxUint64 @@ -129,7 +129,7 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin p.mtx.Unlock() if p.syncErr != nil { - return 0, 0, p.syncErr + return 0, p.syncErr } if isLive && p.lateReply { @@ -141,9 +141,9 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin select { case <-p.quit: - return 0, 1, context.Canceled + return 0, context.Canceled case <-ctx.Done(): - return 0, 1, ctx.Err() + return 0, ctx.Err() default: } @@ -162,12 +162,12 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin if sr.block { select { case <-p.quit: - return 0, 1, context.Canceled + return 0, context.Canceled case <-ctx.Done(): - return 0, 1, ctx.Err() + return 0, ctx.Err() } } - return sr.topmost, 0, nil + return sr.topmost, nil } panic(fmt.Sprintf("bin %d from %d to %d", bin, from, to)) } @@ -175,7 +175,7 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin if isLive && p.blockLiveSync { // don't respond, wait for quit <-p.quit - return 0, 1, context.Canceled + return 0, context.Canceled } if isLive && len(p.liveSyncReplies) > 0 { p.mtx.Lock() @@ -184,12 +184,12 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin <-p.quit // when shutting down, onthe puller side we cancel the context going into the pullsync protocol request // this results in SyncInterval returning with a context cancelled error - return 0, 0, context.Canceled + return 0, context.Canceled } v := p.liveSyncReplies[p.liveSyncCalls] p.liveSyncCalls++ p.mtx.Unlock() - return v, 1, nil + return v, nil } if p.autoReply { @@ -198,9 +198,9 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin if t > to { t = to } - return t, 1, nil + return t, nil } - return to, 1, nil + return to, nil } func (p *PullSyncMock) GetCursors(_ context.Context, peer swarm.Address) ([]uint64, error) { diff --git a/pkg/pullsync/pb/pullsync.pb.go b/pkg/pullsync/pb/pullsync.pb.go index 1e1f7bd1adf..2df60ba34c5 100644 --- a/pkg/pullsync/pb/pullsync.pb.go +++ b/pkg/pullsync/pb/pullsync.pb.go @@ -102,94 +102,6 @@ func (m *Ack) GetCursors() []uint64 { return nil } -type Ruid struct { - Ruid uint32 `protobuf:"varint,1,opt,name=Ruid,proto3" json:"Ruid,omitempty"` -} - -func (m *Ruid) Reset() { *m = Ruid{} } -func (m *Ruid) String() string { return proto.CompactTextString(m) } -func (*Ruid) ProtoMessage() {} -func (*Ruid) Descriptor() ([]byte, []int) { - return fileDescriptor_d1dee042cf9c065c, []int{2} -} -func (m *Ruid) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Ruid) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Ruid.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Ruid) XXX_Merge(src proto.Message) { - xxx_messageInfo_Ruid.Merge(m, src) -} -func (m *Ruid) XXX_Size() int { - return m.Size() -} -func (m *Ruid) XXX_DiscardUnknown() { - xxx_messageInfo_Ruid.DiscardUnknown(m) -} - -var xxx_messageInfo_Ruid proto.InternalMessageInfo - -func (m *Ruid) GetRuid() uint32 { - if m != nil { - return m.Ruid - } - return 0 -} - -type Cancel struct { - Ruid uint32 `protobuf:"varint,1,opt,name=Ruid,proto3" json:"Ruid,omitempty"` -} - -func (m *Cancel) Reset() { *m = Cancel{} } -func (m *Cancel) String() string { return proto.CompactTextString(m) } -func (*Cancel) ProtoMessage() {} -func (*Cancel) Descriptor() ([]byte, []int) { - return fileDescriptor_d1dee042cf9c065c, []int{3} -} -func (m *Cancel) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Cancel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Cancel.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Cancel) XXX_Merge(src proto.Message) { - xxx_messageInfo_Cancel.Merge(m, src) -} -func (m *Cancel) XXX_Size() int { - return m.Size() -} -func (m *Cancel) XXX_DiscardUnknown() { - xxx_messageInfo_Cancel.DiscardUnknown(m) -} - -var xxx_messageInfo_Cancel proto.InternalMessageInfo - -func (m *Cancel) GetRuid() uint32 { - if m != nil { - return m.Ruid - } - return 0 -} - type GetRange struct { Bin int32 `protobuf:"varint,1,opt,name=Bin,proto3" json:"Bin,omitempty"` From uint64 `protobuf:"varint,2,opt,name=From,proto3" json:"From,omitempty"` @@ -200,7 +112,7 @@ func (m *GetRange) Reset() { *m = GetRange{} } func (m *GetRange) String() string { return proto.CompactTextString(m) } func (*GetRange) ProtoMessage() {} func (*GetRange) Descriptor() ([]byte, []int) { - return fileDescriptor_d1dee042cf9c065c, []int{4} + return fileDescriptor_d1dee042cf9c065c, []int{2} } func (m *GetRange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -259,7 +171,7 @@ func (m *Offer) Reset() { *m = Offer{} } func (m *Offer) String() string { return proto.CompactTextString(m) } func (*Offer) ProtoMessage() {} func (*Offer) Descriptor() ([]byte, []int) { - return fileDescriptor_d1dee042cf9c065c, []int{5} + return fileDescriptor_d1dee042cf9c065c, []int{3} } func (m *Offer) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -310,7 +222,7 @@ func (m *Want) Reset() { *m = Want{} } func (m *Want) String() string { return proto.CompactTextString(m) } func (*Want) ProtoMessage() {} func (*Want) Descriptor() ([]byte, []int) { - return fileDescriptor_d1dee042cf9c065c, []int{6} + return fileDescriptor_d1dee042cf9c065c, []int{4} } func (m *Want) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -356,7 +268,7 @@ func (m *Delivery) Reset() { *m = Delivery{} } func (m *Delivery) String() string { return proto.CompactTextString(m) } func (*Delivery) ProtoMessage() {} func (*Delivery) Descriptor() ([]byte, []int) { - return fileDescriptor_d1dee042cf9c065c, []int{7} + return fileDescriptor_d1dee042cf9c065c, []int{5} } func (m *Delivery) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -409,8 +321,6 @@ func (m *Delivery) GetStamp() []byte { func init() { proto.RegisterType((*Syn)(nil), "pullsync.Syn") proto.RegisterType((*Ack)(nil), "pullsync.Ack") - proto.RegisterType((*Ruid)(nil), "pullsync.Ruid") - proto.RegisterType((*Cancel)(nil), "pullsync.Cancel") proto.RegisterType((*GetRange)(nil), "pullsync.GetRange") proto.RegisterType((*Offer)(nil), "pullsync.Offer") proto.RegisterType((*Want)(nil), "pullsync.Want") @@ -420,27 +330,25 @@ func init() { func init() { proto.RegisterFile("pullsync.proto", fileDescriptor_d1dee042cf9c065c) } var fileDescriptor_d1dee042cf9c065c = []byte{ - // 307 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x91, 0xcf, 0x4a, 0x03, 0x31, - 0x10, 0xc6, 0x9b, 0xfd, 0x53, 0xeb, 0x50, 0x8b, 0x04, 0x91, 0x45, 0x4a, 0x2c, 0xc1, 0x43, 0x4f, - 0x5e, 0x3c, 0x79, 0xb3, 0x7f, 0x50, 0x4f, 0x0a, 0x69, 0x51, 0xf0, 0x96, 0x6e, 0x53, 0x5d, 0xdc, - 0x26, 0x4b, 0x92, 0x15, 0xf6, 0x2d, 0x7c, 0x2c, 0x8f, 0x3d, 0x7a, 0x94, 0xdd, 0x17, 0x91, 0x4d, - 0x77, 0xf1, 0xe2, 0x29, 0xdf, 0x6f, 0x26, 0x33, 0xdf, 0x07, 0x03, 0x83, 0x2c, 0x4f, 0x53, 0x53, - 0xc8, 0xf8, 0x32, 0xd3, 0xca, 0x2a, 0xdc, 0x6b, 0x99, 0x86, 0xe0, 0x2f, 0x0a, 0x49, 0xcf, 0xc1, - 0x9f, 0xc4, 0xef, 0x38, 0x82, 0x83, 0x59, 0xae, 0x8d, 0xd2, 0x26, 0x42, 0x23, 0x7f, 0x1c, 0xb0, - 0x16, 0xe9, 0x19, 0x04, 0x2c, 0x4f, 0xd6, 0x18, 0xef, 0xdf, 0x08, 0x8d, 0xd0, 0xf8, 0x88, 0x39, - 0x4d, 0x87, 0xd0, 0x9d, 0x71, 0x19, 0x8b, 0xf4, 0xdf, 0xee, 0x0d, 0xf4, 0xee, 0x84, 0x65, 0x5c, - 0xbe, 0x0a, 0x7c, 0x0c, 0xfe, 0x34, 0x91, 0xae, 0x1d, 0xb2, 0x5a, 0xd6, 0x13, 0xb7, 0x5a, 0x6d, - 0x23, 0x6f, 0x84, 0xc6, 0x01, 0x73, 0x1a, 0x0f, 0xc0, 0x5b, 0xaa, 0xc8, 0x77, 0x15, 0x6f, 0xa9, - 0xe8, 0x35, 0x84, 0x8f, 0x9b, 0x8d, 0xd0, 0x75, 0xbc, 0xa5, 0xca, 0xb6, 0xca, 0x58, 0xb7, 0x22, - 0x60, 0x2d, 0xe2, 0x53, 0xe8, 0xde, 0x73, 0xf3, 0x26, 0x8c, 0x5b, 0xd4, 0x67, 0x0d, 0xd1, 0x0b, - 0x08, 0x9e, 0xb9, 0xb4, 0x78, 0x08, 0x87, 0xd3, 0xc4, 0x3e, 0x89, 0xd8, 0x2a, 0xed, 0x66, 0xfb, - 0xec, 0xaf, 0x40, 0x1f, 0xa0, 0x37, 0x17, 0x69, 0xf2, 0x21, 0x74, 0x51, 0x7b, 0x4c, 0xd6, 0x6b, - 0x2d, 0x8c, 0x69, 0xfe, 0xb5, 0x58, 0x47, 0x9d, 0x73, 0xcb, 0x1b, 0x07, 0xa7, 0xf1, 0x09, 0x84, - 0x0b, 0xcb, 0xb7, 0x99, 0x4b, 0xdb, 0x67, 0x7b, 0x98, 0x0e, 0xbf, 0x4a, 0x82, 0x76, 0x25, 0x41, - 0x3f, 0x25, 0x41, 0x9f, 0x15, 0xe9, 0xec, 0x2a, 0xd2, 0xf9, 0xae, 0x48, 0xe7, 0xc5, 0xcb, 0x56, - 0xab, 0xae, 0xbb, 0xc1, 0xd5, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf7, 0xd1, 0x30, 0xa1, 0x95, - 0x01, 0x00, 0x00, + // 283 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0x90, 0x4f, 0x4b, 0xf4, 0x30, + 0x18, 0xc4, 0x37, 0xfd, 0xb3, 0xef, 0xbe, 0x0f, 0x65, 0x91, 0x20, 0xd2, 0xc3, 0x12, 0x4b, 0xf0, + 0xd0, 0x93, 0x17, 0x4f, 0xde, 0xdc, 0xba, 0xa8, 0x27, 0x85, 0x6c, 0x51, 0xf0, 0x96, 0xed, 0x66, + 0xb5, 0xd8, 0x36, 0x25, 0xc9, 0x0a, 0xfd, 0x16, 0x7e, 0x2c, 0x8f, 0x7b, 0xf4, 0x28, 0xed, 0x17, + 0x91, 0xc6, 0x16, 0x6f, 0xf3, 0x1b, 0x92, 0x99, 0xe1, 0x81, 0x79, 0xbd, 0x2f, 0x0a, 0xdd, 0x54, + 0xd9, 0x79, 0xad, 0xa4, 0x91, 0x78, 0x36, 0x32, 0xf5, 0xc1, 0x5d, 0x37, 0x15, 0x3d, 0x05, 0x77, + 0x99, 0xbd, 0xe1, 0x10, 0xfe, 0x5d, 0xef, 0x95, 0x96, 0x4a, 0x87, 0x28, 0x72, 0x63, 0x8f, 0x8d, + 0x48, 0xaf, 0x60, 0x76, 0x2b, 0x0c, 0xe3, 0xd5, 0x8b, 0xc0, 0x47, 0xe0, 0x26, 0x79, 0x15, 0xa2, + 0x08, 0xc5, 0x3e, 0xeb, 0x25, 0xc6, 0xe0, 0xdd, 0x28, 0x59, 0x86, 0x4e, 0x84, 0x62, 0x8f, 0x59, + 0x8d, 0xe7, 0xe0, 0xa4, 0x32, 0x74, 0xad, 0xe3, 0xa4, 0x92, 0x5e, 0x82, 0xff, 0xb0, 0xdb, 0x09, + 0xd5, 0x97, 0xa4, 0xb2, 0x2e, 0xa5, 0x36, 0x36, 0xc2, 0x63, 0x23, 0xe2, 0x13, 0x98, 0xde, 0x71, + 0xfd, 0x2a, 0xb4, 0x0d, 0x0a, 0xd8, 0x40, 0xf4, 0x0c, 0xbc, 0x27, 0x5e, 0x19, 0xbc, 0x80, 0xff, + 0x49, 0x6e, 0x1e, 0x45, 0x66, 0xa4, 0xb2, 0x7f, 0x03, 0xf6, 0x67, 0xd0, 0x7b, 0x98, 0xad, 0x44, + 0x91, 0xbf, 0x0b, 0xd5, 0xf4, 0x1d, 0xcb, 0xed, 0x56, 0x09, 0xad, 0x87, 0x77, 0x23, 0xf6, 0x53, + 0x57, 0xdc, 0xf0, 0xa1, 0xc1, 0x6a, 0x7c, 0x0c, 0xfe, 0xda, 0xf0, 0xb2, 0xb6, 0x6b, 0x03, 0xf6, + 0x0b, 0xc9, 0xe2, 0xb3, 0x25, 0xe8, 0xd0, 0x12, 0xf4, 0xdd, 0x12, 0xf4, 0xd1, 0x91, 0xc9, 0xa1, + 0x23, 0x93, 0xaf, 0x8e, 0x4c, 0x9e, 0x9d, 0x7a, 0xb3, 0x99, 0xda, 0x4b, 0x5e, 0xfc, 0x04, 0x00, + 0x00, 0xff, 0xff, 0x7b, 0xc0, 0x49, 0xf9, 0x5b, 0x01, 0x00, 0x00, } func (m *Syn) Marshal() (dAtA []byte, err error) { @@ -507,62 +415,6 @@ func (m *Ack) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *Ruid) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Ruid) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Ruid) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Ruid != 0 { - i = encodeVarintPullsync(dAtA, i, uint64(m.Ruid)) - i-- - dAtA[i] = 0x8 - } - return len(dAtA) - i, nil -} - -func (m *Cancel) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Cancel) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Cancel) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Ruid != 0 { - i = encodeVarintPullsync(dAtA, i, uint64(m.Ruid)) - i-- - dAtA[i] = 0x8 - } - return len(dAtA) - i, nil -} - func (m *GetRange) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -746,30 +598,6 @@ func (m *Ack) Size() (n int) { return n } -func (m *Ruid) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.Ruid != 0 { - n += 1 + sovPullsync(uint64(m.Ruid)) - } - return n -} - -func (m *Cancel) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.Ruid != 0 { - n += 1 + sovPullsync(uint64(m.Ruid)) - } - return n -} - func (m *GetRange) Size() (n int) { if m == nil { return 0 @@ -1026,150 +854,6 @@ func (m *Ack) Unmarshal(dAtA []byte) error { } return nil } -func (m *Ruid) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPullsync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Ruid: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Ruid: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Ruid", wireType) - } - m.Ruid = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPullsync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Ruid |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipPullsync(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPullsync - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPullsync - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *Cancel) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPullsync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Cancel: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Cancel: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Ruid", wireType) - } - m.Ruid = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPullsync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Ruid |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipPullsync(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPullsync - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPullsync - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} func (m *GetRange) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/pullsync/pb/pullsync.proto b/pkg/pullsync/pb/pullsync.proto index a27b5b1375a..cd3117d8bd5 100644 --- a/pkg/pullsync/pb/pullsync.proto +++ b/pkg/pullsync/pb/pullsync.proto @@ -14,14 +14,6 @@ message Ack { repeated uint64 Cursors = 1; } -message Ruid { - uint32 Ruid = 1; -} - -message Cancel { - uint32 Ruid = 1; -} - message GetRange { int32 Bin = 1; uint64 From = 2; diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 8a84d19dd86..ae086dacb15 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -8,8 +8,6 @@ package pullsync import ( "context" - "crypto/rand" - "encoding/binary" "errors" "fmt" "io" @@ -36,7 +34,7 @@ const loggerName = "pullsync" const ( protocolName = "pullsync" - protocolVersion = "1.1.0" + protocolVersion = "1.2.0" streamName = "pullsync" cursorStreamName = "cursors" cancelStreamName = "cancel" @@ -52,8 +50,6 @@ var ( const ( storagePutTimeout = 5 * time.Second - // explicit ruid cancellation message timeout - cancellationTimeout = 5 * time.Second ) // how many maximum chunks in a batch @@ -65,12 +61,9 @@ type Interface interface { // It returns the BinID of highest chunk that was synced from the given // interval. If the requested interval is too large, the downstream peer // has the liberty to provide less chunks than requested. - SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, ruid uint32, err error) + SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, err error) // GetCursors retrieves all cursors from a downstream peer. GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, error) - // CancelRuid cancels active pullsync operation identified by ruid on - // a downstream peer. - CancelRuid(ctx context.Context, peer swarm.Address, ruid uint32) error } type Syncer struct { @@ -84,14 +77,12 @@ type Syncer struct { validStamp postage.ValidStampFn rate *rate.Rate - ruidMtx sync.Mutex - ruidCtx map[string]map[uint32]func() - Interface io.Closer } func New(streamer p2p.Streamer, storage pullstorage.Storer, unwrap func(swarm.Chunk), validStamp postage.ValidStampFn, logger log.Logger) *Syncer { + return &Syncer{ streamer: streamer, storage: storage, @@ -99,7 +90,6 @@ func New(streamer p2p.Streamer, storage pullstorage.Storer, unwrap func(swarm.Ch unwrap: unwrap, validStamp: validStamp, logger: logger.WithName(loggerName).Register(), - ruidCtx: make(map[string]map[uint32]func()), wg: sync.WaitGroup{}, quit: make(chan struct{}), rate: rate.New(rateWindowSize), @@ -119,10 +109,6 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec { Name: cursorStreamName, Handler: s.cursorHandler, }, - { - Name: cancelStreamName, - Handler: s.cancelHandler, - }, }, } } @@ -131,61 +117,46 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec { // It returns the BinID of highest chunk that was synced from the given interval. // If the requested interval is too large, the downstream peer has the liberty to // provide less chunks than requested. -func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, ruid uint32, err error) { +func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, err error) { isLiveSync := to == MaxCursor loggerV2 := s.logger.V(2).Register() - var ru pb.Ruid stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) if err != nil { - return 0, 0, fmt.Errorf("new stream: %w", err) + return 0, fmt.Errorf("new stream: %w", err) } defer func() { if err != nil { _ = stream.Reset() - loggerV2.Debug("error syncing peer", "peer_address", peer, "ruid", ru.Ruid, "bin", bin, "from", from, "to", to, "error", err) + loggerV2.Debug("error syncing peer", "peer_address", peer, "bin", bin, "from", from, "to", to, "error", err) } else { go stream.FullClose() } }() - b := make([]byte, 4) - _, err = rand.Read(b) - if err != nil { - return 0, 0, fmt.Errorf("crypto rand: %w", err) - } - - ru.Ruid = binary.BigEndian.Uint32(b) - loggerV2.Debug("syncing peer", "peer_address", peer, "ruid", ru.Ruid, "bin", bin, "from", from, "to", to) - w, r := protobuf.NewWriterAndReader(stream) - if err = w.WriteMsgWithContext(ctx, &ru); err != nil { - return 0, 0, fmt.Errorf("write ruid: %w", err) - } - rangeMsg := &pb.GetRange{Bin: int32(bin), From: from, To: to} if err = w.WriteMsgWithContext(ctx, rangeMsg); err != nil { - return 0, ru.Ruid, fmt.Errorf("write get range: %w", err) + return 0, fmt.Errorf("write get range: %w", err) } var offer pb.Offer if err = r.ReadMsgWithContext(ctx, &offer); err != nil { - return 0, ru.Ruid, fmt.Errorf("read offer: %w", err) + return 0, fmt.Errorf("read offer: %w", err) } if len(offer.Hashes)%swarm.HashSize != 0 { - return 0, ru.Ruid, fmt.Errorf("inconsistent hash length") + return 0, fmt.Errorf("inconsistent hash length") } // empty interval (no chunks present in interval). // return the end of the requested range as topmost. if len(offer.Hashes) == 0 { - return offer.Topmost, ru.Ruid, nil + return offer.Topmost, nil } topmost = offer.Topmost - ruid = ru.Ruid var ( bvLen = len(offer.Hashes) / swarm.HashSize @@ -293,9 +264,15 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 } // handler handles an incoming request to sync an interval -func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { +func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { loggerV2 := s.logger.V(2).Register() + select { + case <-s.quit: + return nil + default: + } + r := protobuf.NewReader(stream) defer func() { if err != nil { @@ -304,46 +281,20 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er _ = stream.FullClose() } }() - var ru pb.Ruid - if err := r.ReadMsgWithContext(ctx, &ru); err != nil { - return fmt.Errorf("send ruid: %w", err) - } - loggerV2.Debug("peer pulling", "peer_address", p.Address, "ruid", ru.Ruid) - ctx, cancel := context.WithCancel(ctx) + loggerV2.Debug("peer pulling", "peer_address", p.Address) + + ctx, cancel := context.WithCancel(streamCtx) + defer cancel() - s.ruidMtx.Lock() - if _, ok := s.ruidCtx[p.Address.ByteString()]; !ok { - s.ruidCtx[p.Address.ByteString()] = make(map[uint32]func()) - } - if c, ok := s.ruidCtx[p.Address.ByteString()][ru.Ruid]; ok { - s.metrics.DuplicateRuid.Inc() - c() - } - s.ruidCtx[p.Address.ByteString()][ru.Ruid] = cancel - s.ruidMtx.Unlock() - cc := make(chan struct{}) - defer close(cc) go func() { select { case <-s.quit: + cancel() case <-ctx.Done(): - case <-cc: - } - cancel() - s.ruidMtx.Lock() - delete(s.ruidCtx[p.Address.ByteString()], ru.Ruid) - if len(s.ruidCtx[p.Address.ByteString()]) == 0 { - delete(s.ruidCtx, p.Address.ByteString()) + return } - s.ruidMtx.Unlock() }() - select { - case <-s.quit: - return nil - default: - } - s.wg.Add(1) defer s.wg.Done() @@ -499,70 +450,6 @@ func (s *Syncer) cursorHandler(ctx context.Context, p p2p.Peer, stream p2p.Strea return nil } -func (s *Syncer) CancelRuid(ctx context.Context, peer swarm.Address, ruid uint32) (err error) { - loggerV2 := s.logger.V(2).Register() - - stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, cancelStreamName) - if err != nil { - return fmt.Errorf("new stream: %w", err) - } - loggerV2.Debug("sending ruid cancellation", "ruid", ruid, "peer_address", peer) - w := protobuf.NewWriter(stream) - defer func() { - if err != nil { - _ = stream.Reset() - loggerV2.Debug("error sending ruid cancellation failed", "ruid", ruid, "peer_address", peer, "error", err) - - } else { - go stream.FullClose() - } - }() - - ctx, cancel := context.WithTimeout(ctx, cancellationTimeout) - defer cancel() - - var c pb.Cancel - c.Ruid = ruid - if err := w.WriteMsgWithContext(ctx, &c); err != nil { - return fmt.Errorf("send cancellation: %w", err) - } - return nil -} - -// handler handles an incoming request to explicitly cancel a ruid -func (s *Syncer) cancelHandler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { - loggerV2 := s.logger.V(2).Register() - - r := protobuf.NewReader(stream) - var c pb.Cancel - defer func() { - if err != nil { - _ = stream.Reset() - loggerV2.Debug("cancellation failed", "peer_address", p.Address, "ruid", c.Ruid, "error", err) - } else { - _ = stream.FullClose() - } - }() - - if err := r.ReadMsgWithContext(ctx, &c); err != nil { - return fmt.Errorf("read cancel: %w", err) - } - - loggerV2.Debug("cancelling", "peer_address", p.Address, "ruid", c.Ruid) - - s.ruidMtx.Lock() - defer s.ruidMtx.Unlock() - - if cancel, ok := s.ruidCtx[p.Address.ByteString()][c.Ruid]; ok { - cancel() - delete(s.ruidCtx[p.Address.ByteString()], c.Ruid) - if len(s.ruidCtx[p.Address.ByteString()]) == 0 { - delete(s.ruidCtx, p.Address.ByteString()) - } - } - return nil -} - func (s *Syncer) Rate() float64 { return s.rate.Rate() } @@ -576,15 +463,6 @@ func (s *Syncer) Close() error { s.wg.Wait() }() - // cancel all contexts - s.ruidMtx.Lock() - for _, peer := range s.ruidCtx { - for _, c := range peer { - c() - } - } - s.ruidMtx.Unlock() - select { case <-cc: case <-time.After(5 * time.Second): diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index a0477c3c3aa..333bbaacd59 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -58,7 +58,7 @@ func TestIncoming_WantEmptyInterval(t *testing.T) { psClient, clientDb = newPullSync(recorder) ) - topmost, _, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 1, 0, 5) + topmost, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 1, 0, 5) if err != nil { t.Fatal(err) } @@ -82,7 +82,7 @@ func TestIncoming_WantNone(t *testing.T) { psClient, clientDb = newPullSync(recorder, mock.WithChunks(chunks...)) ) - topmost, _, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) + topmost, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) if err != nil { t.Fatal(err) } @@ -105,7 +105,7 @@ func TestIncoming_WantOne(t *testing.T) { psClient, clientDb = newPullSync(recorder, mock.WithChunks(someChunks(1, 2, 3, 4)...)) ) - topmost, _, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) + topmost, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) if err != nil { t.Fatal(err) } @@ -131,7 +131,7 @@ func TestIncoming_WantAll(t *testing.T) { psClient, clientDb = newPullSync(recorder) ) - topmost, _, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) + topmost, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) if err != nil { t.Fatal(err) } @@ -162,7 +162,7 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) { psClient, _ = newPullSync(recorder) ) - _, _, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) + _, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5) if !errors.Is(err, pullsync.ErrUnsolicitedChunk) { t.Fatalf("expected ErrUnsolicitedChunk but got %v", err) }