forked from apache/skywalking-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add aioredis, aiormq, amqp, asyncpg, aio-pika, kombu RMQ (apache#230)
* improved ignore path regex * update test * fix sw_psycopg2 register_type() * fix complexity level * fix psycopg2 register_type() second arg default * fix rabbitmq BlockingChannel consume cb span link * add BlockingChannel.consume() instrumentation * fix rabbit basic_get(), Makefile missing packages * fix again for BlockingChannel.basic_get() * fix tornado socket == None * aioredis, aiormq, amqp, asyncpg, aio_pika, kombu
- Loading branch information
Showing
6 changed files
with
379 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
from skywalking import Layer, Component | ||
from skywalking.trace.context import get_context | ||
from skywalking.trace.tags import TagDbType, TagDbInstance, TagDbStatement | ||
|
||
link_vector = ['https://aioredis.readthedocs.io/'] | ||
support_matrix = { | ||
'aioredis': { | ||
'>=3.6': ['2.0.1'] | ||
} | ||
} | ||
note = """""" | ||
|
||
|
||
def install(): | ||
from aioredis import Redis | ||
|
||
async def _sw_execute_command(self, op, *args, **kwargs): | ||
connargs = self.connection_pool.connection_kwargs | ||
peer = f'{connargs.get("host", "localhost")}:{connargs.get("port", 6379)}' | ||
|
||
context = get_context() | ||
with context.new_exit_span(op=f'Redis/AIORedis/{op}' or '/', peer=peer, component=Component.AIORedis) as span: | ||
span.layer = Layer.Cache | ||
|
||
span.tag(TagDbType('Redis')) | ||
span.tag(TagDbInstance(str(connargs.get('db', 0)))) | ||
span.tag(TagDbStatement(op)) | ||
|
||
return await _execute_command(self, op, *args, **kwargs) | ||
|
||
_execute_command = Redis.execute_command | ||
Redis.execute_command = _sw_execute_command | ||
|
||
|
||
# Example code for someone who might want to make tests: | ||
# | ||
# async def aioredis_(): | ||
# redis = aioredis.from_url("redis://localhost") | ||
# await redis.set("my-key", "value") | ||
# value = await redis.get("my-key") | ||
# print(value) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
from skywalking import Layer, Component | ||
from skywalking.trace.carrier import Carrier | ||
from skywalking.trace.context import get_context | ||
from skywalking.trace.tags import TagMqBroker, TagMqTopic, TagMqQueue | ||
|
||
# version_rule = { | ||
# "name": "aiormq", | ||
# "rules": [">=3.3.1"] | ||
# } | ||
|
||
link_vector = ['https://pypi.org/project/aiormq/'] | ||
support_matrix = { | ||
'aiormq': { | ||
'>=3.6': ['3.3.1', '6.4.1'] | ||
} | ||
} | ||
note = """""" | ||
|
||
|
||
def install(): | ||
from aiormq import Channel | ||
from aiormq.tools import awaitable | ||
|
||
try: | ||
from pamqp import commands as spec # aiormq v6.4.1 | ||
except ImportError: | ||
from pamqp import specification as spec # aiormq v3.3.1 | ||
|
||
async def _sw_basic_publish(self, body, exchange='', routing_key='', properties=None, **kwargs): | ||
url = self.connection.url | ||
peer = f'{url.host}:{url.port}' if url.port else url.host | ||
context = get_context() | ||
|
||
with context.new_exit_span(op=f'RabbitMQ/Topic/{exchange}/Queue/{routing_key}/Producer', | ||
peer=peer, component=Component.RabbitmqProducer) as span: | ||
span.tag(TagMqBroker(peer)) | ||
span.tag(TagMqTopic(exchange)) | ||
span.tag(TagMqQueue(routing_key)) | ||
|
||
span.layer = Layer.MQ | ||
carrier = span.inject() | ||
|
||
if properties is None: | ||
properties = spec.Basic.Properties(delivery_mode=1) | ||
|
||
headers = getattr(properties, 'headers', None) | ||
|
||
if headers is None: | ||
headers = properties.headers = {} | ||
|
||
for item in carrier: | ||
headers[item.key] = item.val | ||
|
||
return await _basic_publish(self, body, exchange=exchange, routing_key=routing_key, properties=properties, **kwargs) | ||
|
||
async def _sw_basic_consume(self, queue, consumer_callback, *args, **kwargs): | ||
async def _callback(msg): | ||
context = get_context() | ||
url = self.connection.url | ||
peer = f'{url.host}:{url.port}' if url.port else url.host | ||
exchange = msg.delivery.exchange | ||
routing_key = msg.delivery.routing_key | ||
headers = msg.header.properties.headers | ||
carrier = Carrier() | ||
|
||
for item in carrier: | ||
if item.key in headers: | ||
val = headers.get(item.key) | ||
if val is not None: | ||
item.val = val if isinstance(val, str) else val.decode() | ||
|
||
with context.new_entry_span(op='RabbitMQ/Topic/' + exchange + '/Queue/' + routing_key | ||
+ '/Consumer' or '', carrier=carrier) as span: | ||
span.layer = Layer.MQ | ||
span.component = Component.RabbitmqConsumer | ||
span.tag(TagMqBroker(peer)) | ||
span.tag(TagMqTopic(exchange)) | ||
span.tag(TagMqQueue(routing_key)) | ||
|
||
return await awaitable(consumer_callback)(msg) | ||
|
||
return await _basic_consume(self, queue, _callback, *args, **kwargs) | ||
|
||
_basic_publish = Channel.basic_publish | ||
_basic_consume = Channel.basic_consume | ||
Channel.basic_publish = _sw_basic_publish | ||
Channel.basic_consume = _sw_basic_consume |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
from skywalking import Layer, Component | ||
from skywalking.trace.carrier import Carrier | ||
from skywalking.trace.context import get_context | ||
from skywalking.trace.tags import TagMqBroker, TagMqTopic, TagMqQueue | ||
|
||
link_vector = ['https://pypi.org/project/amqp/'] | ||
support_matrix = { | ||
'amqp': { | ||
'>=3.6': ['2.6.1'] | ||
} | ||
} | ||
note = """""" | ||
|
||
|
||
def install(): | ||
from amqp import Channel | ||
|
||
def _sw_basic_publish(self, msg, exchange='', routing_key='', *args, **kwargs): | ||
peer = getattr(self.connection, 'host', '<unavailable>') | ||
|
||
with get_context().new_exit_span(op=f'RabbitMQ/Topic/{exchange}/Queue/{routing_key}/Producer', | ||
peer=peer, component=Component.RabbitmqProducer) as span: | ||
span.tag(TagMqBroker(peer)) | ||
span.tag(TagMqTopic(exchange)) | ||
span.tag(TagMqQueue(routing_key)) | ||
|
||
span.layer = Layer.MQ | ||
carrier = span.inject() | ||
headers = getattr(msg, 'headers', None) | ||
|
||
if headers is None: | ||
headers = msg.headers = {} | ||
|
||
for item in carrier: | ||
headers[item.key] = item.val | ||
|
||
return _basic_publish(self, msg, exchange, routing_key, *args, **kwargs) | ||
|
||
def _sw_basic_consume(self, queue='', consumer_tag='', no_local=False, | ||
no_ack=False, exclusive=False, nowait=False, | ||
callback=None, *args, **kwargs): | ||
def _callback(msg): | ||
peer = getattr(self.connection, 'host', '<unavailable>') | ||
delivery_info = getattr(msg, 'delivery_info', {}) | ||
exchange = delivery_info.get('exchange', '<unavailable>') | ||
routing_key = delivery_info.get('routing_key', '<unavailable>') | ||
headers = getattr(msg, 'headers', {}) | ||
carrier = Carrier() | ||
|
||
for item in carrier: | ||
if item.key in headers: | ||
val = headers.get(item.key) | ||
if val is not None: | ||
item.val = val | ||
|
||
with get_context().new_entry_span(op='RabbitMQ/Topic/' + exchange + '/Queue/' + routing_key | ||
+ '/Consumer' or '', carrier=carrier) as span: | ||
span.layer = Layer.MQ | ||
span.component = Component.RabbitmqConsumer | ||
span.tag(TagMqBroker(peer)) | ||
span.tag(TagMqTopic(exchange)) | ||
span.tag(TagMqQueue(routing_key)) | ||
|
||
return callback(msg) | ||
|
||
return _basic_consume(self, queue=queue, consumer_tag=consumer_tag, no_local=no_local, no_ack=no_ack, | ||
exclusive=exclusive, nowait=nowait, callback=_callback, *args, **kwargs) | ||
|
||
_basic_publish = Channel.basic_publish | ||
_basic_consume = Channel.basic_consume | ||
Channel.basic_publish = Channel._basic_publish = _sw_basic_publish | ||
Channel.basic_consume = _sw_basic_consume |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
from skywalking import Layer, Component, config | ||
from skywalking.trace.context import get_context | ||
from skywalking.trace.tags import TagDbType, TagDbInstance, TagDbStatement, TagDbSqlParameters | ||
|
||
link_vector = ['https://github.com/MagicStack/asyncpg'] | ||
support_matrix = { | ||
'asyncpg': { | ||
'>=3.6': ['0.25.0'], | ||
} | ||
} | ||
note = """""" | ||
|
||
|
||
def install(): | ||
from asyncpg import Connection | ||
from asyncpg.protocol import Protocol | ||
|
||
def _sw_init(self, *args, **kwargs): | ||
_init(self, *args, **kwargs) | ||
self._protocol._addr = f'{self._addr[0]}:{self._addr[1]}' | ||
self._protocol._database = self._params.database | ||
|
||
async def __bind(proto, query, params, future, is_many=False): | ||
peer = getattr(proto, '_addr', '<unavailable>') # just in case | ||
|
||
with get_context().new_exit_span(op='PostgreSLQ/AsyncPG/bind', peer=peer, | ||
component=Component.AsyncPG) as span: | ||
span.layer = Layer.Database | ||
|
||
span.tag(TagDbType('PostgreSQL')) | ||
span.tag(TagDbInstance(getattr(proto, '_database', '<unavailable>'))) | ||
span.tag(TagDbStatement(query)) | ||
|
||
if config.sql_parameters_length and params is not None: | ||
if not is_many: | ||
text = ','.join(str(v) for v in params) | ||
|
||
if len(text) > config.sql_parameters_length: | ||
text = f'{text[:config.sql_parameters_length]}...' | ||
|
||
span.tag(TagDbSqlParameters(f'[{text}]')) | ||
|
||
else: | ||
max_len = config.sql_parameters_length | ||
total_len = 0 | ||
text_list = [] | ||
|
||
for _params in params: | ||
text = f"[{','.join(str(v) for v in _params)}]" | ||
total_len += len(text) | ||
|
||
if total_len > max_len: | ||
text_list.append(f'{text[:max_len - total_len]}...') | ||
|
||
break | ||
|
||
text_list.append(text) | ||
|
||
span.tag(TagDbSqlParameters(f"[{','.join(text_list)}]")) | ||
|
||
return await future | ||
|
||
async def _sw_bind(proto, stmt, params, *args, **kwargs): | ||
return await __bind(proto, stmt.query, params, _bind(proto, stmt, params, *args, **kwargs)) | ||
|
||
async def _sw_bind_execute(proto, stmt, params, *args, **kwargs): | ||
return await __bind(proto, stmt.query, params, _bind_execute(proto, stmt, params, *args, **kwargs)) | ||
|
||
async def _sw_bind_execute_many(proto, stmt, params, *args, **kwargs): | ||
return await __bind(proto, stmt.query, params, _bind_execute_many(proto, stmt, params, *args, **kwargs), True) | ||
|
||
async def _sw_query(proto, query, *args, **kwargs): | ||
return await __bind(proto, query, (), _query(proto, query, *args, **kwargs)) | ||
|
||
# async def _sw_execute(proto, stmt, *args, **kwargs): # these may be useful in the future, left here for documentation purposes | ||
# async def _sw_prepare(*args, **kwargs): | ||
|
||
_init = Connection.__init__ | ||
_bind = Protocol.bind | ||
_bind_execute = Protocol.bind_execute | ||
_bind_execute_many = Protocol.bind_execute_many | ||
_query = Protocol.query | ||
# _execute = Protocol.execute | ||
# _prepare = Protocol.prepare | ||
|
||
Connection.__init__ = _sw_init | ||
Protocol.bind = _sw_bind | ||
Protocol.bind_execute = _sw_bind_execute | ||
Protocol.bind_execute_many = _sw_bind_execute_many | ||
Protocol.query = _sw_query | ||
# Protocol.execute = _sw_execute | ||
# Protocol.prepare = _sw_prepare | ||
|
||
|
||
# Example code for someone who might want to make tests: | ||
# | ||
# async def conncetAsycPg(): | ||
# power=3 | ||
# con = await asyncpg.connect(user='user',password='cv9493a32',database="db" , host='localhost') | ||
# await con.fetchval('SELECT 2 ^ $1', power) | ||
# types = await con.fetch('SELECT * FROM pg_type') | ||
# async with con.transaction(): | ||
# async for record in con.cursor('SELECT generate_series(0, 100)'): | ||
# print(record) | ||
# await con.close() |