Duplicate callback found for "0:Connection.Unblocked" #1052

Open
yuxifu opened this issue May 25, 2018 · 9 comments

yuxifu commented May 25, 2018

We are using BlockingConnection and basic_get with no callback function. Why are we getting a lot of Duplicate callback found for "0:Connection.Unblocked" errors? We are on pika 0.11.2.

Any ideas or thoughts?
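For context, a minimal sketch of the usage pattern being described here (a BlockingConnection polled with basic_get and no consumer callback); the host and queue name are placeholders, not the actual code in question:

import pika

# Placeholder connection details; the real application supplies its own.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Poll a single message; basic_get returns (None, None, None) when the queue is empty.
method_frame, header_frame, body = channel.basic_get(queue='some_queue')
if method_frame is not None:
    # Process the message here, then acknowledge it.
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)

connection.close()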

@lukebakken (Member)

Hello! Please share the code you are using. I will re-open this issue if I can reproduce it. Thanks!


yuxifu commented May 30, 2018

It's not always reproducible. Basically this is what I do:

def consume_until_empty_or_expire(self, channel, callback=None, expires_sec=0, echo=False):
        """
        consume the messages in the queue until no message available
        callback: func(event_data)
        expires_sec: quit consuming after certain time. <=0, never expires.
        """
        start_time = time.time()
        if callback is None:
            callback = self._default_consume_until_empty_callback
        total = 0
        processed = 0
        returned = 0
        skipped = 0
        message_count = 0
        while True:
            elapsed_time = time.time() - start_time
            if expires_sec > 0 and elapsed_time >= expires_sec:
                return {
                    'success': True,
                    'total': total,
                    'processed': processed,
                    'returned': returned,
                    'skipped': skipped,
                    'message_count': message_count,
                    'elapsed_time_sec': elapsed_time
                }
            method_frame, header_frame, body = channel.basic_get(
                queue=self.queue_name)
            if method_frame is None or method_frame.NAME == 'Basic.GetEmpty':
                return {
                    'success': True,
                    'total': total,
                    'processed': processed,
                    'returned': returned,
                    'skipped': skipped,
                    'message_count': message_count,
                    'elapsed_time_sec': elapsed_time
                }
            else:
                event_data = {}
                event_data['frame'] = vars(method_frame)
                event_data['header'] = vars(header_frame)
                event_data['body'] = json.loads(body)
                message_count = event_data['frame']['message_count']
                total += 1
                result = callback(event_data)
                if result == EventProcessingResult.PROCESSED:
                    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
                    processed += 1
                if result == EventProcessingResult.SKIPPED:
                    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
                    skipped += 1
                if result == EventProcessingResult.RETURNED:
                    returned += 1
                if echo:
                    print(event_data['frame']['routing_key'] + ': ' + str(result))

Nothing really fancy. This is probably related:

queue_args = None  # default when no TTL is configured
if instance.queue_ttl_min > 0:
    queue_args = {"x-expires": instance.queue_ttl_min * 60000}
channel.queue_declare(
    instance.queue_name,
    durable=instance.queue_durable,
    arguments=queue_args)

I did not really see the problem until I started to use "x-expires". It could be a coincidence, though.
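For reference, a sketch of declaring a queue with the "x-expires" argument mentioned above; the queue name and TTL are placeholders, not the reporter's values:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# The broker deletes the queue after it has been unused (no consumers,
# no basic_get, no redeclare) for the given number of milliseconds.
channel.queue_declare(
    queue='some_queue',
    durable=True,
    arguments={'x-expires': 10 * 60 * 1000})  # 10 minutes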

@benizri-ofir

Hi,
I'm getting the same messages:
Duplicate callback found for "0:Connection.Unblocked"

Did you solve this?

@lukebakken (Member)

@benizri-ofir - please provide code that can reproduce this, or at the very least, a working set of code or steps that might reproduce the issue.

@lukebakken lukebakken self-assigned this Aug 27, 2018
@lukebakken lukebakken modified the milestone: 0.12.1 Aug 27, 2018
@lukebakken lukebakken removed their assignment Aug 27, 2018
@lukebakken lukebakken added this to the 0.12.1 milestone Aug 27, 2018
@lukebakken lukebakken self-assigned this Aug 27, 2018
@nkborisov

Hi,
did you resolve that bug? I'm getting the same thing with BlockingConnection.

@lukebakken (Member)

@nkborisov no, I never received the requested information. There is no evidence at this point of a bug in Pika.

@nkborisov

@lukebakken Thank you for your answer.
Let me try to describe my case, though I'm not sure it's actually a bug in pika. Perhaps it's some kind of incorrect usage scenario or something like that.
I have the following simple code:


import logging
from threading import Thread
from time import sleep

import pika
import pika.exceptions

log = logging.getLogger(__name__)


class QueueConnection(object):
    def __init__(self, pair_node_id, conn_params, msg_queue, is_master):
        self.is_master = is_master
        self.reconnect_delay = 1  # sec
        self.node_id = pair_node_id
        self.thread = None
        self.is_running = False
        self.connection = None
        self.attempts_count = 1
        self.channel = None
        self.conn_params = conn_params
        self.msg_queue = msg_queue
        self.queue_name = None  

    @staticmethod
    def retrieve_queue_name(node_id, pair_node_id, is_master):
        nodes = sorted([int(node_id), int(pair_node_id)])
        queue_name = "{}_{}_".format(nodes[0], nodes[1])
        queue_name += "TX_QUEUE" if is_master else "RX_QUEUE"
        return queue_name

    def start(self):
        self._connect(self.conn_params)
        if self.is_running:
            raise RuntimeError("{} is already running for node #{}".format(self.__class__.__name__, self.node_id))
        self.is_running = True
        self.thread = Thread(target=self._loop, name="Thread [{}({})]".format(self.__class__.__name__, self.node_id))
        self.thread.start()

    def stop(self):
        if not self.is_running or self.thread is None:
            return
        self._disconnect()
        self.is_running = False
        try:
            self.thread.join(timeout=30)
        except RuntimeError:
            log.warning("Thread[{}({})] join timeout".format(self.__class__.__name__, self.node_id))

    def _connect(self, conn_params):
        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name, auto_delete=False, durable=False)
        self.channel.basic_qos(prefetch_count=1)

    def _disconnect(self):
        if not self.connection.is_closed:
            self.channel.cancel()
            self.channel.close()
            self.connection.close()
            self.channel.queue_delete(queue=self.queue_name)

    def _reconnect(self):
        log.info('Connection recovering {} with node #{}, attempt {}...'.format(self.__class__.__name__,
                                                                                self.node_id,
                                                                                self.attempts_count))
        try:
            log.info('Connection was deleted')
            self._connect(self.conn_params)
        except Exception as e:
            log.error('Exception caught while reconnecting attempt: {}: {}'
                      ' (next will be performed after {} seconds...)'.format(e, e.message,
                                                                             self.reconnect_delay))
            sleep(self.reconnect_delay)
            self.attempts_count += 1
            return
        self.attempts_count = 1
        log.info('Connection {} with node #{} was successfully recovered'.format(self.__class__.__name__,
                                                                                 self.node_id))

    def _process_queue(self):
        raise NotImplementedError()

    def _loop(self):
        assert self.connection is not None
        loop_name = self.__class__.__name__
        while self.is_running:
            if self.connection.is_open:
                try:
                    self._process_queue()
                except pika.exceptions.ConnectionClosed:
                    continue
                except pika.exceptions.AMQPChannelError as exc:
                    log.error("{} channel error {}".format(loop_name, exc))
                    continue
                except pika.exceptions.AMQPConnectionError as exc:
                    log.error("{} connection error {}".format(loop_name, exc))
                    continue
                except NotImplementedError:
                    log.error("{} process_queue method isn't implemented".format(loop_name))
                    self.is_running = False
                    self._disconnect()
            elif self.connection.is_closed:
                self._reconnect()

That class is the base for two subclasses, SenderConnection and ReceiverConnection (just dummy handlers of user queues).
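For illustration only, a hypothetical sketch of what such a subclass could look like; it is not code from this report and assumes msg_queue is a standard queue.Queue and that the subclass sets queue_name:

class ReceiverConnection(QueueConnection):
    def _process_queue(self):
        # Poll one message; basic_get returns (None, None, None) when the queue is empty.
        method_frame, _header, body = self.channel.basic_get(queue=self.queue_name)
        if method_frame is None:
            # Nothing to consume; let pika service heartbeats before retrying.
            self.connection.sleep(0.1)
            return
        self.msg_queue.put(body)
        self.channel.basic_ack(delivery_tag=method_frame.delivery_tag)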
The top-level code creates one instance of each class with the following connection parameters:

conn_params = pika.ConnectionParameters(
    host=rabbit_host,
    port=int(rabbit_port),
    socket_timeout=1,
    heartbeat=server_heartbeat_interval,
    blocked_connection_timeout=server_heartbeat_interval * 2,
    credentials=ExternalCredentials(),
    ssl=True,
    ssl_options=ssl_opts)

Next, I reproduced a connection failure by force-closing the connection in the RabbitMQ management plugin. As a result, I got the following log every time:

2020-01-14 07:50:48,855|WRN|7f25c6a9a700|pika.callback|Duplicate callback found for "0:Connection.Blocked"
2020-01-14 07:50:48,856|WRN|7f25c6a9a700|pika.callback|Duplicate callback found for "0:Connection.Unblocked"
2020-01-14 07:50:48,856|ERR|7f25c6a9a700|pika.adapters.blocking_connection|Connection close detected; result=BlockingConnection__OnClosedArgs(connection=<SelectConnection CLOSED socket=None params=>, reason_code=320, reason_text='CONNECTION_FORCED - Closed via management plugin')
2020-01-14 07:50:48,857|WRN|7f25c6a9a700|queue_connection|ReceiverQueueConnection connection with node #2 lost
2020-01-14 07:50:48,857|INF|7f25c6a9a700|queue_connection|Connection recovering ReceiverQueueConnection with node #2, attempt 1...
2020-01-14 07:50:48,859|INF|7f25c6a9a700|queue_connection|Connection was deleted
2020-01-14 07:50:48,903|INF|7f25c6a9a700|queue_connection|Connection ReceiverQueueConnection with node #2 was successfully recovered

"Duplicate callback" log warning appears every time just before expected message about the connection closing event. By debugging i understood that the warning message always appears before any calls of BlockingConnection constructors and destructors. It seems very strange for me.


lukebakken commented Jan 14, 2020

I wouldn't worry too much about it. See #894 and #192.

At some point I may have time to track that warning down to fix it. Not any time soon.
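If the warning is only log noise, a possible workaround (based on the logger name pika.callback shown in the log above) is to raise that logger's level so the message is filtered out:

import logging

# Suppress WARNING-level messages from pika's callback manager only.
logging.getLogger('pika.callback').setLevel(logging.ERROR)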

@lukebakken lukebakken modified the milestones: 0.13.0, 1.1.1 Jan 14, 2020
@lukebakken lukebakken reopened this Jan 14, 2020
@lukebakken lukebakken modified the milestones: 1.1.1, 1.2.0 Feb 24, 2020
@lukebakken lukebakken removed this from the 1.2.0 milestone Jan 27, 2021

ikiss commented Jun 30, 2021

The problem also occurred in my case (for both the blocked and unblocked callbacks).
I did some additional tracing, and it looks like during the Connection._on_terminate call the block/unblock callbacks are unregistered and a connection reset is performed, where the block/unblock callbacks are re-registered.
However, the unregistration via CallbackManager.remove has a bug: if you do not provide a callback_value, it does not remove all callbacks for the given prefix/key, as stated in the documentation:

    def remove(self, prefix, key, callback_value=None, arguments=None):
        """Remove a callback from the stack by prefix, key and optionally
        the callback itself. If you only pass in prefix and key, all
        callbacks for that prefix and key will be removed.

        :param str or int prefix: The prefix for keeping track of callbacks with
        :param str key: The callback key
        :param method callback_value: The method defined to call on callback
        :param dict arguments: Optional arguments to check
        :rtype: bool

        """
        if callback_value:
            offsets_to_remove = list()
            for offset in xrange(len(self._stack[prefix][key]), 0, -1):
                callback_dict = self._stack[prefix][key][offset - 1]

                if (callback_dict[self.CALLBACK] == callback_value and
                    self._arguments_match(callback_dict, [arguments])):
                    offsets_to_remove.append(offset - 1)

            for offset in offsets_to_remove:
                try:
                    LOGGER.debug('Removing callback #%i: %r', offset,
                                 self._stack[prefix][key][offset])
                    del self._stack[prefix][key][offset]
                except KeyError:
                    pass
        self._cleanup_callback_dict(prefix, key)

        return True

The solution would be to call the remove_all method when callback_value is not provided:

    def remove(self, prefix, key, callback_value=None, arguments=None):
        """Remove a callback from the stack by prefix, key and optionally
        the callback itself. If you only pass in prefix and key, all
        callbacks for that prefix and key will be removed.

        :param str or int prefix: The prefix for keeping track of callbacks with
        :param str key: The callback key
        :param method callback_value: The method defined to call on callback
        :param dict arguments: Optional arguments to check
        :rtype: bool

        """
        if callback_value:
            offsets_to_remove = list()
            for offset in xrange(len(self._stack[prefix][key]), 0, -1):
                callback_dict = self._stack[prefix][key][offset - 1]

                if (callback_dict[self.CALLBACK] == callback_value and
                    self._arguments_match(callback_dict, [arguments])):
                    offsets_to_remove.append(offset - 1)

            for offset in offsets_to_remove:
                try:
                    LOGGER.debug('Removing callback #%i: %r', offset,
                                 self._stack[prefix][key][offset])
                    del self._stack[prefix][key][offset]
                except KeyError:
                    pass
            self._cleanup_callback_dict(prefix, key)
        else:
            self.remove_all(prefix, key)

        return True

Apparently this problem was introduced with #192
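A small, hypothetical demonstration of the behaviour described above, exercising pika.callback.CallbackManager directly (the exact warning text varies between pika versions):

import logging

from pika.callback import CallbackManager

logging.basicConfig(level=logging.WARNING)


def on_unblocked(method_frame):
    """Placeholder Connection.Unblocked handler."""


callbacks = CallbackManager()
callbacks.add(0, 'Connection.Unblocked', on_unblocked, one_shot=False)

# Documented to drop every callback registered for this prefix/key, but with
# the bug above nothing is deleted because callback_value is None.
callbacks.remove(0, 'Connection.Unblocked')

# Re-registering the same handler now logs something like:
#   Duplicate callback found for "0:Connection.Unblocked"
callbacks.add(0, 'Connection.Unblocked', on_unblocked, one_shot=False)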
