Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix partitions initial offsets in Kafka connector #25769

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Expand Up @@ -221,7 +221,11 @@ private void seekToInitialOffsets(Collection<TopicPartition> newAssignments) {
}
long[] topicOffsets = offsets.get(topicPartition.topic());
assert topicOffsets != null && topicOffsets.length > partition;
topicOffsets[partition] = initialOffset;

// we need to decrement the initialOffset value before putting it into the array,
// because the record we want to start reading from has not yet been consumed
topicOffsets[partition] = initialOffset - 1;

getLogger().info("Seeking to specified initial offset: " + initialOffset
+ " of topic-partition: " + topicPartition);
consumer.seek(topicPartition, initialOffset);
Expand Down
Expand Up @@ -28,7 +28,6 @@
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.test.TestInbox;
Expand Down Expand Up @@ -82,7 +81,9 @@
import static com.hazelcast.jet.core.JobStatus.RUNNING;
import static com.hazelcast.jet.core.JobStatus.SUSPENDED;
import static com.hazelcast.jet.core.WatermarkPolicy.limitingLag;
import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
import static com.hazelcast.jet.impl.execution.WatermarkCoalescer.IDLE_MESSAGE;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -208,7 +209,7 @@ private void testWithPartitionsInitialOffsets(

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<Integer, String, Tuple2<String, String>>kafka(
properties(), r -> Tuple2.tuple2(r.value(), r.topic()), topicsConfig
properties(), r -> tuple2(r.value(), r.topic()), topicsConfig
))
.withoutTimestamps()
.writeTo(Sinks.list(sinkListName));
Expand All @@ -230,6 +231,57 @@ private void testWithPartitionsInitialOffsets(
.isEqualTo(expectedRecordsReadFromTopic2);
}

    @Test
    public void when_processingGuaranteeAtLeastOnceAndJobResumedAfterSuspension_then_readFromPartitionsInitialOffsets() {
        // At-least-once guarantee: after a suspend/resume cycle the source must still
        // honor the configured partition initial offsets (skip the first 10 records).
        testSuspendResumeWithPartitionInitialOffsets(10, AT_LEAST_ONCE);
    }

    @Test
    public void when_processingExactlyOnceAndJobResumedAfterSuspension_then_readFromPartitionsInitialOffsets() {
        // Exactly-once guarantee: same suspend/resume scenario as the at-least-once
        // variant, with a different record count (20) to distinguish the runs.
        testSuspendResumeWithPartitionInitialOffsets(20, EXACTLY_ONCE);
    }

    /**
     * Verifies that partition initial offsets are honored across a suspend/resume
     * cycle: records produced to partition 0 before the job starts (all below the
     * configured initial offset) must never reach the sink, while records produced
     * after the job is resumed must all be consumed.
     *
     * NOTE(review): {@code assertTrueEventually(() -> assertEquals(0, ...))} passes
     * on its first evaluation, so the "nothing consumed" check is only as strong as
     * the preceding {@code sleepAtLeastSeconds(3)} window — confirm 3s is enough on
     * slow CI for a wrong offset to manifest as consumed records.
     */
    private void testSuspendResumeWithPartitionInitialOffsets(int recordsCount, ProcessingGuarantee processingGuarantee) {
        String sinkListName = randomName();

        // produce a batch of records into single partition
        for (int i = 0; i < recordsCount; i++) {
            kafkaTestSupport.produce(topic1Name, 0, currentTimeMillis(), i, String.valueOf(i));
        }

        // skip all records that exists in given kafka topic's partition before the job starts
        TopicsConfig topicsConfig = new TopicsConfig()
                .addTopicConfig(new TopicConfig(topic1Name)
                        .addPartitionInitialOffset(0, recordsCount));

        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.<Integer, String, Tuple2<String, String>>kafka(
                        properties(), r -> tuple2(r.value(), r.topic()), topicsConfig
                ))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkListName));

        Job job = instance().getJet().newJob(p, new JobConfig().setProcessingGuarantee(processingGuarantee));
        // give the job time to start consuming before asserting the sink is empty
        sleepAtLeastSeconds(3);

        // make sure nothing was consumed from the topic due to initialOffset
        assertTrueEventually(() -> assertEquals(0, instance().getList(sinkListName).size()), 5);
        job.suspend();
        assertJobStatusEventually(job, SUSPENDED);

        job.resume();
        assertJobStatusEventually(job, RUNNING);

        // produce another batch of records
        // NOTE(review): this produce() overload does not pin partition 0 or a
        // timestamp like the first batch does — presumably topic1Name has a single
        // partition so all records land in partition 0 anyway; confirm.
        for (int i = recordsCount; i < 2 * recordsCount; i++) {
            kafkaTestSupport.produce(topic1Name, i, String.valueOf(i));
        }
        sleepAtLeastSeconds(3);

        // make sure only newly produced records were consumed from the topic
        // (exactly recordsCount: the resumed job must not re-read the skipped batch)
        assertTrueEventually(() -> assertEquals(recordsCount, instance().getList(sinkListName).size()), 5);
    }

@Test
public void when_processingGuaranteeNone_then_continueFromBeginningAfterJobRestart() {
TopicsConfig topicsConfig = new TopicsConfig().addTopicConfig(new TopicConfig(topic1Name));
Expand Down Expand Up @@ -325,7 +377,7 @@ private void testWithJobRestart(
) {
String sinkListName = randomName();
for (int i = 0; i < messageCount; i++) {
kafkaTestSupport.produceSync(topic1Name, i, String.valueOf(i));
kafkaTestSupport.produceSync(topic1Name, i, String.valueOf(i));
}
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<Integer, String, String>kafka(kafkaProperties, ConsumerRecord::value, topicsConfig))
Expand Down Expand Up @@ -582,7 +634,7 @@ private <T> StreamKafkaP<Integer, String, T> createProcessor(
ToLongFunctionEx<T> timestampFn = e ->
e instanceof Entry
? (int) ((Entry) e).getKey()
: System.currentTimeMillis();
: currentTimeMillis();
EventTimePolicy<T> eventTimePolicy = eventTimePolicy(
timestampFn, limitingLag(LAG), 1, 0, idleTimeoutMillis);
return new StreamKafkaP<>((c) -> new KafkaConsumer<>(properties), topicsConfig, projectionFn, eventTimePolicy);
Expand Down Expand Up @@ -688,7 +740,7 @@ public void when_partitionAddedWhileJobDown_then_consumedFromBeginning() throws
assertEquals(entry(0, "0"), sinkList.get(0));
});
job.suspend();
assertJobStatusEventually(job, JobStatus.SUSPENDED);
assertJobStatusEventually(job, SUSPENDED);
// Note that the job might not have consumed all the zeroes from the topic at this point

// When
Expand Down