From 3d49da221ef681c0ac3b2f2d3377b52e9b5de330 Mon Sep 17 00:00:00 2001 From: notanatol Date: Mon, 31 Oct 2022 12:28:46 +0000 Subject: [PATCH 1/3] fix: stream related metrics --- pkg/p2p/libp2p/connections_test.go | 4 +++- pkg/p2p/libp2p/export_test.go | 4 ++++ pkg/p2p/libp2p/libp2p.go | 31 ++++++++++++++++++++++++++---- pkg/p2p/libp2p/metrics.go | 28 +++++++++++++++++++++++++++ pkg/p2p/libp2p/stream.go | 21 +++++++++++--------- 5 files changed, 74 insertions(+), 14 deletions(-) diff --git a/pkg/p2p/libp2p/connections_test.go b/pkg/p2p/libp2p/connections_test.go index 3f2d64164da..52bba9b005e 100644 --- a/pkg/p2p/libp2p/connections_test.go +++ b/pkg/p2p/libp2p/connections_test.go @@ -543,7 +543,9 @@ func TestConnectRepeatHandshake(t *testing.T) { t.Fatal(err) } - if _, err := s2.HandshakeService().Handshake(ctx, libp2p.NewStream(stream), info.Addrs[0], info.ID); err != nil { + s := s2.WrapStream(stream) + + if _, err := s2.HandshakeService().Handshake(ctx, s, info.Addrs[0], info.ID); err != nil { t.Fatal(err) } diff --git a/pkg/p2p/libp2p/export_test.go b/pkg/p2p/libp2p/export_test.go index 698247b5ccd..c9fa401d654 100644 --- a/pkg/p2p/libp2p/export_test.go +++ b/pkg/p2p/libp2p/export_test.go @@ -22,6 +22,10 @@ func (s *Service) NewStreamForPeerID(peerID libp2ppeer.ID, protocolName, protoco return s.newStreamForPeerID(context.Background(), peerID, protocolName, protocolVersion, streamName) } +func (s *Service) WrapStream(ns network.Stream) *stream { + return newStream(ns, s.metrics) +} + func (s *Service) Host() host.Host { return s.host } diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 10b7fe57940..63fa21697c0 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -316,6 +316,10 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay connMetricNotify := newConnMetricNotify(s.metrics) h.Network().Notify(peerRegistry) // update peer registry on network events h.Network().Notify(connMetricNotify) + + streamMetricNotify := newStreamMetricNotify(s.metrics) + h.Network().Notify(streamMetricNotify) + return s, nil } @@ -361,7 +365,7 @@ func (s *Service) handleIncoming(stream network.Stream) { } peerID := stream.Conn().RemotePeer() - handshakeStream := NewStream(stream) + handshakeStream := newStream(stream, s.metrics) i, err := s.handshakeService.Handle(s.ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID) if err != nil { s.logger.Debug("stream handler: handshake: handle failed", "peer_id", peerID, "error", err) @@ -530,7 +534,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { return } - stream := newStream(streamlibp2p) + stream := newStream(streamlibp2p, s.metrics) // exchange headers ctx, cancel := context.WithTimeout(s.ctx, s.HeadersRWTimeout) @@ -693,7 +697,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. return nil, fmt.Errorf("connect new stream: %w", err) } - handshakeStream := NewStream(stream) + handshakeStream := newStream(stream, s.metrics) i, err := s.handshakeService.Handshake(ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), stream.Conn().RemotePeer()) if err != nil { _ = handshakeStream.Reset() @@ -880,7 +884,7 @@ func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, headers return nil, fmt.Errorf("new stream for peerid: %w", err) } - stream := newStream(streamlibp2p) + stream := newStream(streamlibp2p, s.metrics) // tracing: add span context header if headers == nil { @@ -1080,6 +1084,25 @@ func (c *connectionNotifier) Connected(_ network.Network, _ network.Conn) { c.metrics.HandledConnectionCount.Inc() } +func newStreamMetricNotify(m metrics) *streamNotifier { + return &streamNotifier{ + metrics: m, + Notifiee: new(network.NoopNotifiee), + } +} + +type streamNotifier struct { + metrics metrics + network.Notifiee +} + +func (sn *streamNotifier) OpenedStream(network.Network, network.Stream) { + sn.metrics.Libp2pCreatedStreamCount.Inc() +} +func (sn *streamNotifier) ClosedStream(network.Network, network.Stream) { + sn.metrics.Libp2pClosedStreamCount.Inc() +} + // isNetworkOrHostUnreachableError determines based on the // given error whether the host or network is reachable. func isNetworkOrHostUnreachableError(err error) bool { diff --git a/pkg/p2p/libp2p/metrics.go b/pkg/p2p/libp2p/metrics.go index 7b974326173..8e0c8d8dd92 100644 --- a/pkg/p2p/libp2p/metrics.go +++ b/pkg/p2p/libp2p/metrics.go @@ -16,6 +16,10 @@ type metrics struct { CreatedConnectionCount prometheus.Counter HandledConnectionCount prometheus.Counter CreatedStreamCount prometheus.Counter + ClosedStreamCount prometheus.Counter + StreamResetCount prometheus.Counter + Libp2pCreatedStreamCount prometheus.Counter + Libp2pClosedStreamCount prometheus.Counter HandledStreamCount prometheus.Counter BlocklistedPeerCount prometheus.Counter BlocklistedPeerErrCount prometheus.Counter @@ -49,6 +53,30 @@ func newMetrics() metrics { Name: "created_stream_count", Help: "Number of initiated outgoing libp2p streams.", }), + ClosedStreamCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "closed_stream_count", + Help: "Number of closed outgoing libp2p streams.", + }), + StreamResetCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "stream_reset_count", + Help: "Number of outgoing libp2p streams resets.", + }), + Libp2pCreatedStreamCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "libp2p_created_stream_count", + Help: "Number of initiated outgoing libp2p streams reported by the library.", + }), + Libp2pClosedStreamCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "libp2p_closed_stream_count", + Help: "Number of closed outgoing libp2p streams reported by the library.", + }), HandledStreamCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/p2p/libp2p/stream.go b/pkg/p2p/libp2p/stream.go index 5ed94688d7a..0b1791aa3f0 100644 --- a/pkg/p2p/libp2p/stream.go +++ b/pkg/p2p/libp2p/stream.go @@ -23,14 +23,11 @@ type stream struct { network.Stream headers map[string][]byte responseHeaders map[string][]byte + metrics metrics } -func NewStream(s network.Stream) p2p.Stream { - return &stream{Stream: s} -} - -func newStream(s network.Stream) *stream { - return &stream{Stream: s} +func newStream(s network.Stream, metrics metrics) *stream { + return &stream{Stream: s, metrics: metrics} } func (s *stream) Headers() p2p.Headers { return s.headers @@ -40,12 +37,18 @@ func (s *stream) ResponseHeaders() p2p.Headers { return s.responseHeaders } +func (s *stream) Reset() error { + defer s.metrics.StreamResetCount.Inc() + return s.Stream.Reset() +} + func (s *stream) FullClose() error { + defer s.metrics.ClosedStreamCount.Inc() // close the stream to make sure it is gc'd defer s.Close() if err := s.CloseWrite(); err != nil { - _ = s.Reset() + _ = s.Stream.Reset() return err } @@ -60,11 +63,11 @@ func (s *stream) FullClose() error { // protocol the other side is speaking. n, err := s.Read([]byte{0}) if n > 0 || err == nil { - _ = s.Reset() + _ = s.Stream.Reset() return errExpectedEof } if !errors.Is(err, io.EOF) { - _ = s.Reset() + _ = s.Stream.Reset() return err } return nil From 72112e20b73a1a991689e047d615ed607efcb99b Mon Sep 17 00:00:00 2001 From: notanatol Date: Mon, 31 Oct 2022 18:08:53 +0000 Subject: [PATCH 2/3] fix: library reported --- pkg/p2p/libp2p/metrics.go | 4 ++-- pkg/postage/batchservice/batchservice.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/p2p/libp2p/metrics.go b/pkg/p2p/libp2p/metrics.go index 8e0c8d8dd92..b0c9ea155b0 100644 --- a/pkg/p2p/libp2p/metrics.go +++ b/pkg/p2p/libp2p/metrics.go @@ -68,13 +68,13 @@ func newMetrics() metrics { Libp2pCreatedStreamCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "libp2p_created_stream_count", + Name: "library_reported_created_stream_count", Help: "Number of initiated outgoing libp2p streams reported by the library.", }), Libp2pClosedStreamCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "libp2p_closed_stream_count", + Name: "library_reported_closed_stream_count", Help: "Number of closed outgoing libp2p streams reported by the library.", }), HandledStreamCount: prometheus.NewCounter(prometheus.CounterOpts{ diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index 8ad70d69db6..82d48a3d76f 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -206,7 +206,7 @@ func (svc *batchService) UpdatePrice(price *big.Int, txHash common.Hash) error { return fmt.Errorf("update checksum: %w", err) } - svc.logger.Debug("updated chain price", "new_price", price, txHash, "tx_checksum", sum) + svc.logger.Debug("updated chain price", "new_price", price, "tx_hash", txHash, "tx_checksum", sum) return nil } From 96d6c1fe2c9953c6b6cf7ca3f810cd33cd86f546 Mon Sep 17 00:00:00 2001 From: notanatol Date: Tue, 1 Nov 2022 17:29:29 +0100 Subject: [PATCH 3/3] fix: renamings --- pkg/p2p/libp2p/connections_test.go | 4 +--- pkg/p2p/libp2p/libp2p.go | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/p2p/libp2p/connections_test.go b/pkg/p2p/libp2p/connections_test.go index 52bba9b005e..f653dc57dc3 100644 --- a/pkg/p2p/libp2p/connections_test.go +++ b/pkg/p2p/libp2p/connections_test.go @@ -543,9 +543,7 @@ func TestConnectRepeatHandshake(t *testing.T) { t.Fatal(err) } - s := s2.WrapStream(stream) - - if _, err := s2.HandshakeService().Handshake(ctx, s, info.Addrs[0], info.ID); err != nil { + if _, err := s2.HandshakeService().Handshake(ctx, s2.WrapStream(stream), info.Addrs[0], info.ID); err != nil { t.Fatal(err) } diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 63fa21697c0..29cd5d0cb69 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -317,8 +317,8 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay h.Network().Notify(peerRegistry) // update peer registry on network events h.Network().Notify(connMetricNotify) - streamMetricNotify := newStreamMetricNotify(s.metrics) - h.Network().Notify(streamMetricNotify) + streamNotify := newStreamNotifier(s.metrics) + h.Network().Notify(streamNotify) return s, nil } @@ -1084,7 +1084,7 @@ func (c *connectionNotifier) Connected(_ network.Network, _ network.Conn) { c.metrics.HandledConnectionCount.Inc() } -func newStreamMetricNotify(m metrics) *streamNotifier { +func newStreamNotifier(m metrics) *streamNotifier { return &streamNotifier{ metrics: m, Notifiee: new(network.NoopNotifiee),