Skip to content

Commit

Permalink
[FLINK-32828] Properly initialize initial splits in WatermarkOutputMu…
Browse files Browse the repository at this point in the history
…ltiplexer

Without this fix, initial splits were registered in the multiplexer only
when first record from that split has been emitted. This was leading to
incorrectly emitted watermarks, as resulting watermark was not properly
combined from the initial splits, but only from the splits that have
already emitted at least one record.
  • Loading branch information
pnowojski committed May 17, 2024
1 parent 5aa02cb commit a160f8d
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
private final SourceOperatorAvailabilityHelper availabilityHelper =
new SourceOperatorAvailabilityHelper();

private final List<SplitT> outputPendingSplits = new ArrayList<>();
private final List<SplitT> splitsToInitializeOutput = new ArrayList<>();

private int numSplits;
private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
Expand Down Expand Up @@ -343,6 +343,7 @@ public void open() throws Exception {
final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
if (!splits.isEmpty()) {
LOG.info("Restoring state for {} split(s) to reader.", splits.size());
splitsToInitializeOutput.addAll(splits);
sourceReader.addSplits(splits);
}

Expand Down Expand Up @@ -465,7 +466,7 @@ private void initializeMainOutput(DataOutput<OUT> output) {
initializeLatencyMarkerEmitter(output);
lastInvokedOutput = output;
// Create per-split output for pending splits added before main output is initialized
createOutputForSplits(outputPendingSplits);
createOutputForSplits(splitsToInitializeOutput);
this.operatingMode = OperatingMode.READING;
}

Expand Down Expand Up @@ -592,7 +593,7 @@ private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
// For splits arrived before the main output is initialized, store them into the
// pending list. Outputs of these splits will be created once the main output is
// ready.
outputPendingSplits.addAll(newSplits);
splitsToInitializeOutput.addAll(newSplits);
} else {
// Create output directly for new splits if the main output is already initialized.
createOutputForSplits(newSplits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private void assertNoReportedWatermarkEvent(SourceOperatorTestContext context) {
assertThat(events).isEmpty();
}

private static class PunctuatedGenerator implements WatermarkGenerator<Integer> {
static class PunctuatedGenerator implements WatermarkGenerator<Integer> {

private enum GenerationMode {
ALL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.util.MockOutput;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Unit test for {@link SourceOperator} watermark alignment. */
@SuppressWarnings("serial")
class SourceOperatorWatermarksTest {

@Nullable private SourceOperatorTestContext context;
@Nullable private SourceOperator<Integer, MockSourceSplit> operator;

@BeforeEach
void setup() throws Exception {
context =
new SourceOperatorTestContext(
false,
true,
WatermarkStrategy.forGenerator(
ctx ->
new SourceOperatorAlignmentTest
.PunctuatedGenerator())
.withTimestampAssigner((r, t) -> r),
new MockOutput<>(new ArrayList<>()));
operator = context.getOperator();
}

@AfterEach
void tearDown() throws Exception {
context.close();
context = null;
operator = null;
}

@Test
void testPerPartitionWatermarksAfterRecovery() throws Exception {
List<MockSourceSplit> initialSplits = new ArrayList<>();
initialSplits.add(new MockSourceSplit(0).addRecord(1042).addRecord(1044));
initialSplits.add(new MockSourceSplit(1).addRecord(42).addRecord(44));
operator.initializeState(context.createStateContext(initialSplits));
operator.open();

CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();

// after emitting the first element from the first split, there can not be watermark
// emitted, as a watermark from the other split is still unknown.
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
assertNoWatermarks(actualOutput);

// after emitting two more elements (in this order: [1042, 1044, 42] but order doesn't
// matter for this test), three in total, watermark 42 can be finally emitted
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
assertWatermark(actualOutput, new Watermark(42));
}

private static void assertNoWatermarks(CollectingDataOutput<Integer> actualOutput) {
assertThat(actualOutput.getEvents()).noneMatch(element -> element instanceof Watermark);
}

private void assertWatermark(CollectingDataOutput<Integer> actualOutput, Watermark watermark) {
assertThat(actualOutput.getEvents()).containsOnlyOnce(watermark);
}
}

0 comments on commit a160f8d

Please sign in to comment.