Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stream related metrics #3474

Merged
merged 3 commits into from Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/p2p/libp2p/connections_test.go
Expand Up @@ -543,7 +543,9 @@ func TestConnectRepeatHandshake(t *testing.T) {
t.Fatal(err)
}

if _, err := s2.HandshakeService().Handshake(ctx, libp2p.NewStream(stream), info.Addrs[0], info.ID); err != nil {
s := s2.WrapStream(stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since s is only used in the Handshake method, I would suggest nesting it there.


if _, err := s2.HandshakeService().Handshake(ctx, s, info.Addrs[0], info.ID); err != nil {
t.Fatal(err)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/p2p/libp2p/export_test.go
Expand Up @@ -22,6 +22,10 @@ func (s *Service) NewStreamForPeerID(peerID libp2ppeer.ID, protocolName, protoco
return s.newStreamForPeerID(context.Background(), peerID, protocolName, protocolVersion, streamName)
}

func (s *Service) WrapStream(ns network.Stream) *stream {
return newStream(ns, s.metrics)
}

func (s *Service) Host() host.Host {
return s.host
}
Expand Down
31 changes: 27 additions & 4 deletions pkg/p2p/libp2p/libp2p.go
Expand Up @@ -316,6 +316,10 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
connMetricNotify := newConnMetricNotify(s.metrics)
h.Network().Notify(peerRegistry) // update peer registry on network events
h.Network().Notify(connMetricNotify)

streamMetricNotify := newStreamMetricNotify(s.metrics)
h.Network().Notify(streamMetricNotify)

return s, nil
}

Expand Down Expand Up @@ -361,7 +365,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
}

peerID := stream.Conn().RemotePeer()
handshakeStream := NewStream(stream)
handshakeStream := newStream(stream, s.metrics)
i, err := s.handshakeService.Handle(s.ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID)
if err != nil {
s.logger.Debug("stream handler: handshake: handle failed", "peer_id", peerID, "error", err)
Expand Down Expand Up @@ -530,7 +534,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
return
}

stream := newStream(streamlibp2p)
stream := newStream(streamlibp2p, s.metrics)

// exchange headers
ctx, cancel := context.WithTimeout(s.ctx, s.HeadersRWTimeout)
Expand Down Expand Up @@ -693,7 +697,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("connect new stream: %w", err)
}

handshakeStream := NewStream(stream)
handshakeStream := newStream(stream, s.metrics)
i, err := s.handshakeService.Handshake(ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), stream.Conn().RemotePeer())
if err != nil {
_ = handshakeStream.Reset()
Expand Down Expand Up @@ -880,7 +884,7 @@ func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, headers
return nil, fmt.Errorf("new stream for peerid: %w", err)
}

stream := newStream(streamlibp2p)
stream := newStream(streamlibp2p, s.metrics)

// tracing: add span context header
if headers == nil {
Expand Down Expand Up @@ -1080,6 +1084,25 @@ func (c *connectionNotifier) Connected(_ network.Network, _ network.Conn) {
c.metrics.HandledConnectionCount.Inc()
}

func newStreamMetricNotify(m metrics) *streamNotifier {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd name it newStreamNotifier.

return &streamNotifier{
metrics: m,
Notifiee: new(network.NoopNotifiee),
}
}

type streamNotifier struct {
metrics metrics
network.Notifiee
}

func (sn *streamNotifier) OpenedStream(network.Network, network.Stream) {
sn.metrics.Libp2pCreatedStreamCount.Inc()
}
func (sn *streamNotifier) ClosedStream(network.Network, network.Stream) {
sn.metrics.Libp2pClosedStreamCount.Inc()
}

// isNetworkOrHostUnreachableError determines based on the
// given error whether the host or network is reachable.
func isNetworkOrHostUnreachableError(err error) bool {
Expand Down
28 changes: 28 additions & 0 deletions pkg/p2p/libp2p/metrics.go
Expand Up @@ -16,6 +16,10 @@ type metrics struct {
CreatedConnectionCount prometheus.Counter
HandledConnectionCount prometheus.Counter
CreatedStreamCount prometheus.Counter
ClosedStreamCount prometheus.Counter
StreamResetCount prometheus.Counter
Libp2pCreatedStreamCount prometheus.Counter
Libp2pClosedStreamCount prometheus.Counter
HandledStreamCount prometheus.Counter
BlocklistedPeerCount prometheus.Counter
BlocklistedPeerErrCount prometheus.Counter
Expand Down Expand Up @@ -49,6 +53,30 @@ func newMetrics() metrics {
Name: "created_stream_count",
Help: "Number of initiated outgoing libp2p streams.",
}),
ClosedStreamCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "closed_stream_count",
Help: "Number of closed outgoing libp2p streams.",
}),
StreamResetCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "stream_reset_count",
Help: "Number of outgoing libp2p streams resets.",
}),
Libp2pCreatedStreamCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "library_reported_created_stream_count",
Help: "Number of initiated outgoing libp2p streams reported by the library.",
}),
Libp2pClosedStreamCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "library_reported_closed_stream_count",
Help: "Number of closed outgoing libp2p streams reported by the library.",
}),
HandledStreamCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Expand Down
21 changes: 12 additions & 9 deletions pkg/p2p/libp2p/stream.go
Expand Up @@ -23,14 +23,11 @@ type stream struct {
network.Stream
headers map[string][]byte
responseHeaders map[string][]byte
metrics metrics
}

func NewStream(s network.Stream) p2p.Stream {
return &stream{Stream: s}
}

func newStream(s network.Stream) *stream {
return &stream{Stream: s}
func newStream(s network.Stream, metrics metrics) *stream {
return &stream{Stream: s, metrics: metrics}
}
func (s *stream) Headers() p2p.Headers {
return s.headers
Expand All @@ -40,12 +37,18 @@ func (s *stream) ResponseHeaders() p2p.Headers {
return s.responseHeaders
}

func (s *stream) Reset() error {
defer s.metrics.StreamResetCount.Inc()
return s.Stream.Reset()
}

func (s *stream) FullClose() error {
defer s.metrics.ClosedStreamCount.Inc()
// close the stream to make sure it is gc'd
defer s.Close()

if err := s.CloseWrite(); err != nil {
_ = s.Reset()
_ = s.Stream.Reset()
return err
}

Expand All @@ -60,11 +63,11 @@ func (s *stream) FullClose() error {
// protocol the other side is speaking.
n, err := s.Read([]byte{0})
if n > 0 || err == nil {
_ = s.Reset()
_ = s.Stream.Reset()
return errExpectedEof
}
if !errors.Is(err, io.EOF) {
_ = s.Reset()
_ = s.Stream.Reset()
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/postage/batchservice/batchservice.go
Expand Up @@ -206,7 +206,7 @@ func (svc *batchService) UpdatePrice(price *big.Int, txHash common.Hash) error {
return fmt.Errorf("update checksum: %w", err)
}

svc.logger.Debug("updated chain price", "new_price", price, txHash, "tx_checksum", sum)
svc.logger.Debug("updated chain price", "new_price", price, "tx_hash", txHash, "tx_checksum", sum)
return nil
}

Expand Down