Skip to content

Commit

Permalink
'topic' lock replaced by atomic boolean
Browse files Browse the repository at this point in the history
  • Loading branch information
aratz-lasa committed Jul 21, 2022
1 parent 60cf380 commit 780ebac
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 26 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ require (
github.com/libp2p/go-msgio v0.2.0
github.com/multiformats/go-multiaddr v0.5.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
go.uber.org/atomic v1.7.0
)
37 changes: 11 additions & 26 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/atomic"
)

// ErrTopicClosed is returned if a Topic is utilized after it has been closed
Expand All @@ -30,8 +31,7 @@ type Topic struct {
evtHandlerMux sync.RWMutex
evtHandlers map[*TopicEventHandler]struct{}

mux sync.RWMutex
closed bool
closed atomic.Bool
}

// String returns the topic associated with t
Expand All @@ -47,10 +47,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
return fmt.Errorf("invalid topic score parameters: %w", err)
}

t.mux.Lock()
defer t.mux.Unlock()

if t.closed {
if t.closed.Load() {
return ErrTopicClosed
}

Expand Down Expand Up @@ -84,9 +81,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -141,9 +136,7 @@ func (t *Topic) sendNotification(evt PeerEvent) {
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -184,9 +177,7 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
// cancel function. Subsequent calls increase the reference counter.
// To completely disable the relay, all references must be cancelled.
func (t *Topic) Relay() (RelayCancelFunc, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -215,16 +206,14 @@ type ProvideKey func() (crypto.PrivKey, peer.ID)
type PublishOptions struct {
ready RouterReady
customKey ProvideKey
local bool
local bool
}

type PubOpt func(pub *PublishOptions) error

// Publish publishes data to topic.
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return ErrTopicClosed
}

Expand Down Expand Up @@ -347,9 +336,7 @@ func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
t.mux.Lock()
defer t.mux.Unlock()
if t.closed {
if t.closed.Load() {
return nil
}

Expand All @@ -364,17 +351,15 @@ func (t *Topic) Close() error {
err := <-req.resp

if err == nil {
t.closed = true
t.closed.Swap(true)
}

return err
}

// ListPeers returns a list of peers we are connected to in the given topic.
func (t *Topic) ListPeers() []peer.ID {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return []peer.ID{}
}

Expand Down

0 comments on commit 780ebac

Please sign in to comment.