Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix infinite tracing spec compliance #430

Merged
merged 8 commits into from Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 60 additions & 22 deletions newrelic/core/agent_streaming.py
Expand Up @@ -17,7 +17,8 @@

try:
import grpc
from newrelic.core.infinite_tracing_pb2 import Span, RecordStatus

from newrelic.core.infinite_tracing_pb2 import RecordStatus, Span
except ImportError:
grpc = None

Expand All @@ -33,25 +34,39 @@ class StreamingRpc(object):
"""

PATH = "/com.newrelic.trace.v1.IngestService/RecordSpan"
RETRY_POLICY = (
(15, False),
(15, False),
(30, False),
(60, False),
(120, False),
(300, True),
)
OPTIONS = [("grpc.enable_retries", 0)]

def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True):
if ssl:
credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(endpoint, credentials)
else:
channel = grpc.insecure_channel(endpoint)
self.channel = channel
self._endpoint = endpoint
self._ssl = ssl
self.metadata = metadata
self.request_iterator = stream_buffer
self.response_processing_thread = threading.Thread(
target=self.process_responses, name="NR-StreamingRpc-process-responses"
)
self.response_processing_thread.daemon = True
self.notify = self.condition()
self.rpc = self.channel.stream_stream(
self.PATH, Span.SerializeToString, RecordStatus.FromString
)
self.record_metric = record_metric
self.closed = False

self.create_channel()

def create_channel(self):
    """Build the gRPC channel for this RPC and bind the streaming callable.

    Opens a TLS-secured channel to ``self._endpoint`` when ``self._ssl`` is
    truthy, otherwise an insecure one. Either way the class-level
    ``OPTIONS`` are passed as channel options. The resulting channel is
    stored on ``self.channel`` and a stream-stream callable for ``PATH``
    (serializing ``Span`` requests, parsing ``RecordStatus`` responses) is
    stored on ``self.rpc``.
    """
    if not self._ssl:
        # Plain-text transport was requested.
        self.channel = grpc.insecure_channel(self._endpoint, options=self.OPTIONS)
    else:
        # Default SSL credentials (no explicit certificates supplied).
        tls_credentials = grpc.ssl_channel_credentials()
        self.channel = grpc.secure_channel(
            self._endpoint,
            tls_credentials,
            options=self.OPTIONS,
        )

    # The callable always belongs to the channel that was just created.
    serialize_request = Span.SerializeToString
    parse_response = RecordStatus.FromString
    self.rpc = self.channel.stream_stream(self.PATH, serialize_request, parse_response)

@staticmethod
def condition(*args, **kwargs):
Expand All @@ -63,6 +78,7 @@ def close(self):
if self.channel:
channel = self.channel
self.channel = None
self.closed = True
self.notify.notify_all()

if channel:
Expand All @@ -80,6 +96,7 @@ def connect(self):
def process_responses(self):
response_iterator = None

retry = 0
while True:
with self.notify:
if self.channel and response_iterator:
Expand Down Expand Up @@ -112,21 +129,42 @@ def process_responses(self):
)
break

_logger.warning(
"Streaming RPC closed. "
"Will attempt to reconnect in 15 seconds. "
"Code: %s Details: %s",
code,
details,
)
self.notify.wait(15)
# Unpack retry policy settings
if retry >= len(self.RETRY_POLICY):
retry_time, error = self.RETRY_POLICY[-1]
else:
retry_time, error = self.RETRY_POLICY[retry]
retry += 1

# Emit appropriate retry logs
if not error:
_logger.warning(
"Streaming RPC closed. Will attempt to reconnect in %d seconds. Check the prior log entries and remedy any issue as necessary, or if the problem persists, report this problem to New Relic support for further investigation. Code: %s Details: %s",
retry_time,
code,
details,
)
else:
_logger.error(
"Streaming RPC closed after additional attempts. Will attempt to reconnect in %d seconds. Please report this problem to New Relic support for further investigation. Code: %s Details: %s",
retry_time,
code,
details,
)

# Reconnect channel with backoff
self.channel.close()
self.notify.wait(retry_time)
if self.closed:
break
else:
_logger.debug("Attempting to reconnect Streaming RPC.")
self.create_channel()

if not self.channel:
if self.closed:
break

response_iterator = self.rpc(
self.request_iterator, metadata=self.metadata
)
response_iterator = self.rpc(self.request_iterator, metadata=self.metadata)
_logger.info("Streaming RPC connect completed.")

try:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -156,7 +156,7 @@ def build_extension(self, ext):
"newrelic": ["newrelic.ini", "version.txt", "packages/urllib3/LICENSE.txt", "common/cacert.pem"],
},
scripts=["scripts/newrelic-admin"],
extras_require={"infinite-tracing": ["grpcio<1.40", "protobuf<4"]},
extras_require={"infinite-tracing": ["grpcio", "protobuf<4"]},
)

if with_setuptools:
Expand Down
4 changes: 3 additions & 1 deletion tox.ini
Expand Up @@ -114,7 +114,9 @@ envlist =
python-framework_fastapi-{py36,py37,py38,py39,py310},
python-framework_flask-{pypy,py27}-flask0012,
python-framework_flask-{pypy,py27,py36,py37,py38,py39,py310,pypy3}-flask0101,
python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest,master},
;temporarily disabling tests on flask master
; python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest,master},
python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest},
python-framework_graphene-{py27,py36,py37,py38,py39,py310,pypy,pypy3}-graphenelatest,
python-framework_graphene-py37-graphene{0200,0201},
python-framework_graphql-{py27,py36,py37,py38,py39,py310,pypy,pypy3}-graphql02,
Expand Down