Skip to content

Commit

Permalink
Switch to using a generic pool for channels in Connection
Browse files Browse the repository at this point in the history
Signed-off-by: Ed Warnicke <hagbard@gmail.com>
  • Loading branch information
edwarnicke committed Sep 9, 2022
1 parent 4516f0c commit 4dfb8cd
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 26 deletions.
34 changes: 11 additions & 23 deletions core/channel.go
Expand Up @@ -110,34 +110,22 @@ type Channel struct {
}

func (c *Connection) newChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
// create new channel
channel := &Channel{
conn: c,
msgCodec: c.codec,
msgIdentifier: c,
reqChan: make(chan *vppRequest, reqChanBufSize),
replyChan: make(chan *vppReply, replyChanBufSize),
replyTimeout: DefaultReplyTimeout,
receiveReplyTimeout: ReplyChannelTimeout,
// get a channel from the pool
channel := c.channelPool.Get()
if channel == nil {
return nil, errors.New("all channel IDs are in use")
}
if len(channel.reqChan) != reqChanBufSize {
channel.reqChan = make(chan *vppRequest, reqChanBufSize)
}
if len(channel.replyChan) != replyChanBufSize {
channel.replyChan = make(chan *vppReply, replyChanBufSize)
}

// store API channel within the client
c.channelsLock.Lock()
if len(c.channels) >= 0x7fff {
return nil, errors.New("all channel IDs are used")
}
for {
c.nextChannelID++
chID := c.nextChannelID & 0x7fff
_, ok := c.channels[chID]
if !ok {
channel.id = chID
c.channels[chID] = channel
break
}
}
c.channels[channel.id] = channel
c.channelsLock.Unlock()

return channel, nil
}

Expand Down
29 changes: 26 additions & 3 deletions core/connection.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

logger "github.com/sirupsen/logrus"
"go.fd.io/govpp/core/genericpool"

"go.fd.io/govpp/adapter"
"go.fd.io/govpp/api"
Expand Down Expand Up @@ -109,9 +110,9 @@ type Connection struct {
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path

channelsLock sync.RWMutex // lock for the channels map and the channel ID
nextChannelID uint16 // next potential channel ID (the real limit is 2^15)
channels map[uint16]*Channel // map of all API channels indexed by the channel ID
channelsLock sync.RWMutex // lock for the channels map and the channel ID
channels map[uint16]*Channel // map of all API channels indexed by the channel ID
channelPool *genericpool.Pool[*Channel]

subscriptionsLock sync.RWMutex // lock for the subscriptions map
subscriptions map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
Expand Down Expand Up @@ -154,6 +155,26 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration)
mux: &sync.Mutex{},
},
}

var nextChannelID uint32
c.channelPool = genericpool.New[*Channel](func() *Channel {
chID := atomic.AddUint32(&nextChannelID, 1)
if chID > 0x7fff {
return nil
}
// create new channel
return &Channel{
id: uint16(chID),
conn: c,
msgCodec: c.codec,
msgIdentifier: c,
reqChan: make(chan *vppRequest, RequestChanBufSize),
replyChan: make(chan *vppReply, ReplyChanBufSize),
replyTimeout: DefaultReplyTimeout,
receiveReplyTimeout: ReplyChannelTimeout,
}
})

binapi.SetMsgCallback(c.msgCallback)
return c
}
Expand Down Expand Up @@ -269,6 +290,8 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
"channel": ch.id,
}).Debug("API channel released")

c.channelPool.Put(ch)

// delete the channel from channels map
c.channelsLock.Lock()
delete(c.channels, ch.id)
Expand Down
25 changes: 25 additions & 0 deletions core/genericpool/generic_pool.go
@@ -0,0 +1,25 @@
package genericpool

import (
"sync"
)

type Pool[T any] struct {
p sync.Pool
}

func (p *Pool[T]) Get() T {
return p.p.Get().(T)
}

func (p *Pool[T]) Put(x T) {
p.p.Put(x)
}

func New[T any](f func() T) *Pool[T] {
return &Pool[T]{
p: sync.Pool{
New: func() any { return f() },
},
}
}

0 comments on commit 4dfb8cd

Please sign in to comment.