KAFKA-16770; [1/N] Coalesce records into bigger batches #15964
Conversation
@@ -1072,8 +1174,8 @@ public void replay(
    @Test
    public void testScheduleWriteOpWhenWriteFails() {
        MockTimer timer = new MockTimer();
-       // The partition writer only accept on write.
+       // The partition writer only accept one write.
        MockPartitionWriter writer = new MockPartitionWriter(2);
For my understanding: we always batched the records that were part of the same write operation (in this case, 2). For now we aren't changing this, but moving the logic to the coordinator runtime to make space for the batching logic as a follow-up?
You got it right. A write operation produces a single batch with all the records generated by it. This patch does not change that, but it changes where the memory records are built. The next patch will add the logic to keep the batch open until it is full or until a linger time is reached. With this, records produced by many write operations will end up in the same batch.
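The planned follow-up behavior, flushing on size or linger, could be sketched roughly as follows. This is a hypothetical simplification (the class and method names are illustrative, not the actual Kafka runtime API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: keep the current batch open until it is full or a
// linger time elapses, so records from many write operations share one batch.
class CoalescingBatcher {
    private final int maxBatchSizeBytes;
    private final long lingerMs;
    private final List<byte[]> currentBatch = new ArrayList<>();
    private final List<List<byte[]>> flushed = new ArrayList<>();
    private int currentBytes = 0;
    private long batchOpenedAtMs = -1;

    CoalescingBatcher(int maxBatchSizeBytes, long lingerMs) {
        this.maxBatchSizeBytes = maxBatchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Called for every record produced by a write operation.
    void append(byte[] record, long nowMs) {
        if (currentBatch.isEmpty()) batchOpenedAtMs = nowMs;
        currentBatch.add(record);
        currentBytes += record.length;
        maybeFlush(nowMs);
    }

    // Flush when the batch is full or the linger time has elapsed.
    void maybeFlush(long nowMs) {
        if (currentBatch.isEmpty()) return;
        boolean full = currentBytes >= maxBatchSizeBytes;
        boolean lingerExpired = nowMs - batchOpenedAtMs >= lingerMs;
        if (full || lingerExpired) {
            flushed.add(new ArrayList<>(currentBatch));
            currentBatch.clear();
            currentBytes = 0;
        }
    }

    int flushedBatches() { return flushed.size(); }
    int pendingRecords() { return currentBatch.size(); }
}
```

The key property is that a flush boundary no longer coincides with a write-operation boundary, which is what amortizes the cost of writes.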
I took a first pass to get a general understanding. I will come back tomorrow, take a deeper dive into some of the minor changes, and let you know if I think of anything missed.
result
    VerificationGuard.SENTINEL,
    MemoryRecords.withEndTransactionMarker(
        time.milliseconds(),
It seems we didn't specify this time value before. Was that a bug? I guess it also just gets the system time in the method.
`withEndTransactionMarker` takes the current time if we don't specify it. The reason I set it explicitly here is to ensure that the mock time is used in tests.
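This is the usual time-injection pattern. A minimal self-contained sketch (hypothetical `Time`/`MockTime` classes, mirroring the shape of Kafka's time utilities rather than reproducing them) shows why passing `time.milliseconds()` explicitly makes the timestamp deterministic in tests:

```java
// Hypothetical minimal clock abstraction: production code uses the wall
// clock, tests substitute a controllable mock.
interface Time {
    long milliseconds();
}

class MockTime implements Time {
    private long nowMs;
    MockTime(long startMs) { this.nowMs = startMs; }
    public long milliseconds() { return nowMs; }
    void sleep(long ms) { nowMs += ms; }
}

class MarkerWriter {
    // Passing the timestamp from the injected clock, instead of letting the
    // callee default to System.currentTimeMillis(), keeps tests deterministic.
    static long markerTimestamp(Time time) {
        return time.milliseconds();
    }
}
```

With a wall-clock default, an equality assertion on the produced record's timestamp would be flaky; with the mock it is exact.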
byte magic = logConfig.recordVersion().value;
int maxBatchSize = logConfig.maxMessageSize();
long currentTimeMs = time.milliseconds();
ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize));
Nice, we got rid of the thread local. 👍
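The idea of a buffer supplier, as opposed to a `ThreadLocal` buffer, can be sketched as follows. This is a simplified illustration of the pattern, not the actual Kafka `BufferSupplier` implementation:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Illustrative sketch: callers borrow a buffer to build a batch and return it
// for reuse, so ownership is explicit rather than hidden in a ThreadLocal.
class SimpleBufferSupplier {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();

    // Return a cached buffer if one is large enough, otherwise allocate.
    ByteBuffer get(int minCapacity) {
        ByteBuffer cached = free.pollFirst();
        if (cached != null && cached.capacity() >= minCapacity) {
            cached.clear();
            return cached;
        }
        return ByteBuffer.allocate(minCapacity);
    }

    // Hand the buffer back so the next write can reuse it.
    void release(ByteBuffer buffer) {
        free.addFirst(buffer);
    }
}
```

Compared to a `ThreadLocal`, this makes the buffer's lifecycle visible at the call site and avoids pinning memory to threads that may never write again.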
// coordinator is the single writer to the underlying partition so we can
// deduce it like this.
for (int i = 0; i < result.records().size(); i++) {
    MemoryRecordsBuilder builder = MemoryRecords.builder(
nit: is there a benefit from putting this here and not right before the append method?
The `builder` is used in the loop above (L801), so we need it here.
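The current shape of the write path, one builder filled by the loop over a write operation's records, then a single append, can be sketched like this (the class and method names are hypothetical stand-ins for the runtime and partition-writer types):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for MemoryRecordsBuilder: collects records, then
// produces one immutable batch.
class BatchBuilder {
    private final List<String> records = new ArrayList<>();
    void append(String record) { records.add(record); }
    List<String> build() { return new ArrayList<>(records); }
}

class WritePath {
    // All records from one write operation go through the same builder, and
    // the partition writer sees exactly one batch per write operation.
    static int appendAsSingleBatch(List<String> resultRecords, List<List<String>> log) {
        BatchBuilder builder = new BatchBuilder();
        for (int i = 0; i < resultRecords.size(); i++) {
            builder.append(resultRecords.get(i));
        }
        log.add(builder.build());
        return log.size();
    }
}
```

This is the invariant the earlier comment thread describes: the patch moves where the batch is built without yet changing the one-batch-per-write behavior.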
/**
 * Listener allowing to listen to high watermark changes. This is meant
- * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, List)}}.
+ * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.
Is there a programmatic way to check if these links are broken due to refactoring, or do you need to do it manually?
Just wondering if there is an easy way to check you did them all :)
IntelliJ reports them as warnings. I suppose that we would get warnings when we generate the javadoc too.
 */
public class InMemoryPartitionWriter<T> implements PartitionWriter<T> {

public static class LogEntry {
nice that we could just use the real memory records
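The testing pattern being praised here, an in-memory writer that stores the real batch objects it receives, can be sketched as follows. This is a hypothetical simplification of what an `InMemoryPartitionWriter`-style test double does, not the actual class:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative test double: records every appended batch verbatim, so tests
// can assert on exactly what the runtime produced instead of a lossy
// log-entry abstraction.
class InMemoryWriter<T> {
    private final List<T> entries = new ArrayList<>();

    // Store the batch and return a pretend log end offset.
    long append(T batch) {
        entries.add(batch);
        return entries.size();
    }

    List<T> entries() {
        return new ArrayList<>(entries);
    }
}
```

Because the generic parameter can be the real batch type, equality assertions in tests validate the full serialized content rather than a summary.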
@@ -84,98 +67,28 @@ class CoordinatorPartitionWriterTest {
}

@Test
def testWriteRecords(): Unit = {
Do we have an equivalent test for the writing of the records in CoordinatorRuntimeTest? I didn't really notice new tests, but saw we have some of the builder logic there. Is it tested by checking equality between the records generated by the helper methods and the output from running the CoordinatorRuntime code?
Right. We have many tests in CoordinatorRuntimeTest doing writes. As we fully validate the records now, they cover this.
@jolshan Thanks for your comments. I replied to them.
We have discovered during large scale performance tests that the current write path of the new coordinator does not scale well. The issue is that each write operation writes synchronously from the coordinator threads. Coalescing records into bigger batches helps drastically because it amortizes the cost of writes. Aligning the batches with the snapshots of the timeline data structures also reduces the number of in-flight snapshots.

This patch is the first of a series of patches that will bring record coalescing into the coordinator runtime. As a first step, we had to rework the PartitionWriter interface and move the logic to build MemoryRecords from it to the CoordinatorRuntime. The main changes are in these two classes; the others are related mechanical changes.

Reviewers: Justine Olshan <jolshan@confluent.io>