Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream reorder example. Issue #179 #411

Open
wants to merge 1,275 commits into
base: 7.0.0-post
Choose a base branch
from

Conversation

sshcherbakov
Copy link

Example of reordering out of order messages using a stateful transformer.

Copy link
Contributor

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @sshcherbakov looks good overall, I just have a couple of comments.

*/
public class ReorderIntegrationTest {

public static class ReorderTransfomer<K, V>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: spelling RecordTransfomer -> RecordTransformer

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

* <p>
* Makes sense only on per partition basis.
* <p>
* Reordering occurs within time windows defined by the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think you meant to say "....defined by the provided grace

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amended

*/
@Override
public KeyValue<K, V> transform(final K key, final V value) {
final long ts = timestampExtractor.getTimestamp(value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used, unless you want to provide this for demo purposes - so maybe add a comment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Kafka Streams TimestampExtractor could have been reused to illustrate pulling the timestamp from the payload, but the method signature is not convenient. Instead of duplicating TimestampExtractor interface with convenient signature removed removed this piece to reduce confusion.

* @param timestamp – stream time of the punctuate function call
*/
void punctuate(final long timestamp) {
try(KeyValueIterator<K, V> it = reorderStore.all()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments state to use range(from, to), but the code uses all() since the code deletes everything in the store after forwarding, I think it will continue to work as-is, but you should update the comments.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated comments

@CLAassistant
Copy link

CLAassistant commented Apr 3, 2022

CLA assistant check
All committers have signed the CLA.

ConfluentJenkins and others added 25 commits December 28, 2022 03:00
@sshcherbakov
Copy link
Author

Rebased PR on top of the current master branch. I cannot build master but I checked the test with the latest release branch 7.3.1-post and it still passes.

try(KeyValueIterator<K, V> it = reorderStore.all()) {
while (it.hasNext()) {
final KeyValue<K, V> kv = it.next();
context.forward(kv.key, kv.value);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this implementation is flawed. Ordering only makes sense per partition/key.

Say incoming records have key k1, and there are two messages with timestamps t0, and t1. The StoreKeyGenerator will map that into some combination of the original key and the record timestamp, say something simple like t0_k1 and t1_k1. However, when you forward these message with the "composite" store key, they'll likely end up in two different partitions. Hence, although you sorted them, any downstream processor/consumer has no guarantee in the order after the re-partition. The correct thing to do would be to preserve the original key, and use that to forward the message, while using the composite key to store/sort/dedup records.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is an example. It can be extended with either additional store for composite to original key mapping or a second interface that would allow extracting original key from the composite one before passing downstream.

final K storeKey = storeKeyGenerator.getStoreKey(key, value);
final V storeValue = reorderStore.get(storeKey);
if(storeValue == null) {
reorderStore.put(storeKey, value);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

putIfAbsent?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean pitIfAbsent() is equivalent or it has different logic?
In its description: "Update the value associated with this key, unless a value is already associated with the key."
Before I start interpreting that and guessing whether it will only put my value where there is no key or even when key is there but the value is null, I write my intent explicitly through additional "if" line here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants