Skip to content

Commit

Permalink
[FLINK-35030][runtime] Introduce Epoch Manager for under async execut…
Browse files Browse the repository at this point in the history
…ion (#24748)
  • Loading branch information
fredia committed May 17, 2024
1 parent cc21eec commit f1ecb9e
Show file tree
Hide file tree
Showing 11 changed files with 484 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
Expand Down Expand Up @@ -76,6 +77,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
*/
private final MailboxExecutor mailboxExecutor;

/** Exception handler to handle the exception thrown by asynchronous framework. */
private final AsyncFrameworkExceptionHandler exceptionHandler;

/** The key accounting unit which is used to detect the key conflict. */
final KeyAccountingUnit<K> keyAccountingUnit;

Expand All @@ -86,7 +90,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
private final StateFutureFactory<K> stateFutureFactory;

/** The state executor where the {@link StateRequest} is actually executed. */
StateExecutor stateExecutor;
final StateExecutor stateExecutor;

/** The corresponding context that currently runs in task thread. */
RecordContext<K> currentContext;
Expand All @@ -102,6 +106,15 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
/** Max parallelism of the job. */
private final int maxParallelism;

/** The reference of epoch manager. */
final EpochManager epochManager;

/**
* The parallel mode of epoch execution. Keep this field internal for now, until we could see
* the concrete need for {@link ParallelMode#PARALLEL_BETWEEN_EPOCH} from average users.
*/
final ParallelMode epochParallelMode = ParallelMode.SERIAL_BETWEEN_EPOCH;

public AsyncExecutionController(
MailboxExecutor mailboxExecutor,
AsyncFrameworkExceptionHandler exceptionHandler,
Expand All @@ -112,6 +125,7 @@ public AsyncExecutionController(
int maxInFlightRecords) {
this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
this.mailboxExecutor = mailboxExecutor;
this.exceptionHandler = exceptionHandler;
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler);
this.stateExecutor = stateExecutor;
this.batchSize = batchSize;
Expand All @@ -131,11 +145,13 @@ public AsyncExecutionController(
},
"AEC-buffer-timeout"));

this.epochManager = new EpochManager(this);
LOG.info(
"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}",
"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}",
this.batchSize,
this.bufferTimeout,
this.maxInFlightRecordNum);
this.maxInFlightRecordNum,
this.epochParallelMode);
}

/**
Expand All @@ -152,13 +168,15 @@ public RecordContext<K> buildContext(Object record, K key) {
RecordContext.EMPTY_RECORD,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
epochManager.onRecord());
}
return new RecordContext<>(
record,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
epochManager.onRecord());
}

/**
Expand All @@ -177,6 +195,7 @@ public void setCurrentContext(RecordContext<K> switchingContext) {
* @param toDispose the context to dispose.
*/
void disposeContext(RecordContext<K> toDispose) {
epochManager.completeOneRecord(toDispose.getEpoch());
keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
inFlightRecordNum.decrementAndGet();
RecordContext<K> nextRecordCtx =
Expand Down Expand Up @@ -311,6 +330,18 @@ public void drainInflightRecords(int targetNum) {
}
}

public void processNonRecord(ThrowingRunnable<? extends Exception> action) {
Runnable wrappedAction =
() -> {
try {
action.run();
} catch (Exception e) {
exceptionHandler.handleException("Failed to process non-record.", e);
}
};
epochManager.onNonRecord(wrappedAction, epochParallelMode);
}

@VisibleForTesting
public StateExecutor getStateExecutor() {
return stateExecutor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.LinkedList;

/**
* Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records(e.g.
* watermark, record attributes). Records are assigned to a unique epoch based on their arrival,
* records within an epoch are allowed to be parallelized, while the non-record of an epoch can only
* be executed when all records in this epoch have finished.
*
* <p>For more details please refer to FLIP-425.
*/
public class EpochManager {
private static final Logger LOG = LoggerFactory.getLogger(EpochManager.class);

/**
* This enum defines whether parallel execution between epochs is allowed. We should keep this
* internal and away from API module for now, until we could see the concrete need for {@link
* #PARALLEL_BETWEEN_EPOCH} from average users.
*/
public enum ParallelMode {
/**
* Subsequent epochs must wait until the previous epoch is completed before they can start.
*/
SERIAL_BETWEEN_EPOCH,
/**
* Subsequent epochs can begin execution even if the previous epoch has not yet completed.
* Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}.
*/
PARALLEL_BETWEEN_EPOCH
}

/**
* The reference to the {@link AsyncExecutionController}, used for {@link
* ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing.
*/
final AsyncExecutionController<?> asyncExecutionController;

/** The number of epochs that have arrived. */
long epochNum;

/** The output queue to hold ongoing epochs. */
LinkedList<Epoch> outputQueue;

/** Current active epoch, only one active epoch at the same time. */
Epoch activeEpoch;

public EpochManager(AsyncExecutionController<?> aec) {
this.epochNum = 0;
this.outputQueue = new LinkedList<>();
this.asyncExecutionController = aec;
// init an empty epoch, the epoch action will be updated when non-record is received.
this.activeEpoch = new Epoch(epochNum++);
}

/**
* Add a record to the current epoch and return the current open epoch, the epoch will be
* associated with the {@link RecordContext} of this record. Must be invoked within task thread.
*
* @return the current open epoch.
*/
public Epoch onRecord() {
activeEpoch.ongoingRecordCount++;
return activeEpoch;
}

/**
* Add a non-record to the current epoch, close current epoch and open a new epoch. Must be
* invoked within task thread.
*
* @param action the action associated with this non-record.
* @param parallelMode the parallel mode for this epoch.
*/
public void onNonRecord(Runnable action, ParallelMode parallelMode) {
LOG.trace(
"on NonRecord, old epoch: {}, outputQueue size: {}",
activeEpoch,
outputQueue.size());
switchActiveEpoch(action);
if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
asyncExecutionController.drainInflightRecords(0);
}
}

/**
* Complete one record in the specific epoch. Must be invoked within task thread.
*
* @param epoch the specific epoch
*/
public void completeOneRecord(Epoch epoch) {
if (--epoch.ongoingRecordCount == 0) {
tryFinishInQueue();
}
}

private void tryFinishInQueue() {
// If one epoch has been closed before and all records in
// this epoch have finished, the epoch will be removed from the output queue.
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
LOG.trace(
"Finish epoch: {}, outputQueue size: {}",
outputQueue.peek(),
outputQueue.size());
outputQueue.pop();
}
}

