Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support request pipelining in AsyncProducer #2094

Merged
merged 6 commits into from
Jan 18, 2022

Conversation

slaunay
Copy link
Contributor

@slaunay slaunay commented Dec 24, 2021

Enhancement

Although there is a Config.Net.MaxOpenRequests parameter, the AsyncProducer uses at most one in-flight produce request when producing to a single Kafka broker.

As shown in #1539 a while back, using request pipelining increases throughput when writing to a Kafka cluster over a high latency network link and something that is recommended when implementing the Kafka protocol as described in
their network section:

The broker's request processing allows only a single in-flight request per
connection in order to guarantee this ordering. Note that clients can (and
ideally should) use non-blocking IO to implement request pipelining and
achieve higher throughput. i.e., clients can send requests even while
awaiting responses for preceding requests since the outstanding requests
will be buffered in the underlying OS socket buffer.

MaxOpenRequests defaults to 5 but if you were to execute the following code:

for {
  // Blocks till we get a response from the broker or an error is returned
  res, err := broker.Produce(req)
  // Check for error or else use response
}

then you would only see a single in-flight Produce request.
That is, to send the next request you must first wait for the responseReceiver Go routine to read the current response bytes and return it to calling Go routine.

Obviously, if it takes a while to read such response because of network latency or slow replication, the calling Go routine will seat idle instead of sending more records therefore reducing the throughput (more on that in the Testing done
section).

Unfortunately this is how the AsyncProducer currently works, at least how the brokerProducer "bridge" Go routine processes batches (i.e. *produceSet):
https://github.com/Shopify/sarama/blob/635bcf350a7b8a92b4da0aacd288ee09311e673d/async_producer.go#L693-L707

By adding a new Broker.AyncProduce receiver and using it in the AsyncProducer, we can achieve proper pipelining of Produce requests.

The new receiver relies on a callback to eventually receive the response:

for {
  // Blocks once we reach MaxOpenRequests in-flight requests
  err := broker.AsyncProduce(req, func(res *ProduceResponse, err error) {
    // Check for response error or else use response
  })
  // Check for request error
}

The same could technically be done with Broker.Produce and N go routines:

for {
  go func() {
    res, err := broker.Produce(req)
    // Check for error or else use response
  }
}

but it gets tricky to get the back-pressure behavior from the internal Brokers.responses channel and avoid creating too many Go routines.

With the callback the back-pressure is propagated up and we can still use a channel to handle the responses asynchronously if necessary:

type responseOrError struct {
  res *ProduceResponse
  err error
}

c := make(chan responseOrError)

// Handle future responses asynchronously
go func() {
  for resOrErr := range c {
    // Check for response error or else use response
  }
}

for {
  // Blocks once we reach MaxOpenRequests in-flight requests
  err := broker.AsyncProduce(req, func(res *ProduceResponse, err error) {
    // Pass the response or error to a separate Go routine to unblock
    // the Broker.responseReceiver Go routine
    c <- responseOrError{res, err}
  })
  // Check for request error
}

Returning or else passing a channel to Broker.AsyncProduce is another option but I believe it leads to more complex and less flexible code.

As this is a non trivial behavior change in the AsyncProducer, I introduced a new configuration property to make that feature opt-in.

I do believe that Produce request pipelining should be the default (like in the Java client) but an application that cares about ordering might see records persisted out of order depending on how it is configured which can be considered a breaking change.

Changes

  • introduce Broker.AsyncProduce with a callback to have a mostly non blocking way to produce to a single broker
  • refactor Broker.send and Broker.responseReceiver to support callback
  • factorize throttleTime metrics into Broker.updateThrottleMetric
  • add Config.Produce.Pipeline parameter (defaults to false for backward compatibility)
  • honor MaxOpenRequests when using AsyncProducer and conf.Produce.Pipeline is enabled
  • add unit and functional tests
  • add -enable-pipeline flag to kafka-producer-performance for benchmarking

Testing done

The new unit and functional tests should provide good coverage for the new changes as they touch a critical part of the Broker and the AsyncProducer logic.

But for a real end to end and performance test I am using the kafka-producer-performance tool with a new -enable-pipeline to show the impact of request pipelining when it is the most useful.

For that, I am running the tool from an EC2 instance in AWS us-west-2 and producing to:

  • a local Kafka cluster in the same AZ with a RTT of ~1 ms
  • a remote Kafka cluster in AWS us-east-1 with a RTT of ~70 ms
  • a remote Kafka cluster in AWS eu-central-1 with a RTT of ~140 ms

Here are some details about the EC2 instance and the command line used as the baseline:

$ lsb_release -d
Description:    Ubuntu 18.04.2 LTS
$ ec2metadata --instance-type
c5d.xlarge
$ sysctl net.ipv4.tcp_wmem
net.ipv4.tcp_wmem = 4096        16384   67108864
$ ./kafka-producer-performance-pipeline \
  -brokers <broker>:9093 \
  -security-protocol SSL -tls-ca-certs ca.pem -tls-client-cert cert.pem -tls-client-key key.pem \
  -topic test-topic-for-intg \
  -message-load 1000000 \       # 1M records
  -message-size 1000 \          # 1 kB record payload
  -required-acks -1 \           # Wait for 2 ISRs out of 4 replicas
  -flush-bytes 500000 \
  -flush-frequency 100ms

Here are the performance results for each remote target cluster with the respective extra flags:

Target cluster Extra flags RTT (ms) Max records/s Max throughput (MiB/s) Max requests in flight
us-west-2 -partitioner manual
-partition 0
1 55,417.7 35.87 1
us-west-2 -partitioner manual
-partition 0
-enable-pipeline
1 63,645.8 41.70 6
us-east-1 -partitioner manual
-partition 0
70 10,758.9 9.74 1
us-east-1 -partitioner manual
-partition 0
-enable-pipeline
70 64,800.0 36.12 6
eu-central-1 -partitioner manual
-partition 0
140 5,494.0 5.14 1
eu-central-1 -partitioner manual
-partition 0
-enable-pipeline
-max-open-requests 1
140 11,371.9 10.10 2
eu-central-1 -partitioner manual
-partition 0
-enable-pipeline
140 32,999.4 25.47 6
eu-central-1 default roundrobin partitioner
on 5 Brokers/128 partitions
140 119,782.4 59.19 5
eu-central-1 default roundrobin partitioner
on 5 Brokers/128 partitions
plus -enable-pipeline
140 133,124.7 65.88 16

Here is the detailed output of kafka-producer-performance when producing to eu-central-1 over a single topic partition (-partitioner manual -partition 0).

Request pipelining disabled (current behavior) details
7306 records sent, 2945.7 records/sec (2.81 MiB/sec ingress, 0.73 MiB/sec egress), 343.9 ms avg latency, 335.1 ms stddev, 161.5 ms 50th, 412.2 ms 75th, 1192.0 ms 95th, 1192.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
32630 records sent, 4362.6 records/sec (4.16 MiB/sec ingress, 2.15 MiB/sec egress), 230.4 ms avg latency, 182.3 ms stddev, 161.0 ms 50th, 305.0 ms 75th, 634.0 ms 95th, 1192.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
56980 records sent, 4565.8 records/sec (4.35 MiB/sec ingress, 2.81 MiB/sec egress), 214.0 ms avg latency, 144.6 ms stddev, 161.0 ms 50th, 175.0 ms 75th, 314.0 ms 95th, 1192.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
85226 records sent, 4875.6 records/sec (4.65 MiB/sec ingress, 3.36 MiB/sec egress), 200.8 ms avg latency, 120.6 ms stddev, 162.0 ms 50th, 175.0 ms 75th, 306.6 ms 95th, 1192.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
113472 records sent, 5047.8 records/sec (4.81 MiB/sec ingress, 3.72 MiB/sec egress), 192.5 ms avg latency, 106.4 ms stddev, 161.0 ms 50th, 169.0 ms 75th, 306.0 ms 95th, 1058.1 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
139770 records sent, 5086.2 records/sec (4.85 MiB/sec ingress, 3.93 MiB/sec egress), 190.5 ms avg latency, 98.1 ms stddev, 161.0 ms 50th, 170.5 ms 75th, 306.0 ms 95th, 857.2 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
168016 records sent, 5173.0 records/sec (4.93 MiB/sec ingress, 4.13 MiB/sec egress), 187.4 ms avg latency, 91.0 ms stddev, 161.0 ms 50th, 168.5 ms 75th, 306.0 ms 95th, 641.4 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
196262 records sent, 5236.5 records/sec (4.99 MiB/sec ingress, 4.28 MiB/sec egress), 185.1 ms avg latency, 85.5 ms stddev, 161.0 ms 50th, 165.0 ms 75th, 305.9 ms 95th, 445.7 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
222560 records sent, 5239.2 records/sec (5.00 MiB/sec ingress, 4.37 MiB/sec egress), 184.4 ms avg latency, 81.9 ms stddev, 161.0 ms 50th, 165.0 ms 75th, 306.0 ms 95th, 425.2 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
251780 records sent, 5302.9 records/sec (5.06 MiB/sec ingress, 4.49 MiB/sec egress), 182.3 ms avg latency, 77.7 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 306.0 ms 95th, 402.4 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
281000 records sent, 5354.4 records/sec (5.11 MiB/sec ingress, 4.60 MiB/sec egress), 180.2 ms avg latency, 73.8 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 305.0 ms 95th, 379.6 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
307298 records sent, 5346.2 records/sec (5.10 MiB/sec ingress, 4.64 MiB/sec egress), 180.5 ms avg latency, 72.2 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 305.0 ms 95th, 362.1 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
336518 records sent, 5386.1 records/sec (5.14 MiB/sec ingress, 4.72 MiB/sec egress), 179.4 ms avg latency, 69.5 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 305.0 ms 95th, 344.7 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
363790 records sent, 5391.1 records/sec (5.14 MiB/sec ingress, 4.76 MiB/sec egress), 178.7 ms avg latency, 67.4 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 305.0 ms 95th, 328.5 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
391062 records sent, 5395.4 records/sec (5.15 MiB/sec ingress, 4.79 MiB/sec egress), 178.7 ms avg latency, 65.8 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
420282 records sent, 5424.4 records/sec (5.17 MiB/sec ingress, 4.85 MiB/sec egress), 177.8 ms avg latency, 63.9 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
448528 records sent, 5438.1 records/sec (5.19 MiB/sec ingress, 4.89 MiB/sec egress), 177.2 ms avg latency, 62.2 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
476774 records sent, 5450.1 records/sec (5.20 MiB/sec ingress, 4.92 MiB/sec egress), 176.9 ms avg latency, 61.0 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
504046 records sent, 5450.3 records/sec (5.20 MiB/sec ingress, 4.94 MiB/sec egress), 176.7 ms avg latency, 60.0 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 313.6 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
532292 records sent, 5460.5 records/sec (5.21 MiB/sec ingress, 4.97 MiB/sec egress), 176.5 ms avg latency, 59.0 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 313.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
556642 records sent, 5431.7 records/sec (5.18 MiB/sec ingress, 4.96 MiB/sec egress), 177.3 ms avg latency, 59.2 ms stddev, 161.0 ms 50th, 163.8 ms 75th, 305.0 ms 95th, 318.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
584888 records sent, 5441.9 records/sec (5.19 MiB/sec ingress, 4.98 MiB/sec egress), 176.9 ms avg latency, 58.1 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
614108 records sent, 5459.7 records/sec (5.21 MiB/sec ingress, 5.01 MiB/sec egress), 176.4 ms avg latency, 57.0 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
643328 records sent, 5476.1 records/sec (5.22 MiB/sec ingress, 5.04 MiB/sec egress), 175.8 ms avg latency, 55.8 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
670600 records sent, 5475.2 records/sec (5.22 MiB/sec ingress, 5.05 MiB/sec egress), 175.9 ms avg latency, 55.4 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
699820 records sent, 5489.6 records/sec (5.24 MiB/sec ingress, 5.08 MiB/sec egress), 175.3 ms avg latency, 54.3 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 313.6 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
725144 records sent, 5473.6 records/sec (5.22 MiB/sec ingress, 5.07 MiB/sec egress), 175.8 ms avg latency, 54.5 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
752416 records sent, 5472.9 records/sec (5.22 MiB/sec ingress, 5.08 MiB/sec egress), 175.9 ms avg latency, 54.2 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
779688 records sent, 5472.3 records/sec (5.22 MiB/sec ingress, 5.09 MiB/sec egress), 175.9 ms avg latency, 53.9 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
808908 records sent, 5484.9 records/sec (5.23 MiB/sec ingress, 5.11 MiB/sec egress), 175.6 ms avg latency, 53.1 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
837154 records sent, 5490.3 records/sec (5.24 MiB/sec ingress, 5.13 MiB/sec egress), 175.3 ms avg latency, 52.5 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
865400 records sent, 5495.3 records/sec (5.24 MiB/sec ingress, 5.14 MiB/sec egress), 175.2 ms avg latency, 52.0 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
892672 records sent, 5494.0 records/sec (5.24 MiB/sec ingress, 5.14 MiB/sec egress), 175.3 ms avg latency, 51.8 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
918970 records sent, 5487.1 records/sec (5.23 MiB/sec ingress, 5.14 MiB/sec egress), 175.4 ms avg latency, 51.7 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.5 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
947216 records sent, 5491.8 records/sec (5.24 MiB/sec ingress, 5.15 MiB/sec egress), 175.4 ms avg latency, 51.3 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.3 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
972540 records sent, 5479.7 records/sec (5.23 MiB/sec ingress, 5.15 MiB/sec egress), 175.8 ms avg latency, 51.5 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1192.0 ms 99.9th, 1 total req. in flight
999812 records sent, 5479.0 records/sec (5.23 MiB/sec ingress, 5.15 MiB/sec egress), 175.7 ms avg latency, 51.2 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1171.2 ms 99.9th, 1 total req. in flight
1000000 records sent, 5478.7 records/sec (5.22 MiB/sec ingress, 5.15 MiB/sec egress), 175.4 ms avg latency, 50.5 ms stddev, 161.0 ms 50th, 163.0 ms 75th, 305.0 ms 95th, 314.0 ms 99th, 1168.3 ms 99.9th, 0 total req. in flight
Request pipelining enabled (-enable-pipeline -max-open-requests 1) details
1462 records sent, 1304.2 records/sec (1.24 MiB/sec ingress, 0.05 MiB/sec egress), 453.0 ms avg latency, 0.0 ms stddev, 453.0 ms 50th, 453.0 ms 75th, 453.0 ms 95th, 453.0 ms 99th, 453.0 ms 99.9th, 2 total req. in flight
54058 records sent, 8831.9 records/sec (8.42 MiB/sec ingress, 3.56 MiB/sec egress), 207.8 ms avg latency, 172.2 ms stddev, 162.0 ms 50th, 167.0 ms 75th, 482.6 ms 95th, 1348.0 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
111524 records sent, 10028.3 records/sec (9.56 MiB/sec ingress, 5.50 MiB/sec egress), 186.5 ms avg latency, 122.0 ms stddev, 162.0 ms 50th, 164.0 ms 75th, 288.2 ms 95th, 1235.9 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
169964 records sent, 10542.8 records/sec (10.05 MiB/sec ingress, 6.69 MiB/sec egress), 179.1 ms avg latency, 99.4 ms stddev, 162.0 ms 50th, 164.0 ms 75th, 240.8 ms 95th, 787.8 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
228404 records sent, 10814.0 records/sec (10.31 MiB/sec ingress, 7.49 MiB/sec egress), 175.2 ms avg latency, 86.0 ms stddev, 162.0 ms 50th, 164.0 ms 75th, 183.0 ms 95th, 549.2 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
285870 records sent, 10944.2 records/sec (10.44 MiB/sec ingress, 8.03 MiB/sec egress), 173.3 ms avg latency, 77.3 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 181.3 ms 95th, 461.9 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
344310 records sent, 11063.7 records/sec (10.55 MiB/sec ingress, 8.46 MiB/sec egress), 171.5 ms avg latency, 70.6 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.3 ms 95th, 367.7 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
401776 records sent, 11123.2 records/sec (10.61 MiB/sec ingress, 8.77 MiB/sec egress), 170.9 ms avg latency, 65.6 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 181.0 ms 95th, 294.9 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
460216 records sent, 11191.8 records/sec (10.67 MiB/sec ingress, 9.03 MiB/sec egress), 170.0 ms avg latency, 61.4 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.0 ms 95th, 294.3 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
517682 records sent, 11224.5 records/sec (10.70 MiB/sec ingress, 9.24 MiB/sec egress), 169.5 ms avg latency, 58.1 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 179.4 ms 95th, 292.4 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
576122 records sent, 11269.7 records/sec (10.75 MiB/sec ingress, 9.42 MiB/sec egress), 168.8 ms avg latency, 55.1 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 179.0 ms 95th, 289.4 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
634562 records sent, 11307.1 records/sec (10.78 MiB/sec ingress, 9.58 MiB/sec egress), 168.2 ms avg latency, 52.6 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 178.0 ms 95th, 288.5 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
693002 records sent, 11338.3 records/sec (10.81 MiB/sec ingress, 9.71 MiB/sec egress), 168.2 ms avg latency, 50.4 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.0 ms 95th, 287.9 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
751442 records sent, 11364.8 records/sec (10.84 MiB/sec ingress, 9.82 MiB/sec egress), 168.0 ms avg latency, 48.4 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.0 ms 95th, 287.3 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
807934 records sent, 11360.1 records/sec (10.83 MiB/sec ingress, 9.90 MiB/sec egress), 168.0 ms avg latency, 47.1 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.0 ms 95th, 287.7 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
864426 records sent, 11355.9 records/sec (10.83 MiB/sec ingress, 9.97 MiB/sec egress), 168.0 ms avg latency, 45.9 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.0 ms 95th, 288.1 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
922866 records sent, 11376.4 records/sec (10.85 MiB/sec ingress, 10.05 MiB/sec egress), 167.8 ms avg latency, 44.5 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.0 ms 95th, 287.5 ms 99th, 1348.0 ms 99.9th, 2 total req. in flight
979358 records sent, 11371.9 records/sec (10.85 MiB/sec ingress, 10.10 MiB/sec egress), 167.8 ms avg latency, 43.6 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.0 ms 95th, 287.9 ms 99th, 1343.5 ms 99.9th, 2 total req. in flight
1000000 records sent, 11375.8 records/sec (10.85 MiB/sec ingress, 10.13 MiB/sec egress), 167.4 ms avg latency, 42.2 ms stddev, 162.0 ms 50th, 163.0 ms 75th, 180.0 ms 95th, 286.7 ms 99th, 1326.3 ms 99.9th, 0 total req. in flight
Request pipelining enabled (-enable-pipeline [-max-open-requests 5]) details
36526 records sent, 14647.2 records/sec (13.97 MiB/sec ingress, 3.63 MiB/sec egress), 255.9 ms avg latency, 191.9 ms stddev, 174.0 ms 50th, 314.0 ms 75th, 675.4 ms 95th, 1185.0 ms 99th, 1185.0 ms 99.9th, 6 total req. in flight
211846 records sent, 28270.6 records/sec (26.96 MiB/sec ingress, 13.96 MiB/sec egress), 178.6 ms avg latency, 83.0 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 241.6 ms 95th, 456.7 ms 99th, 1185.0 ms 99.9th, 6 total req. in flight
390088 records sent, 31223.1 records/sec (29.78 MiB/sec ingress, 19.23 MiB/sec egress), 171.2 ms avg latency, 61.6 ms stddev, 161.0 ms 50th, 162.0 ms 75th, 194.6 ms 95th, 450.1 ms 99th, 1185.0 ms 99.9th, 6 total req. in flight
551772 records sent, 31541.5 records/sec (30.08 MiB/sec ingress, 21.73 MiB/sec egress), 173.2 ms avg latency, 52.5 ms stddev, 161.0 ms 50th, 174.2 ms 75th, 200.0 ms 95th, 372.0 ms 99th, 1185.0 ms 99.9th, 6 total req. in flight
728066 records sent, 32368.0 records/sec (30.87 MiB/sec ingress, 23.87 MiB/sec egress), 170.6 ms avg latency, 46.0 ms stddev, 161.0 ms 50th, 167.0 ms 75th, 199.0 ms 95th, 317.8 ms 99th, 1185.0 ms 99.9th, 6 total req. in flight
907282 records sent, 32999.4 records/sec (31.47 MiB/sec ingress, 25.47 MiB/sec egress), 169.0 ms avg latency, 41.3 ms stddev, 161.0 ms 50th, 165.0 ms 75th, 197.0 ms 95th, 311.0 ms 99th, 1185.0 ms 99.9th, 6 total req. in flight
1000000 records sent, 33098.5 records/sec (31.57 MiB/sec ingress, 26.04 MiB/sec egress), 168.2 ms avg latency, 38.4 ms stddev, 161.0 ms 50th, 164.0 ms 75th, 196.0 ms 95th, 216.1 ms 99th, 1163.9 ms 99.9th, 0 total req. in flight

There is an oddity with using -max-open-requests as it is reported as 2 when set to 1 and 6 when set to 5 (default).

I thought it was because the requests-in-flight metric is increased before writing to the socket but the effective rate limiting is done later when writing to the Brokers.responses channel:
https://github.com/Shopify/sarama/blob/635bcf350a7b8a92b4da0aacd288ee09311e673d/broker.go#L830-L848

I think there might actually be room for one more in-flight request even if the channel is buffered with a N-1 capacity. That would explain the 2x throughput improvement when using -max-open-requests 1 as we would be able to
pipeline up to 2 Produce requests.

Anyway, based on those numbers the throughput performance improvement ranges from 11% to 395%.

When writing to multiple topic partitions like when using the roundrobin partitioner (i.e. writing to more than one broker) the throughput increases because multiple brokerProducers are active.

If you have Kafka producers distributed around the world and a few centralized Kafka clusters like we do, writing to multiple brokers and using request pipeline can greatly improve the throughput.
The same applies if you want to mirror records between two Kafka clusters located in remote data centers.

Having a constant stream of bytes written to a TCP connection is key to increase the TCP window and therefore the throughput especially on high latency network links.

We have been using this specific enhancement in our log shipper successfully for a few years in production.
With other improvements, we are able to use Sarama to push up to 1M records/s per instance (131 Bytes records in average) and produce overall ~6M records/s during peak hours across our 37 data centers.

