
Pubsub emits more messages than allowed by maxMessages and allowExcessMessages after reaching maxExtension period #1213

Open
QuestionAndAnswer opened this issue Feb 17, 2021 · 9 comments
Labels
  • api: pubsub (Issues related to the googleapis/nodejs-pubsub API.)
  • external (This issue is blocked on a bug with the actual product.)
  • priority: p3 (Desirable enhancement or fix. May not be included in next release.)
  • type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)

Comments


QuestionAndAnswer commented Feb 17, 2021

Environment details

  • OS: Windows 10
  • Node.js version: 12.8.3
  • npm version: 6.14.6
  • @google-cloud/pubsub version: 2.6.0

Looking at the code and the issue details, this should also be relevant for 2.9.0.

Steps to reproduce

You will need one topic and one subscription to that topic. Below are the subscription parameters.

[screenshot: subscription parameters]
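
If the topic and subscription don't exist yet, a minimal setup sketch like the one below should work. The names are illustrative and only need to match the SUB_NAME / TOPIC_NAME used in the scripts; the subscription parameters from the screenshot can also be configured in the console.

const { PubSub } = require("@google-cloud/pubsub");

// Creates the topic and subscription used by the scripts below.
// Names are illustrative; any existing topic/subscription pair works too.
async function setup() {
    const pubsub = new PubSub();
    const [topic] = await pubsub.createTopic("libs-test-pubsub");
    await topic.createSubscription("libs-test-pubsub");
}

setup().catch(console.error);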

  1. Create a server with the following content and run it.
const pubsub = require("@google-cloud/pubsub");

const config = {
    subName: process.env["SUB_NAME"] || "libs-test-pubsub",
};

console.log(config);

const ps = new pubsub.PubSub();
const subscription = ps.subscription(config.subName, {
    flowControl: {
        maxMessages: 2,
        allowExcessMessages: false,
        maxExtension: 5,
    },
    streamingOptions: {
        maxStreams: 1
    },
});

let rcvCounter = 0;
let prcCounter = 0;

function printCounters() {
    process.stdout.clearLine(0)
    process.stdout.write(`rcv: ${rcvCounter}\tprc: ${prcCounter}\r`);
}

const onError = err => {
    console.error("Subscription error:", err);
    process.exit(-1);
};

const onMessage = msg => {
    rcvCounter++;
    printCounters();

    setTimeout(() => {
        msg.ack();
        prcCounter++;
        printCounters();
    }, 2000);
}

subscription.on("error", onError);
subscription.on("message", onMessage);

  2. Now push 100 messages into the topic in a batch, so the pubsub subscription queue acquires roughly 100 messages in a short amount of time.

You can use this script for that.

const pubsub = require("@google-cloud/pubsub");

const config = {
    topicName: process.env["TOPIC_NAME"] || "libs-test-pubsub",
};

console.log(config);

const ps = new pubsub.PubSub();
const topic = ps.topic(config.topicName);

const messages = [];

for (let i = 0; i < 100; i++) {
    const p = topic.publish(Buffer.from(""), { attr1: "123" })
        .catch(err => {
            console.log(err);
        });

    messages.push(p);
}

Promise.all(messages);

  3. Now, you should see something like this (see the video below as well).

First, you would see rcv at 2 and prc at 0. Expected, as we allowed only 2 messages to be processed (passed into the message callback) by flow control. Then prc becomes 2 (as the 2-second timeout is reached and we ack the messages) and rcv becomes 4. Such stepped processing and counter updates should happen every 2 seconds. But after some time, somewhere around the maxExtension deadline (when pubsub starts either extending message deadlines or removing them from its internal queue), the rcv counter jumps up to 100, while prc is still 4, 6, or 8 depending on when it happens; in any case the number stays small, below 20 or even below 15.

bandicam.2021-02-17.21-50-32-285.mp4

This breaks the assumption that flow control is respected: for example, a k8s pod that processes these messages and has CPU or memory resource limits might be evicted, because it would try to process all the messages at once and hit those limits (being throttled on CPU, or killed for exceeding memory).

Now, where it is happening.
I've added logs to the library, so I can see for certain that it goes through _extendDeadlines in lease-manager.ts.

[screenshot: _extendDeadlines in lease-manager.ts]

It takes the else path in the for loop and calls this.remove(message);

[screenshot: remove() in lease-manager.ts]

In remove() it then repeatedly falls into the this.pending > 0 condition and dispenses the current message (which has actually already expired and should be redelivered by Pub/Sub). Clearly, such a message should not be dispensed to the client code.
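
For readers who don't want to dig through the library, here is a heavily simplified sketch of the flow described above. It is not the actual library source; only _extendDeadlines, remove(), the this.pending > 0 check, and the dispensing step come from the issue, and helpers such as _leaseTooLong and _dispense are illustrative stand-ins.

// Simplified sketch of the reported behaviour, not the real lease-manager.ts.
class LeaseManagerSketch {
    constructor(maxMessages, maxExtensionSeconds) {
        this._maxMessages = maxMessages;          // flowControl.maxMessages
        this._maxExtension = maxExtensionSeconds; // flowControl.maxExtension
        this._messages = new Set();               // currently leased messages
        this._pendingQueue = [];                  // messages held back by flow control
    }

    get pending() {
        return this._pendingQueue.length;
    }

    // Illustrative stand-in: has this message been leased longer than maxExtension?
    _leaseTooLong(message) {
        return (Date.now() - message.received) / 1000 >= this._maxExtension;
    }

    // Illustrative stand-in: emits the "message" event to user code.
    _dispense(message) {
        console.log("dispensing", message.id);
    }

    _extendDeadlines() {
        for (const message of this._messages) {
            if (!this._leaseTooLong(message)) {
                // Normal path: extend the ack deadline (omitted here).
            } else {
                // The else path mentioned above: the lease exceeded maxExtension.
                this.remove(message);
            }
        }
    }

    remove(message) {
        this._messages.delete(message);
        // The problematic step: for every expired message that is removed,
        // a queued message is dispensed to user code, so the "message"
        // listener keeps firing even though maxMessages is already exceeded.
        if (this.pending > 0) {
            this._dispense(this._pendingQueue.shift());
        }
    }
}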

I'm not sure how to fix this in the pubsub library code, and I don't think there is a proper workaround in client code either (ours relies on flow control being respected).

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/nodejs-pubsub API. label Feb 17, 2021
@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label Feb 18, 2021
feywind (Collaborator) commented Feb 18, 2021

@QuestionAndAnswer Thanks for the super detailed report. I'll have to go over this in more detail, but I'll mark it as a bug for now.

I like your editor's comment on the method. 😂 You must be kidding

@feywind feywind added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. and removed triage me I really want to be triaged. labels Feb 18, 2021
QuestionAndAnswer (Author) commented

@feywind so, how is it going? Did you have a chance to look into this issue?

@yoshi-automation yoshi-automation added 🚨 This issue needs some love. and removed 🚨 This issue needs some love. labels May 23, 2021
procedurallygenerated commented

@feywind we are running into the exact same issue in production.

Node 14
@google-cloud/pubsub version: 2.10.0

Running it in a similar environment, a docker container inside k8s.

Any progress here?

@meredithslota meredithslota changed the title Pubsub emits more masseges then allowed by maxMessages and allowExcessMessages after reaching maxExtension period Pubsub emits more messages then allowed by maxMessages and allowExcessMessages after reaching maxExtension period Aug 25, 2021
feywind (Collaborator) commented Sep 9, 2021

Quick (sadly content-less) update - no changes here yet, but I've been on a bit of a feature development rotation and hope to get back to these issues.

@yoshi-automation yoshi-automation added the 🚨 This issue needs some love. label Nov 3, 2021
feywind (Collaborator) commented Nov 26, 2021

I stared at this for a while today, and I'm not as familiar with this bit of the library, but I don't know why we'd want to dispense messages from the pending queue just because another message was removed. It feels like there was a paradigm shift here that wasn't taken into account, but I'm going to ask some others.

@feywind feywind added the status: investigating The issue is under investigation, which is determined to be non-trivial. label Jan 26, 2022
@feywind feywind added external This issue is blocked on a bug with the actual product. priority: p3 Desirable enhancement or fix. May not be included in next release. and removed priority: p2 Moderately-important priority. Fix may not be included in next release. 🚨 This issue needs some love. status: investigating The issue is under investigation, which is determined to be non-trivial. labels Aug 15, 2022
feywind (Collaborator) commented Aug 17, 2022

Linked to the meta-issue about transport problems: b/242894947


michal-zap commented Aug 22, 2022

Is there any way to bypass the problem? Our cluster is running out of memory, and the flowControl options don't seem to work. Is there a way to stop subscribers from pulling an enormous number of messages?

jcgomezcorrea commented

We have the same problem with @google-cloud/pubsub version 3.4.1. Are there any updates or workarounds for this issue?

adrian-borodziuk added a commit to zaplify/nodejs-pubsub that referenced this issue Aug 18, 2023
…-max-messages-limit

fix: googleapis#1213 Pubsub emits more messages then allowed by maxMessages and allowExcessMessages after reaching maxExtension period

alexby commented Oct 14, 2023

For those who were interested in a workaround:

subscription.setOptions({ flowControl: { maxMessages: 1, allowExcessMessages: false } })

(It could work without allowExcessMessages, but I didn't test it without this option).

The point is that you need to set it on every connection to the broker. The setting appears to be ignored when you pass it to subscription.create(). So whether you create the subscription on initialization and then listen to it, or you subscribe to one that already exists, set the options on that specific subscription/connection.

I've tested it on node:16.13-alpine + @google-cloud/pubsub@3.4.1 and on the latest @google-cloud/pubsub@4.0.6
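
A minimal sketch of applying that workaround, assuming the subscription already exists (the name is illustrative):

const { PubSub } = require("@google-cloud/pubsub");

const pubsub = new PubSub();

// Name is illustrative; use your own subscription.
const subscription = pubsub.subscription("libs-test-pubsub");

// Re-apply the flow control options on every connection/listener setup,
// instead of relying on options passed at subscription.create() time.
subscription.setOptions({
    flowControl: {
        maxMessages: 1,
        allowExcessMessages: false,
    },
});

subscription.on("error", err => console.error("Subscription error:", err));
subscription.on("message", msg => {
    // ...process the message...
    msg.ack();
});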