private void switchActiveEpoch(Runnable action) {
activeEpoch.close(action);
outputQueue.offer(activeEpoch);
this.activeEpoch = new Epoch(epochNum++);
tryFinishInQueue();
}

/** The status of an epoch, see Fig.6 in FLIP-425 for details. */
enum EpochStatus {
/**
* The subsequent non-record input has not arrived. So arriving records will be collected
* into current epoch.
*/
OPEN,
/**
* The records belong to this epoch is settled since the following non-record input has
* arrived, the newly arriving records would be collected into the next epoch.
*/
CLOSED,
/**
* One epoch can only be finished when it meets the following three conditions. 1. The
* records of this epoch have finished execution. 2. The epoch is closed. 3. The epoch is in
* the front of outputQueue.
*/
FINISHED
}

/**
* All inputs are segment into distinct epochs, marked by the arrival of non-record inputs.
* Records are assigned to a unique epoch based on their arrival.
*/
public static class Epoch {
/** The id of this epoch for easy debugging. */
long id;
/** The number of records that are still ongoing in this epoch. */
int ongoingRecordCount;

/** The action associated with non-record of this epoch(e.g. advance watermark). */
@Nullable Runnable action;

EpochStatus status;

public Epoch(long id) {
this.id = id;
this.ongoingRecordCount = 0;
this.status = EpochStatus.OPEN;
this.action = null;
}

/**
* Try to finish this epoch.
*
* @return whether this epoch has been finished.
*/
boolean tryFinish() {
if (this.status == EpochStatus.FINISHED) {
return true;
}
if (ongoingRecordCount == 0 && this.status == EpochStatus.CLOSED) {
this.status = EpochStatus.FINISHED;
if (action != null) {
action.run();
}
return true;
}
return false;
}

/** Close this epoch. */
void close(Runnable action) {
this.action = action;
this.status = EpochStatus.CLOSED;
}

public String toString() {
return String.format(
"Epoch{id=%d, ongoingRecord=%d, status=%s}", id, ongoingRecordCount, status);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;

import javax.annotation.Nullable;

import java.util.Objects;
Expand Down Expand Up @@ -46,7 +48,7 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun

/**
* The disposer for disposing this context. This should be invoked in {@link
* #referenceCountReachedZero()}, which may be called once the ref count reaches zero in any
* #referenceCountReachedZero}, which may be called once the ref count reaches zero in any
* thread.
*/
private final Consumer<RecordContext<K>> disposer;
Expand All @@ -61,13 +63,18 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
*/
private @Nullable volatile Object extra;

public RecordContext(Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup) {
/** The epoch of this context. */
private final Epoch epoch;

public RecordContext(
Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup, Epoch epoch) {
super(0);
this.record = record;
this.key = key;
this.keyOccupied = false;
this.disposer = disposer;
this.keyGroup = keyGroup;
this.epoch = epoch;
}

public Object getRecord() {
Expand Down Expand Up @@ -112,6 +119,10 @@ public Object getExtra() {
return extra;
}

public Epoch getEpoch() {
return epoch;
}

@Override
public int hashCode() {
return Objects.hash(record, key);
Expand All @@ -129,6 +140,12 @@ public boolean equals(Object o) {
if (!Objects.equals(record, that.record)) {
return false;
}
if (!Objects.equals(keyGroup, that.keyGroup)) {
return false;
}
if (!Objects.equals(epoch, that.epoch)) {
return false;
}
return Objects.equals(key, that.key);
}

Expand All @@ -143,6 +160,8 @@ public String toString() {
+ keyOccupied
+ ", ref="
+ getReferenceCount()
+ ", epoch="
+ epoch.id
+ "}";
}

Expand Down

0 comments on commit f1ecb9e

Please sign in to comment.