- Sponsor
-
Notifications
You must be signed in to change notification settings - Fork 333
Stuff SkipMessage and fail skipped messages #449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Fixes Bogdanp#440 Stuff SkipMessage exceptions into the message proxy to ensure that the result is consistent when used in conjunction with the Results middleware. Additionally, move the message.fail() call into the SkipMessage handling to ensure that skipped messages are nack'd rather than ack'd. Note that if failing the message is left to the middleware, then unintentionally omitting message.fail() can lead to no result ever being stored by the results middleware for the message. If it should be left to the middleware to decide to fail the skipped message then it would be more sensible to have fail as the default. This could be achieved by making the SkipMessage exception take an optional "fail_message" parameter that defaults to True, and have the message conditionally fail() inside the worker SkipMessage exception handling. Custom middleware could then raise SkipMessage with fail_message=False to override this behaviour and lead to the message being skipped and ack'd rather than skipped and nack'd.
dramatiq/worker.py
Outdated
@@ -492,15 +495,13 @@ def process_message(self, message): | |||
|
|||
self.broker.emit_after("process_message", message, result=res) | |||
|
|||
except SkipMessage: | |||
except SkipMessage as e: | |||
message.fail() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SkipMessage
s shouldn't be marked as having failed. The idea behind SkipMessage
is to let users ack messages that they don't intend to process. Nacking means messages get put on the DLQ, which is semantically the wrong thing here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok that makes sense. Thanks for the feedback. But what would you expect the stored result to be if a user implements a middleware that skips messages, but does not fail them? Would it be None
or a SkipMessage
exception? If it should be a SkipMessage
exception then I can work this in by adding an after_skip_message
implementation to the results middleware that is similar to its after_nack
implementation, but checks for a non-None message._exception
(rather than message.failed
) to store the exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the result of a skipped message should probably be None
.
Adding after_skip_message
to the Results
middleware conflicts with the after_nack
hook (well, it doesn't exactly "conflict", but it's confusing): the AgeLimit
middleware both fails the message and raises a Skip, so both hooks run, and it happens to be that after_skip
runs first, then after_nack
, so things work out OK in that case.
More on None
vs exception: neither option is ideal, but my feeling is it's more natural for a skipped message's result to be None
than it is for it to be an exception. Both are equally bad in code that doesn't expect them, so we should document this interaction between AgeLimit
and Results
. In code that expects actors to potentially skip messages, dealing with None
is simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok that makes sense. I agree that neither option is ideal. I have put up another commit addressing this.
@@ -46,5 +46,4 @@ def before_process_message(self, broker, message): | |||
|
|||
if current_millis() - message.message_timestamp >= max_age: | |||
self.logger.warning("Message %r has exceeded its age limit.", message.message_id) | |||
message.fail() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the right change here is to retain the message.fail()
line and raise SkipMessage
.
Fixes #440
Stuff SkipMessage exceptions into the message proxy to ensure that
the result is consistent when used in conjunction with the Results
middleware.
Additionally, move the message.fail() call into the SkipMessage
handling to ensure that skipped messages are nack'd rather than
ack'd.
Note that if failing the message is left to the middleware,
then unintentionally omitting message.fail() can lead to no result
ever being stored by the results middleware for the message.
If it should be left to the middleware to decide to fail the skipped
message then it would be more sensible to have fail as the default.
This could be achieved by making the SkipMessage exception take an
optional "fail_message" parameter that defaults to True, and have
the message conditionally fail() inside the worker SkipMessage
exception handling. Custom middleware could then raise SkipMessage
with fail_message=False to override this behaviour and lead to the
message being skipped and ack'd rather than skipped and nack'd.