Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor command #1033

Merged
merged 1 commit into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,18 @@ supported:
>>> r.pubsub_numpat()
1204

Monitor
^^^^^^^
redis-py includes a `Monitor` object that that streams back every command
processed by the Redis server. Use `listen` on the `Monitor` object to block
until message available.

.. code-block:: pycon

>>> r = redis.StrictRedis(...)
>>> with sr.monitor() as m:
>>> for command in m.listen():
>>> print(command)

Lua Scripting
^^^^^^^^^^^^^
Expand Down
49 changes: 49 additions & 0 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import threading
import time as mod_time
import re
import hashlib
from redis._compat import (basestring, imap, iteritems, iterkeys,
itervalues, izip, long, nativestr, safe_unicode)
Expand Down Expand Up @@ -764,6 +765,9 @@ def pubsub(self, **kwargs):
"""
return PubSub(self.connection_pool, **kwargs)

def monitor(self):
return Monitor(self.connection_pool)

# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
Expand Down Expand Up @@ -2922,6 +2926,51 @@ def _georadiusgeneric(self, command, *args, **kwargs):
StrictRedis = Redis


class Monitor(object):
"""
Monitor is useful for handling the MONITOR command to the redis server.
next_command() method returns one command from monitor
listen() method yields commands from monitor.
"""
def __init__(self, connection_pool):
self.connection_pool = connection_pool
self.connection = self.connection_pool.get_connection('MONITOR')

def __enter__(self):
self.connection.send_command('MONITOR')
# check that monitor returns 'OK', but don't return it to user
response = self.connection.read_response()
if not bool_ok(response):
raise RedisError('MONITOR failed: %s' % response)
return self

def __exit__(self, *args):
self.connection.disconnect()
self.connection_pool.release(self.connection)

def next_command(self):
"Parse the response from a monitor command"
response = self.connection.read_response()
if isinstance(response, bytes):
response = self.connection.encoder.decode(response, force=True)
command_time, command_data = response.split(' ', 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to fail on Python3 with decode_responses=False. I'm looking at another solution for this now...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now, I'm setting "If decode_responses=False, return monitor output without parsing".

m = re.match(r'\[(\d+) (.+):(\d+)\] (.*)', command_data)
db_id, client_address, client_port, command = m.groups()
command = re.match(r'"(\w*)"+', command).groups()
return {
'time': float(command_time),
'db': db_id,
'client_address': client_address,
'client_port': client_port,
'command': command
}

def listen(self):
"Listen for commands coming to the server."
while 1:
yield self.next_command()


class PubSub(object):
"""
PubSub provides publish, subscribe and listen support to Redis channels.
Expand Down
10 changes: 10 additions & 0 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import unicode_literals


class TestPipeline(object):
def test_monitor(self, r):
with r.monitor() as m:
r.ping()
response = m.next_command()
assert set(response.keys()) == {'time', 'db', 'client_address',
'client_port', 'command'}