[Producer] Performance drop when 'send' is called from multiple Futures #528

Open
panagiks opened this issue Jul 7, 2019 · 10 comments · May be fixed by #814

@panagiks

panagiks commented Jul 7, 2019

The following pattern (also used in benchmark/simple_produce_bench.py) performs well:

for i in range(n):
    await producer.send(topic, message, partition=partition)

The same goes for send_and_wait: while the throughput is slightly reduced, it still performs well.

However, when producer.send is invoked from multiple Futures, there is a very steep performance drop as well as long-lasting high CPU utilization, accompanied by eventual KafkaTimeoutErrors.

for i in range(n):
    asyncio.ensure_future(producer.send(topic, message, partition=partition))

Of course the above is a very over-simplified example; if you are able to call all the producer.send invocations centrally (as in the examples) you should always go with the first approach, but the second illustrates the case where already-existing Futures eventually call producer.send.

The performance deteriorates as the number of futures trying to send at the same time increases.

I am using the following modification of benchmark/simple_produce_bench.py for testing, against a locally set up single-Kafka, single-ZooKeeper installation:

$ git diff
diff --git a/benchmark/simple_produce_bench.py b/benchmark/simple_produce_bench.py
index 2bc364f..9ff380f 100644
--- a/benchmark/simple_produce_bench.py
+++ b/benchmark/simple_produce_bench.py
@@ -9,6 +9,7 @@ import random
 class Benchmark:
 
     def __init__(self, args):
+        self._done = 0
         self._num = args.num
         self._size = args.size
         self._topic = args.topic
@@ -59,7 +60,15 @@ class Benchmark:
                 )
             )
 
+    async def send(self, producer, topic, payload, partition):
+        await producer.send(topic, payload, partition=partition)
+        self._stats[-1]['count'] += 1
+        self._done += 1
+        if self._done >= self._num:
+            self.done.set_result(None)
+
     async def bench_simple(self):
+        self.done = asyncio.Future()
         payload = bytearray(b"m" * self._size)
         topic = self._topic
         partition = self._partition
@@ -75,9 +84,9 @@ class Benchmark:
         try:
             if not self._is_transactional:
                 for i in range(self._num):
+                    asyncio.ensure_future(self.send(producer, topic, payload, partition))
                     # payload[i % self._size] = random.randint(0, 255)
-                    await producer.send(topic, payload, partition=partition)
-                    self._stats[-1]['count'] += 1
+                await self.done
             else:
                 for i in range(self._num // transaction_size):
                     # payload[i % self._size] = random.randint(0, 255)

A few test runs at a low number of futures:

$ python benchmark/simple_produce_bench.py -s 200 -n 500
Total produced 500 messages in 0.05 second(s). Avg 9680.0 m/s
$ python benchmark/simple_produce_bench.py -s 200 -n 1000
Total produced 1000 messages in 0.16 second(s). Avg 6125.0 m/s
$ python benchmark/simple_produce_bench.py -s 200 -n 2000
Total produced 2000 messages in 0.70 second(s). Avg 2867.0 m/s
$ python benchmark/simple_produce_bench.py -s 200 -n 3000
Produced 1170 messages in 1 second(s).
Total produced 3000 messages in 1.68 second(s). Avg 1789.0 m/s

A gradual drop is already visible, but it becomes clearer when increasing the number of futures:

$ python benchmark/simple_produce_bench.py -s 200 -n 10000
Produced 312 messages in 1 second(s).
Produced 312 messages in 1 second(s).
[...]
Produced 624 messages in 1 second(s).
Produced 1092 messages in 1 second(s).
Total produced 10000 messages in 25.29 second(s). Avg 395.0 m/s

Switching to 20k send operations manages to produce only ~4.2k messages before the rest time out:

Total produced 4212 messages in 54.19 second(s). Avg 77.0 m/s

In contrast, running with the original benchmark code (not from within Futures):

$ python benchmark/simple_produce_bench.py -s 200 -n 20000
Total produced 20000 messages in 0.35 second(s). Avg 57651.0 m/s

Use case

In my use case I have a job system where I/O-blocking jobs are scheduled, each one as its own Future, and after the I/O-waiting part of each job completes it produces a Kafka message. The 'traffic' of jobs is bursty, with peaks reaching 20k-30k jobs scheduled concurrently and lower-input periods in between.
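
For context, the shape of that job system is roughly the following (a minimal sketch with made-up names, not my actual code): every job awaits some I/O and then calls producer.send, so during a burst tens of thousands of coroutines end up calling send concurrently.

import asyncio
from aiokafka import AIOKafkaProducer

async def job(producer, job_id):
    # placeholder for the I/O-bound part of the job (HTTP call, DB query, ...)
    await asyncio.sleep(0.1)
    # once the I/O part completes, the job reports its result to Kafka
    await producer.send("jobs-done", f"job {job_id} finished".encode())

async def main():
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        # a bursty peak: tens of thousands of jobs scheduled at once,
        # each as its own task that eventually calls producer.send
        await asyncio.gather(*(job(producer, i) for i in range(20_000)))
    finally:
        await producer.stop()

asyncio.run(main())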

First Look into the issue

I had a first swing at tackling the issue by running my modified benchmark with profiling enabled.

Comparing the profiling output of the Futures approach to the one already in the benchmark file, the first thing that pops up is that message_accumulator's add_message function overshadows everything else in both cumtime and tottime. Also interesting is ncalls, which shows that with the Futures approach add_message recursed heavily compared to the first approach.

First approach:

ncalls tottime percall cumtime percall filename:lineno(function)
9230/9115 0.0104 1.141e-06 0.1546 1.696e-05 message_accumulator.py:310(add_message)

With Futures:

ncalls tottime percall cumtime percall filename:lineno(function)
20493720/523740 6.781 1.295e-05 25.45 4.86e-05 message_accumulator.py:310(add_message)

The per-call time also seems to have increased significantly (~4x), but in this range of values that could be insignificant.

Unfortunately, that's as far as I managed to get. I was not able to find (even approximately) what causes such heavy recursion. I did find that the recursion exists in the first place due to aiokafka/producer/message_accumulator.py#L341-L342, but I could not figure out why the same load of messages triggers such heavy recursion when coming from multiple Futures and not in the other approach.

My only guess would be that in the benchmark, send (and consequently add_message) is never really called concurrently: the calls are async and are awaited, but they are awaited sequentially (instead of, for example, running an asyncio.gather over them). This, however, is nothing more than a guess, as I was not able to find any actual evidence pointing to this being the issue.

I used the following changes to the benchmark code to test concurrent calls to send without wrapping each call in an additional Future, and the behavior is similar to the Futures approach (i.e. there is a breakdown at ~20k messages):

$ git diff
diff --git a/benchmark/simple_produce_bench.py b/benchmark/simple_produce_bench.py
index 2bc364f..893ea9e 100644
--- a/benchmark/simple_produce_bench.py
+++ b/benchmark/simple_produce_bench.py
@@ -59,6 +59,10 @@ class Benchmark:
                 )
             )
 
+    async def send(self, producer, topic, payload, partition):
+        await producer.send(topic, payload, partition=partition)
+        self._stats[-1]['count'] += 1
+
     async def bench_simple(self):
         payload = bytearray(b"m" * self._size)
         topic = self._topic
@@ -74,10 +78,10 @@ class Benchmark:
 
         try:
             if not self._is_transactional:
-                for i in range(self._num):
-                    # payload[i % self._size] = random.randint(0, 255)
-                    await producer.send(topic, payload, partition=partition)
-                    self._stats[-1]['count'] += 1
+                await asyncio.gather(*[
+                    self.send(producer, topic, payload, partition)
+                    for i in range(self._num)
+                ])
             else:
                 for i in range(self._num // transaction_size):
                     # payload[i % self._size] = random.randint(0, 255)
@aure-olli
Contributor

This is a quadratic explosion due to the fact that a batch of messages is limited in size. If you have 10000 messages waiting in parallel and can only send 100 at once, then every time 100 messages are sent, all 9900 waiting messages are woken up together, but 9800 of them have to go back to sleep once the queue is full again.
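
(To put rough illustrative numbers on that: draining 10000 waiters 100 at a time takes about 100 rounds, and the k-th round wakes roughly 10000 - 100*k coroutines only to put most of them back to sleep, so the total number of wake-ups is on the order of 100 * 10000 / 2 ~ 500,000, i.e. quadratic in the number of pending sends rather than linear.)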

There's actually no benefit in parallel calls to send, since the messages are just appended to a list sequentially. If you want better performance, try creating several producers; then you can send several requests at once (but you'll lose the ordering).
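
For example, something along these lines (a rough sketch, untested; topic name and pool size are arbitrary):

import asyncio
from itertools import cycle
from aiokafka import AIOKafkaProducer

async def main():
    # several independent producers: each has its own accumulator and sender
    # task, so their batches can be in flight to the broker at the same time
    producers = [AIOKafkaProducer(bootstrap_servers="localhost:9092") for _ in range(4)]
    await asyncio.gather(*(p.start() for p in producers))
    pool = cycle(producers)  # round-robin; ordering across producers is lost
    try:
        for i in range(20_000):
            # send() just appends to the chosen producer's batch and returns a
            # delivery future; delivery itself is not awaited here
            await next(pool).send("my_topic", b"payload-%d" % i)
        await asyncio.gather(*(p.flush() for p in producers))
    finally:
        await asyncio.gather(*(p.stop() for p in producers))

asyncio.run(main())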

@panagiks
Author

panagiks commented Jul 8, 2019

Hello @aure-olli and thank you for your response :)

If you want better performances, try creating several producers, then you can send several requests at once (but you'll loose the order).

I will try this solution since ordering is not really important for my use case, thank you!

@tvoinarovskyi tvoinarovskyi added this to the v0.6.0 milestone Jul 28, 2019
@tvoinarovskyi
Member

I will test this scenario; it may make sense to find some way to handle it under the hood. It may even be less of a problem now, as we removed the recursion. As a workaround, you can also try the "batch" API for sending. Since you handle the scheduling yourself, it does not have such problems. See https://aiokafka.readthedocs.io/en/stable/producer.html#direct-batch-control
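
Roughly, the pattern looks like this (a sketch only, with an arbitrary topic and a single partition; see the linked docs for the real examples):

import asyncio
from typing import List

from aiokafka import AIOKafkaProducer

async def produce_with_batches(producer: AIOKafkaProducer, messages: List[bytes]):
    futures = []
    batch = producer.create_batch()
    for msg in messages:
        # append() returns None once the batch has no more room:
        # hand the full batch to the sender and start a new one
        if batch.append(key=None, value=msg, timestamp=None) is None:
            futures.append(await producer.send_batch(batch, "my_topic", partition=0))
            batch = producer.create_batch()
            batch.append(key=None, value=msg, timestamp=None)
    futures.append(await producer.send_batch(batch, "my_topic", partition=0))
    # one pending waiter per batch instead of one per message
    await asyncio.gather(*futures)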

@tvoinarovskyi tvoinarovskyi modified the milestones: v0.6.0, 0.6.1 May 11, 2020
@patkivikram
Contributor

Is this fixed or still a problem?

@tvoinarovskyi
Member

@patkivikram Hi, can you reproduce the behavior? Is this problematic for you?
I actually found the fix quite difficult to implement, as it would require a lot of rewriting. It's a rather specific use case, so I have not worked on it yet. asyncio is constantly improving its data structures, so it may not be as bad in newer versions either. If you have a specific use case that is hitting the issue, I can help you either work around it or prioritize the fix.

@patkivikram
Contributor

patkivikram commented Dec 13, 2020

@tvoinarovskyi I am seeing a bottleneck where my producer is writing to multiple topics and partitions and its buffer gets full even with a high batch size. Should I use a different producer per topic/partition?

@abersheeran

I reproduced it. I have an HTTP API which receives data and pushes it to Kafka. 10-20 QPS of concurrency can keep the CPU at 100%.
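
The service looks roughly like this (a simplified sketch assuming aiohttp; names and routes are made up, the real API differs):

import asyncio
from aiohttp import web
from aiokafka import AIOKafkaProducer

async def handle(request: web.Request) -> web.Response:
    producer: AIOKafkaProducer = request.app["producer"]
    payload = await request.read()
    # one producer.send per request: under concurrent load this is the
    # "send from many concurrent tasks" pattern described in this issue
    await producer.send("events", payload)
    return web.Response(status=202)

async def init_app() -> web.Application:
    app = web.Application()
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    app["producer"] = producer

    async def close_producer(app: web.Application) -> None:
        await app["producer"].stop()

    app.on_cleanup.append(close_producer)
    app.add_routes([web.post("/events", handle)])
    return app

web.run_app(init_app())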

@Ronserruya

I "solved" this by putting a Semaphore before sending the request, looks like aiokafka cant handle having so many pending tasks in its buffer

This is the benchmark code:

import asyncio
from asyncio import as_completed
import time

from aiokafka import AIOKafkaProducer


async def send_msg(msg, limiter, producer):
    async with limiter:
        await producer.send_and_wait(value=msg, topic="my_topic",
                                     headers=[('x-datadog-trace-id', b'12593897844508091594'),
                                              ('x-datadog-parent-id', b'3332965923773247704'),
                                              ('x-datadog-sampling-priority', b'1')])


async def main():
    rate_limiter = asyncio.Semaphore(150)  # cap the number of concurrent send_and_wait calls
    async with AIOKafkaProducer(bootstrap_servers="localhost:9092", client_id="abc", acks=1) as producer:
        msgs = [b"qwjeoiqjwelkqndlkqweoiqwjeqliwjeoi12u3o1i2uwjeqlwiejqlwksdalkwnoiqweoi12u3" \
                b"iqwjlaksdlm1o23uo12ueolqwdjlakwejo12iu3oiejdlqakwsdnasdalksdalksdjalksdjalsdjal" \
                b"ksdjalsdja;ldsjaksdjaskje123u1023u12i3j12lk3j12k3123"
                for i in range(30_000)]
        tasks = [send_msg(msg, rate_limiter, producer) for msg in msgs]

        start = time.time()
        sent = 0
        for fut in as_completed(tasks):
            await fut
            sent += 1
            print(f'\r{sent} | rate={sent/(time.time()-start):.2f} msgs/s', end='', flush=True)
        total_time = time.time() - start
        print(f'\n{total_time=:.2f} | rate={30_000/total_time:.2f} msgs/s')


if __name__ == '__main__':
    asyncio.run(main())

This is the output with the semaphore:

30000 | rate=13573.36 msgs/s
total_time=2.21 | rate=13573.01 msgs/s

This is without it:

5250 | rate=120.97 msgs/s

After 45 seconds everything timed out and crashed

@tvoinarovskyi tvoinarovskyi linked a pull request Jan 3, 2022 that will close this issue
@abersheeran

The following code is my solution.

import asyncio

from loguru import logger
from indexpy import HTTPException

from .mq import get_kafka_client

push_queue: asyncio.Queue[
    tuple[asyncio.Event, tuple[str, bytes, bytes]]
] = asyncio.Queue(1000)


async def _real_push():
    # Drain the queue sequentially: one send_and_wait at a time, then wake the waiter.
    while not push_queue.empty():
        event, (topic, value, key) = await push_queue.get()
        try:
            await get_kafka_client().send_and_wait(topic=topic, value=value, key=key)
        except Exception as e:
            logger.error(f"Failed to push message to Kafka: {e}")
        finally:
            event.set()


worker: asyncio.Task | None = None


async def push_message(topic: str, value: bytes, key: bytes) -> None:
    """
    Push a message to a Kafka topic.
    """
    event = asyncio.Event()
    await push_queue.put((event, (topic, value, key)))
    # (Re)start the single background worker if it is not currently running.
    global worker
    if worker is None or worker.done():
        worker = asyncio.create_task(_real_push())
    await event.wait()

@Ronserruya

Ronserruya commented Jul 31, 2023

I "solved" this by putting a Semaphore before sending the request, looks like aiokafka cant handle having so many pending tasks in its buffer

This is the benchmark code:

import asyncio
from asyncio import as_completed
import time

from aiokafka import AIOKafkaProducer


async def send_msg(msg, limiter, producer):
    async with limiter:
        await producer.send_and_wait(value=msg, topic="my_topic",
                                     headers=[('x-datadog-trace-id', b'12593897844508091594'),
                                              ('x-datadog-parent-id', b'3332965923773247704'),
                                              ('x-datadog-sampling-priority', b'1')])


async def main():
    rate_limier = asyncio.Semaphore(150)
    async with AIOKafkaProducer(bootstrap_servers="localhost:9092", client_id="abc", acks=1) as producer:
        msgs = [b"qwjeoiqjwelkqndlkqweoiqwjeqliwjeoi12u3o1i2uwjeqlwiejqlwksdalkwnoiqweoi12u3" \
                b"iqwjlaksdlm1o23uo12ueolqwdjlakwejo12iu3oiejdlqakwsdnasdalksdalksdjalksdjalsdjal" \
                b"ksdjalsdja;ldsjaksdjaskje123u1023u12i3j12lk3j12k3123"
                for i in range(30_000)]
        tasks = [send_msg(msg, rate_limier, producer) for msg in msgs]

        start = time.time()
        sent = 0
        for fut in as_completed(tasks):
            await fut
            sent += 1
            print(f'\r{sent} | rate={sent/(time.time()-start):.2f} msgs/s', end='', flush=True)
        total_time = time.time() - start
        print(f'\n{total_time=:.2f} | rate={30_000/total_time:.2f} msgs/s')


if __name__ == '__main__':
    asyncio.run(main())

This is the output with the semaphore:

30000 | rate=13573.36 msgs/s
total_time=2.21 | rate=13573.01 msgs/s

This is without it:

5250 | rate=120.97 msgs/s

After 45 seconds everything timed out and crashed

Updating my previous solution: I recently hit a bottleneck with the Semaphore-based workaround above, so I instead wrote the following function using the batch API and got MUCH faster results, 2x in the benchmark for example.

import asyncio
import random
from asyncio import as_completed
import time

from aiokafka import AIOKafkaProducer
from typing import List

async def write_msgs_to_kafka(kafka_conn: AIOKafkaProducer, msgs: List[bytes], topic: str):
    headers = [('x-datadog-trace-id', b'12593897844508091594'),
               ('x-datadog-parent-id', b'3332965923773247704'),
               ('x-datadog-sampling-priority', b'1')]
    batches = [kafka_conn.create_batch()]
    for msg in msgs:
        # append() returns None when the batch is full: start a new batch and retry
        if not batches[-1].append(key=None, value=msg, timestamp=None, headers=headers):
            batches.append(kafka_conn.create_batch())
            batches[-1].append(key=None, value=msg, timestamp=None, headers=headers)

    # send_batch() hands each batch to the sender and returns a delivery future per batch
    futs = await asyncio.gather(*[
        kafka_conn.send_batch(batch, topic=topic, partition=random.randint(0, 3)) for batch in batches
    ])

    await asyncio.gather(*futs)

async def main_batch():
    async with AIOKafkaProducer(bootstrap_servers="localhost:9092", client_id="abc", acks=1,
                                request_timeout_ms=40000 * 3,
                                metadata_max_age_ms=300000 * 3) as producer:
        chunks = []
        for i in range(30):
            chunks.append([b"qwjeoiqjwelkqndlkqweoiqwjeqliwjeoi12u3o1i2uwjeqlwiejqlwksdalkwnoiqweoi12u3" \
                           b"iqwjlaksdlm1o23uo12ueolqwdjlakwejo12iu3oiejdlqakwsdnasdalksdalksdjalksdjalsdjal" \
                           b"ksdjalsdja;ldsjaksdjaskje123u1023u12i3j12lk3j12k3123"
                           for i in range(1000)])

        start = time.time()
        await asyncio.gather(*[write_msgs_to_kafka(producer, chunk, 'my_topic2') for chunk in chunks])
        total_time = time.time() - start
        print(f'\n{total_time=:.2f} | rate={30_000 / total_time:.2f} msgs/s')

if __name__ == '__main__':
    asyncio.run(main_batch())

The output:

total_time=1.09 | rate=27631.94 msgs/s

Also, when removing the headers, the semaphore method reaches 17341 msgs/s, but the batch method reaches 52570 msgs/s!