So thanks a lot for creating and maintaining this Go library and merry Christmas 🎄!

- introduce Broker.AsyncProduce with a callback to have a mostly non
  blocking way to produce to a single broker
- refactor Broker.send and Broker.responseReceiver to support callback
- factorize throttleTime metrics into Broker.updateThrottleMetric
- add Config.Produce.Pipeline parameter (defaults to false for backward
  compatibility)
- honor MaxOpenRequests when using AsyncProducer and
  conf.Produce.Pipeline is enabled
- add unit and functional tests
@dnwe
Copy link
Collaborator

dnwe commented Jan 13, 2022

@slaunay thank you for such a fantastic write-up and explanation

I had forgotten this PR when I went ahead and reviewed+merged #1686 which was an old PR that was seemingly trying to solve the same problem. Please could you rebase your PR atop the latest main. Feel free to resolve the conflicts by replacing the changes of #1686 with your own.

Then I will go ahead and review and merge.

@dnwe
Copy link
Collaborator

dnwe commented Jan 14, 2022

@slaunay oh, one other thing I was going to suggest is that we just go ahead and make pipelining the default behaviour. I think it's correct (as you mention) that we should match the Java client and as long as we flag it in the release notes and point out that MaxOpenRequests 1 can be used to restore the old behaviour we should be fine.

