Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NEW_RELIC_K8S_OPERATOR_ENABLED #1127

Merged
merged 2 commits into from Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion newrelic/bootstrap/sitecustomize.py
Expand Up @@ -121,8 +121,9 @@ def log_message(text, *args, **kwargs):

log_message("python_prefix_matches = %r", python_prefix_matches)
log_message("python_version_matches = %r", python_version_matches)
k8s_operator_enabled = os.environ.get("NEW_RELIC_K8S_OPERATOR_ENABLED", False)

if python_prefix_matches and python_version_matches:
if k8s_operator_enabled or (python_prefix_matches and python_version_matches):
# We also need to skip agent initialisation if neither the license
# key or config file environment variables are set. We do this as
# some people like to use a common startup script which always uses
Expand Down
16 changes: 1 addition & 15 deletions newrelic/config.py
Expand Up @@ -564,6 +564,7 @@ def _process_configuration(section):
_process_setting(section, "ai_monitoring.enabled", "getboolean", None)
_process_setting(section, "ai_monitoring.record_content.enabled", "getboolean", None)
_process_setting(section, "ai_monitoring.streaming.enabled", "getboolean", None)
_process_setting(section, "k8s_operator.enabled", "getboolean", None)
_process_setting(section, "package_reporting.enabled", "getboolean", None)


Expand Down Expand Up @@ -4362,26 +4363,11 @@ def _process_module_builtin_defaults():
"instrument_celery_worker",
)

_process_module_definition(
"celery.execute.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition(
"celery.task.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition(
"celery.app.base",
"newrelic.hooks.application_celery",
"instrument_celery_app_base",
)
_process_module_definition(
"celery.app.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool")

_process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi")
Expand Down
10 changes: 9 additions & 1 deletion newrelic/core/config.py
Expand Up @@ -162,6 +162,10 @@ class AIMonitoringRecordContentSettings(Settings):
pass


class K8sOperatorSettings(Settings):
pass


class PackageReportingSettings(Settings):
pass

Expand Down Expand Up @@ -430,6 +434,7 @@ class EventHarvestConfigHarvestLimitSettings(Settings):
_settings.ai_monitoring = AIMonitoringSettings()
_settings.ai_monitoring.streaming = AIMonitoringStreamingSettings()
_settings.ai_monitoring.record_content = AIMonitoringRecordContentSettings()
_settings.k8s_operator = K8sOperatorSettings()
_settings.package_reporting = PackageReportingSettings()
_settings.attributes = AttributesSettings()
_settings.browser_monitoring = BrowserMonitorSettings()
Expand Down Expand Up @@ -745,7 +750,9 @@ def default_otlp_host(host):
_settings.gc_runtime_metrics.enabled = False
_settings.gc_runtime_metrics.top_object_count_limit = 5

_settings.memory_runtime_pid_metrics.enabled = _environ_as_bool("NEW_RELIC_MEMORY_RUNTIME_METRICS_ENABLED", default=True)
_settings.memory_runtime_pid_metrics.enabled = _environ_as_bool(
"NEW_RELIC_MEMORY_RUNTIME_METRICS_ENABLED", default=True
)

_settings.transaction_events.enabled = True
_settings.transaction_events.attributes.enabled = True
Expand Down Expand Up @@ -953,6 +960,7 @@ def default_otlp_host(host):
"NEW_RELIC_AI_MONITORING_RECORD_CONTENT_ENABLED", default=True
)
_settings.ai_monitoring._llm_token_count_callback = None
_settings.k8s_operator.enabled = _environ_as_bool("NEW_RELIC_K8S_OPERATOR_ENABLED", default=False)
_settings.package_reporting.enabled = _environ_as_bool("NEW_RELIC_PACKAGE_REPORTING_ENABLED", default=True)
_settings.ml_insights_events.enabled = _environ_as_bool("NEW_RELIC_ML_INSIGHTS_EVENTS_ENABLED", default=False)

Expand Down
110 changes: 49 additions & 61 deletions newrelic/hooks/application_celery.py
Expand Up @@ -28,43 +28,45 @@
from newrelic.api.message_trace import MessageTrace
from newrelic.api.pre_function import wrap_pre_function
from newrelic.api.transaction import current_transaction
from newrelic.common.object_names import callable_name
from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper
from newrelic.core.agent import shutdown_agent

UNKNOWN_TASK_NAME = "<Unknown Task>"
MAPPING_TASK_NAMES = {"celery.starmap", "celery.map"}

def CeleryTaskWrapper(wrapped, application=None, name=None):
def wrapper(wrapped, instance, args, kwargs):
transaction = current_transaction(active_only=False)

if callable(name):
# Start Hotfix v2.2.1.
# if instance and inspect.ismethod(wrapped):
# _name = name(instance, *args, **kwargs)
# else:
# _name = name(*args, **kwargs)
def task_name(*args, **kwargs):
# Grab the current task, which can be located in either place
if args:
task = args[0]
elif "task" in kwargs:
task = kwargs["task"]
else:
return UNKNOWN_TASK_NAME # Failsafe

if instance is not None:
_name = name(instance, *args, **kwargs)
else:
_name = name(*args, **kwargs)
# End Hotfix v2.2.1.
# Task can be either a task instance or a signature, which subclasses dict, or an actual dict in some cases.
task_name = getattr(task, "name", None) or task.get("task", UNKNOWN_TASK_NAME)

elif name is None:
_name = callable_name(wrapped)
# Under mapping tasks, the root task name isn't descriptive enough so we append the
# subtask name to differentiate between different mapping tasks
if task_name in MAPPING_TASK_NAMES:
try:
subtask = kwargs["task"]["task"]
task_name = "/".join((task_name, subtask))
except Exception:
pass

else:
_name = name
return task_name

# Helper for obtaining the appropriate application object. If
# has an activate() method assume it is a valid application
# object. Don't check by type so se can easily mock it for
# testing if need be.

def _application():
if hasattr(application, "activate"):
return application
return application_instance(application)
def CeleryTaskWrapper(wrapped):
def wrapper(wrapped, instance, args, kwargs):
transaction = current_transaction(active_only=False)

if instance is not None:
_name = task_name(instance, *args, **kwargs)
else:
_name = task_name(*args, **kwargs)

# A Celery Task can be called either outside of a transaction, or
# within the context of an existing transaction. There are 3
Expand Down Expand Up @@ -95,13 +97,14 @@ def _application():
return wrapped(*args, **kwargs)

else:
with BackgroundTask(_application(), _name, "Celery", source=instance) as transaction:
with BackgroundTask(application_instance(), _name, "Celery", source=instance) as transaction:
# Attempt to grab distributed tracing headers
try:
# Headers on earlier versions of Celery may end up as attributes
# on the request context instead of as custom headers. Handler this
# by defaulting to using vars() if headers is not available
headers = getattr(wrapped.request, "headers", None) or vars(wrapped.request)
request = instance.request
headers = getattr(request, "headers", None) or vars(request)

settings = transaction.settings
if headers is not None and settings is not None:
Expand All @@ -128,20 +131,30 @@ def _application():
# instrumentation via FunctionWrapper() relies on __call__ being called which
# in turn executes the wrapper() function defined above. Since the micro
# optimization bypasses __call__ method it breaks our instrumentation of
# celery. To circumvent this problem, we added a run() attribute to our
# celery.
#
# For versions of celery 2.5.3 to 2.5.5+
# Celery has included a monkey-patching provision which did not perform this
# optimization on functions that were monkey-patched. Unfortunately, our
# wrappers are too transparent for celery to detect that they've even been
# monky-patched. To circumvent this, we set the __module__ of our wrapped task
# to this file which causes celery to properly detect that it has been patched.
#
# For versions of celery 2.5.3 to 2.5.5
# To circumvent this problem, we added a run() attribute to our
# FunctionWrapper which points to our __call__ method. This causes Celery
# to execute our __call__ method which in turn applies the wrapper
# correctly before executing the task.
#
# This is only a problem in Celery versions 2.5.3 to 2.5.5. The later
# versions included a monkey-patching provision which did not perform this
# optimization on functions that were monkey-patched.

class TaskWrapper(FunctionWrapper):
def run(self, *args, **kwargs):
return self.__call__(*args, **kwargs)

return TaskWrapper(wrapped, wrapper)
wrapped_task = TaskWrapper(wrapped, wrapper)
# Reset __module__ to be less transparent so celery detects our monkey-patching
wrapped_task.__module__ = CeleryTaskWrapper.__module__

return wrapped_task


def instrument_celery_app_task(module):
Expand All @@ -162,11 +175,8 @@ def instrument_celery_app_task(module):
# the task doesn't pass through it. For Celery 2.5+ need to wrap
# the tracer instead.

def task_name(task, *args, **kwargs):
return task.name

if module.BaseTask.__module__ == module.__name__:
module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__, name=task_name)
module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__)


def wrap_Celery_send_task(wrapped, instance, args, kwargs):
Expand Down Expand Up @@ -195,28 +205,6 @@ def instrument_celery_app_base(module):
wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)


def instrument_celery_execute_trace(module):
# Triggered for 'celery.execute_trace'.

if hasattr(module, "build_tracer"):
# Need to add a wrapper for background task entry point.

# In Celery 2.5+ we need to wrap the task when tracer is being
# created. Note that in Celery 2.5 the 'build_tracer' function
# actually resided in the module 'celery.execute.task'. In
# Celery 3.0 the 'build_tracer' function moved to
# 'celery.task.trace'.

_build_tracer = module.build_tracer

def build_tracer(name, task, *args, **kwargs):
task = task or module.tasks[name]
task = CeleryTaskWrapper(task, name=name)
return _build_tracer(name, task, *args, **kwargs)

module.build_tracer = build_tracer


def instrument_celery_worker(module):
# Triggered for 'celery.worker' and 'celery.concurrency.processes'.

Expand Down
7 changes: 6 additions & 1 deletion tests/application_celery/_target_application.py
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from celery import Celery
from celery import Celery, shared_task
from testing_support.validators.validate_distributed_trace_accepted import (
validate_distributed_trace_accepted,
)
Expand Down Expand Up @@ -44,6 +44,11 @@ def nested_add(x, y):
return add(x, y)


@shared_task
def shared_task_add(x, y):
return x + y


@app.task
@validate_distributed_trace_accepted(transport_type="AMQP")
def assert_dt():
Expand Down
21 changes: 21 additions & 0 deletions tests/application_celery/conftest.py
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611
collector_agent_registration_fixture,
collector_available_fixture,
Expand All @@ -27,3 +28,23 @@
collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (application_celery)", default_settings=_default_settings
)


@pytest.fixture(scope="session")
def celery_config():
# Used by celery pytest plugin to configure Celery instance
return {
"broker_url": "memory://",
"result_backend": "cache+memory://",
}


@pytest.fixture(scope="session")
def celery_worker_parameters():
# Used by celery pytest plugin to configure worker instance
return {"shutdown_timeout": 120}


@pytest.fixture(scope="session", autouse=True)
def celery_worker_available(celery_session_worker):
yield celery_session_worker
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from _target_application import add, nested_add, tsum
from _target_application import add, nested_add, shared_task_add, tsum
from testing_support.validators.validate_code_level_metrics import (
validate_code_level_metrics,
)
Expand All @@ -28,7 +28,7 @@


@validate_transaction_metrics(
name="test_celery:test_celery_task_as_function_trace",
name="test_application:test_celery_task_as_function_trace",
scoped_metrics=[("Function/_target_application.add", 1)],
background_task=True,
)
Expand Down Expand Up @@ -58,7 +58,7 @@ def test_celery_task_as_background_task():


@validate_transaction_metrics(
name="test_celery:test_celery_tasks_multiple_function_traces",
name="test_application:test_celery_tasks_multiple_function_traces",
scoped_metrics=[("Function/_target_application.add", 1), ("Function/_target_application.tsum", 1)],
background_task=True,
)
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_celery_tasks_ignore_transaction():


@validate_transaction_metrics(
name="test_celery:test_celery_tasks_end_transaction",
name="test_application:test_celery_tasks_end_transaction",
scoped_metrics=[("Function/_target_application.add", 1)],
background_task=True,
)
Expand Down Expand Up @@ -126,3 +126,18 @@ def test_celery_nested_tasks():

add_result = nested_add(1, 2)
assert add_result == 3


@validate_transaction_metrics(
name="_target_application.shared_task_add", group="Celery", scoped_metrics=[], background_task=True
)
@validate_code_level_metrics("_target_application", "shared_task_add")
def test_celery_shared_task_as_background_task():
"""
Calling shared_task_add() outside of a transaction means the agent will create
a background transaction (with a group of 'Celery') and record shared_task_add()
as a background task.

"""
result = shared_task_add(3, 4)
assert result == 7