@slaunay
Copy link
Contributor Author

slaunay commented Jan 14, 2022

I had forgotten this PR when I went ahead and reviewed+merged #1686 which was an old PR that was seemingly trying to solve the same problem.

I saw that PR but it looked like it was stalled possibly because it tends to leak the Broker internals into the AsyncProducer.
But it essentially addresses the problem the same way indeed although by adding another go routine per brokerProducer and relying on a buffered pendings channel big enough to pipeline Produce requests.

I used commit fba46c9 (includes #1686) to build kafka-producer-performance and can confirm similar performance results against our infrastructure with the following respective extra flags (same baseline as above):

Target cluster Extra flags RTT (ms) Max records/s Max throughput (MiB/s) Max requests in flight
us-west-2 -partitioner manual
-partition 0
1 64,884.0 43.94 6
us-east-1 -partitioner manual
-partition 0
70 72,458.5 45.87 6
eu-central-1 -partitioner manual
-partition 0
140 32,824.3 25.87 6
eu-central-1 -partitioner manual
-partition 0
-max-open-requests 1
140 11,062.0 10.01 2
eu-central-1 default roundrobin partitioner
on 5 Brokers/128 partitions
140 133,439.3 65.88 11

oh, one other thing I was going to suggest is that we just go ahead and make pipelining the default behaviour. I think it's correct (as you mention) that we should match the Java client and as long as we flag it in the release notes and point out that MaxOpenRequests 1 can be used to restore the old behaviour we should be fine.

That makes sense to me too as long as users read the Changelog before upgrading to a new release 😉.

That being said using MaxOpenRequests 1 might not necessarily ensure ordering (see oddity above with metrics).
That is, I believe it might be possible to send a second Produce request before the response from the first request is fully read:

I think it is a separate issue that has always been there because the synchronization (using a buffered channel as a semaphore) happens after writing the request to the socket.

Please could you rebase your PR atop the latest main. Feel free to resolve the conflicts by replacing the changes of #1686 with your own.

Just to be clear, after resolving conflicts do you want me to?

  • remove the -enable-pipeline flag in kafka-producer-performance
  • remove the Config.Produce.Pipeline to make pipelining the default and remove the extra functional tests
  • keep the new Broker.AsyncProduce receiver and unit tests
  • use that receiver instead of the extra goroutine and pendings channel in brokerProducer

@dnwe
Copy link
Collaborator

dnwe commented Jan 14, 2022

@slaunay yep exactly right. Thanks again for the great contribution

@slaunay
Copy link
Contributor Author

slaunay commented Jan 18, 2022

I updated async_producer.go to use AsyncProduce and remove the Pipeline configuration bits.

Here are up to date performance results using kafka-producer-performance against our infrastructure with the following respective extra flags (same baseline as before):

Target cluster Extra flags RTT (ms) Max records/s Max throughput (MiB/s) Max requests in flight
us-west-2 -partitioner manual
-partition 0
1 83,533.7 52.49 6
us-east-1 -partitioner manual
-partition 0
70 67,800.0 45.08 6
eu-central-1 -partitioner manual
-partition 0
140 32,652.3 24.19 6
eu-central-1 -partitioner manual
-partition 0
-max-open-requests 1
140 11,462.7 10.13 2
eu-central-1 -partitioner manual
-partition 0
-required-acks 0
140 51,136.0 36.47 1
eu-central-1 default roundrobin partitioner
on 5 Brokers/128 partitions
140 177,886.2 75.70 19

The results are consistent with the initial changes.
I added an entry for using RequiredAcks = NoResponse which is technically close the max throughput over a single TCP connection when we do not have to wait for the broker to acknowledge the Produce request (minus TCP ACKs).

I made sure most of the changes have close to full coverage but I have been running into flaky test from functional_consumer_test.go.
Sometimes the docker containers are not ready to accept traffic when producing records leading to tons of goroutines stuck in retry.
I believe ordering might not always be honoured when producing to the test.1 topic:
https://github.com/Shopify/sarama/blob/fba46c96597c8aa81be2bec7ccfb54b7c2740eeb/functional_consumer_test.go#L217-L233

On top of using more than one in flight request now (in case one fails but others succeed), there is no guarantee that the SyncProducer will process each calls to SendMessage in order when using multiple goroutines.
The same probably applies to appending to the producedMessages slice using the producedMessagesMu lock.
Also calling t.Errorf will not work as expected as it must be called from the goroutine running the test.

Using a single synchronous call to SendMessages or else using the AsyncProducer (and waiting for all successes) would address the ordering issue (and also speed up the ingestion) if all Produce requests are succeeding.

@dnwe dnwe added the feat label Jan 18, 2022
Copy link
Collaborator

@dnwe dnwe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent, thanks again!

@twmb
Copy link

twmb commented Jan 19, 2022

Note sure I saw this mentioned anywhere, but seeing 16 requests in flight above: note that with idempotency, max requests in flight should not be more than 5. It's not well documented, but Kafka internally uses only a 5-wide idempotent deduplication window per partition.

@slaunay
Copy link
Contributor Author

slaunay commented Jan 19, 2022

Note sure I saw this mentioned anywhere, but seeing 16 requests in flight above

The 16 or 11 is actually the total concurrent in flight requests (all brokers combined):
https://github.com/Shopify/sarama/blob/49c453afa54c436d764f894bc0b75ede4ac0bdfb/sarama.go#L42-L43

In that particular scenario we write to 128 partitions lead by 5 brokers using the default roundrobin partitioner so it could in theory go up to 5 in-flight * 5 brokers = 25.
The metrics are also captured every 5 seconds using kafka-producer-performance so it's not the most accurate way to capture that information.

On the other scenarios, to show the impact of the changes we are relying on a single partition (-partitioner manual -partition 0) and therefore a single broker/TCP connection (assuming no leader election) from the same 5 brokers cluster.

note that with idempotency, max requests in flight should not be more than 5.

FWIU to use the producer idempotency feature you either need in Java land to set at least:

  1. max.in.flight.requests.per.connection = 1 when using Kafka 0.11.0.x.
  2. max.in.flight.requests.per.connection <= 5 when using Kafka 1.0.0+.

It actually looks like Sarama currently forces you to use 1.:
https://github.com/Shopify/sarama/blob/49c453afa54c436d764f894bc0b75ede4ac0bdfb/config.go#L691-L704

It's not well documented, but Kafka internally uses only a 5-wide idempotent deduplication window per partition.

It is not well documented indeed and quickly looking at the Kafka source code, it seems to me that the restriction is done on the client vs the broker:

I did test more than 5 in flights requests (against one broker) using idempotency and a custom version of kafka-producer-performance after altering config.go and was able to successfully produce to a 2.4.0 Kafka broker.
I'm not sure if the semantics changes when 5 is reached but having lots in flight requests is probably not a good idea anyway.

But I'm more concerned about setting Net.MaxOpenRequests to N and having actually up to N+1 in flight requests because of how the synchronization is done for limiting concurrent requests.
This means setting Net.MaxOpenRequests to 1 and using the AsyncProducer might result in up to 2 in-flight requests, leading to possible ordering issues or maybe an error with idempotency turn on against Kafka 0.11.0.x.

I believe the brokerProducer goroutine was previously limited to a single in flight requests being synchronous (even though calling broker.Produce concurrently from multiple goroutines could already result in up to 2 in flight requests when Net.MaxOpenRequests = 1).

The way forward is probably to:

  • introduce some synchronization prior to writing to the socket (vs when sending to the responses channel) to ensure that Net.MaxOpenRequests is really honoured.
  • allow Net.MaxOpenRequests <= 5 when Producer.Idempotent = true and Kafka version is at least 1.0.0 (should improve performance on a high latency network link).

niamster added a commit to niamster/sarama that referenced this pull request Mar 16, 2022
Since async producer now support multiple inflight messages
thanks to IBM#1686 and
IBM#2094, it now may "leak"
the "fin" internal management message to Kafka (and to the client)
when broker producer is reconnecting to Kafka broker and retries
multiple inflight messages at the same time.
niamster added a commit to niamster/sarama that referenced this pull request Mar 16, 2022
Since async producer now support multiple inflight messages
thanks to IBM#1686 and
IBM#2094, it now may "leak"
the "fin" internal management message to Kafka (and to the client)
when broker producer is reconnecting to Kafka broker and retries
multiple inflight messages at the same time.
niamster added a commit to niamster/sarama that referenced this pull request Mar 23, 2022
Since async producer now support multiple inflight messages
thanks to IBM#1686 and
IBM#2094, it now may "leak"
the "fin" internal management message to Kafka (and to the client)
when broker producer is reconnecting to Kafka broker and retries
multiple inflight messages at the same time.
dnwe pushed a commit that referenced this pull request Mar 30, 2022
…2182)

* producer: ensure that the management message (fin) is never "leaked"

Since async producer now support multiple inflight messages
thanks to #1686 and
#2094, it now may "leak"
the "fin" internal management message to Kafka (and to the client)
when broker producer is reconnecting to Kafka broker and retries
multiple inflight messages at the same time.

* test:async-producer: test broker restart (this fixes #2150)

* tests:async-producer: disable logger in TestAsyncProducerBrokerRestart

* tests:async-producer: protect leader with a mutex to make race detector happy

* test:async-producer: set 5mn default finish timeout

* async-producer: do not clear bp.currentRetries when fin message is received just after syn

* async-producer: use debug logger when fin message is handled for a healthy brokerProducer

* test:async-producer:restart: make emptyValues atomic to avoid races

* test:async-producer:restart: rename produceRequestTest to countRecordsWithEmptyValue

* test:async-producer:restart: reduce retry backoff timeout to speed up the test

* test:async-producer:restart: remove bogus 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants