Skip to content

Commit

Permalink
[FLINK-35346][table-common] Introduce workflow scheduler interface fo…
Browse files Browse the repository at this point in the history
…r materialized table

[FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table

This closes #24767
  • Loading branch information
lsyldliu committed May 16, 2024
1 parent 48c14fe commit 1378979
Show file tree
Hide file tree
Showing 12 changed files with 782 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ public final class FactoryUtil {
+ "tasks to advance their watermarks without the need to wait for "
+ "watermarks from this source while it is idle.");

/**
* Config option {@code workflow-scheduler.type}: the type of the workflow scheduler that is used
* for materialized tables. It is a string option that declares no default value.
*/
public static final ConfigOption<String> WORKFLOW_SCHEDULER_TYPE =
ConfigOptions.key("workflow-scheduler.type")
.stringType()
.noDefaultValue()
.withDescription(
"Specify the workflow scheduler type that is used for materialized table.");

/**
* Suffix for keys of {@link ConfigOption} in case a connector requires multiple formats (e.g.
* for both key and value).
Expand Down Expand Up @@ -903,7 +910,7 @@ static List<Factory> discoverFactories(ClassLoader classLoader) {
return loadResults;
}

private static String stringifyOption(String key, String value) {
public static String stringifyOption(String key, String value) {
if (GlobalConfiguration.isSensitive(key)) {
value = HIDDEN_CONTENT;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.workflow.WorkflowScheduler;

import java.util.Map;

/**
* A factory to create a {@link WorkflowScheduler} instance.
*
* <p>See {@link Factory} for more information about the general design of a factory.
*/
@PublicEvolving
public interface WorkflowSchedulerFactory extends Factory {

/**
* Creates a workflow scheduler instance which interacts with an external scheduler service.
*
* @param context the context that exposes the configuration and the scheduler-specific options
* @return the created workflow scheduler
*/
WorkflowScheduler<?> createWorkflowScheduler(Context context);

/** Context provided when a workflow scheduler is created. */
@PublicEvolving
interface Context {

/** Returns the configuration that is available for creating the {@link WorkflowScheduler}. */
ReadableConfig getConfiguration();

/**
* Returns the options with which the workflow scheduler is created. All options that are
* prefixed with the workflow scheduler identifier are included in the map.
*
* <p>All the keys in the options are pruned with the prefix. For example, the option {@code
* workflow-scheduler.airflow.endpoint}'s key is {@code endpoint} in the map.
*
* <p>An implementation should perform validation of these options.
*
* @return the scheduler-specific options, keyed without the scheduler prefix
*/
Map<String, String> getWorkflowSchedulerOptions();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.workflow.WorkflowScheduler;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import static org.apache.flink.table.factories.FactoryUtil.WORKFLOW_SCHEDULER_TYPE;
import static org.apache.flink.table.factories.FactoryUtil.stringifyOption;

/** Utility for discovering and creating a {@link WorkflowScheduler} from configuration. */
@PublicEvolving
public class WorkflowSchedulerFactoryUtil {

    private static final Logger LOG = LoggerFactory.getLogger(WorkflowSchedulerFactoryUtil.class);

    /** Root segment of every workflow scheduler option key. */
    public static final String WORKFLOW_SCHEDULER_PREFIX = "workflow-scheduler";

    private WorkflowSchedulerFactoryUtil() {
        // no instantiation
    }

    /**
     * Attempts to discover the appropriate workflow scheduler factory and creates the instance of
     * the scheduler.
     *
     * <p>Returns {@code null} directly if the configuration does not specify a workflow scheduler
     * type, because the scheduler is optional for materialized tables.
     *
     * @param configuration the configuration that is read for the scheduler type and from which
     *     the scheduler-specific options are extracted
     * @param classLoader the class loader used to discover the {@link WorkflowSchedulerFactory}
     * @return the created workflow scheduler, or {@code null} if no scheduler type is configured
     * @throws ValidationException if discovering the factory or creating the scheduler fails; the
     *     original failure is kept as the cause
     */
    public static @Nullable WorkflowScheduler<?> createWorkflowScheduler(
            Configuration configuration, ClassLoader classLoader) {
        // Workflow scheduler identifier
        String identifier = configuration.get(WORKFLOW_SCHEDULER_TYPE);
        if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
            // SLF4J only substitutes '{}' placeholders, so the option key must be passed as an
            // argument; a String.format-style '%s' would be logged literally.
            LOG.warn(
                    "Workflow scheduler options do not contain an option key '{}' for discovering an workflow scheduler.",
                    WORKFLOW_SCHEDULER_TYPE.key());
            return null;
        }

        try {
            final WorkflowSchedulerFactory factory =
                    FactoryUtil.discoverFactory(
                            classLoader, WorkflowSchedulerFactory.class, identifier);
            return factory.createWorkflowScheduler(
                    new DefaultWorkflowSchedulerContext(
                            configuration, getWorkflowSchedulerConfig(configuration, identifier)));
        } catch (Throwable t) {
            // Report the full option space, one sorted "stringified" entry per line, so that the
            // failing configuration can be identified from the message alone.
            throw new ValidationException(
                    String.format(
                            "Error creating workflow scheduler '%s' in option space '%s'.",
                            identifier,
                            configuration.toMap().entrySet().stream()
                                    .map(
                                            optionEntry ->
                                                    stringifyOption(
                                                            optionEntry.getKey(),
                                                            optionEntry.getValue()))
                                    .sorted()
                                    .collect(Collectors.joining("\n"))),
                    t);
        }
    }

    /**
     * Extracts the options of the given scheduler by viewing the configuration through the
     * scheduler-specific key prefix.
     */
    private static Map<String, String> getWorkflowSchedulerConfig(
            Configuration flinkConf, String identifier) {
        return new DelegatingConfiguration(flinkConf, getWorkflowSchedulerOptionPrefix(identifier))
                .toMap();
    }

    /** Builds the key prefix {@code workflow-scheduler.<identifier>.} for the given scheduler. */
    private static String getWorkflowSchedulerOptionPrefix(String identifier) {
        return String.format("%s.%s.", WORKFLOW_SCHEDULER_PREFIX, identifier);
    }

    /**
     * Creates a utility that helps to validate options for a {@link WorkflowSchedulerFactory}.
     *
     * <p>Note: This utility checks for left-over options in the final step.
     *
     * @param workflowSchedulerFactory the factory whose options are validated
     * @param context the creation context that supplies the scheduler-specific options
     * @return a helper bound to the factory and the options taken from the context
     */
    public static WorkflowSchedulerFactoryHelper createWorkflowSchedulerFactoryHelper(
            WorkflowSchedulerFactory workflowSchedulerFactory,
            WorkflowSchedulerFactory.Context context) {
        return new WorkflowSchedulerFactoryHelper(
                workflowSchedulerFactory, context.getWorkflowSchedulerOptions());
    }

    /**
     * Helper utility for validating all options for a {@link WorkflowSchedulerFactory}.
     *
     * @see #createWorkflowSchedulerFactoryHelper(WorkflowSchedulerFactory,
     *     WorkflowSchedulerFactory.Context)
     */
    @PublicEvolving
    public static class WorkflowSchedulerFactoryHelper
            extends FactoryUtil.FactoryHelper<WorkflowSchedulerFactory> {

        /**
         * Creates a helper for the given factory and its options.
         *
         * @param workflowSchedulerFactory the factory whose options are validated
         * @param configOptions the scheduler-specific options to validate
         */
        public WorkflowSchedulerFactoryHelper(
                WorkflowSchedulerFactory workflowSchedulerFactory,
                Map<String, String> configOptions) {
            super(workflowSchedulerFactory, configOptions, PROPERTY_VERSION);
        }
    }

    /** Default implementation of {@link WorkflowSchedulerFactory.Context}. */
    @Internal
    public static class DefaultWorkflowSchedulerContext
            implements WorkflowSchedulerFactory.Context {

        private final ReadableConfig configuration;
        private final Map<String, String> workflowSchedulerConfig;

        /**
         * Creates a context from the given configuration and scheduler-specific options.
         *
         * @param configuration the configuration exposed through {@link #getConfiguration()}
         * @param workflowSchedulerConfig the options exposed through {@link
         *     #getWorkflowSchedulerOptions()}; the map is stored as given, without copying
         */
        public DefaultWorkflowSchedulerContext(
                ReadableConfig configuration, Map<String, String> workflowSchedulerConfig) {
            this.configuration = configuration;
            this.workflowSchedulerConfig = workflowSchedulerConfig;
        }

        @Override
        public ReadableConfig getConfiguration() {
            return configuration;
        }

        @Override
        public Map<String, String> getWorkflowSchedulerOptions() {
            return workflowSchedulerConfig;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.workflow;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.CatalogMaterializedTable;

/**
* {@link CreateRefreshWorkflow} provides the related information to create the refresh workflow
* of a {@link CatalogMaterializedTable}.
*
* <p>It is a sub-interface of {@link RefreshWorkflow} that declares no methods of its own.
*/
@PublicEvolving
public interface CreateRefreshWorkflow extends RefreshWorkflow {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.workflow;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.refresh.RefreshHandler;

/**
 * Describes the deletion of the refresh workflow that belongs to a {@link
 * CatalogMaterializedTable}.
 *
 * @param <T> the concrete {@link RefreshHandler} type that a specific {@link WorkflowScheduler}
 *     uses to locate the refresh workflow in the scheduler service
 */
@PublicEvolving
public class DeleteRefreshWorkflow<T extends RefreshHandler> implements RefreshWorkflow {

    private final T refreshHandler;

    /**
     * Creates the delete request for the workflow identified by the given handler.
     *
     * @param refreshHandler the handler that points to the refresh workflow to delete
     */
    public DeleteRefreshWorkflow(T refreshHandler) {
        this.refreshHandler = refreshHandler;
    }

    /**
     * Returns the {@link RefreshHandler} of the corresponding {@link WorkflowScheduler}, which
     * carries the meta information pointing to the refresh workflow in the scheduler service.
     *
     * @return the handler passed at construction time
     */
    public T getRefreshHandler() {
        return this.refreshHandler;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.workflow;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.refresh.RefreshHandler;

/**
* {@link ModifyRefreshWorkflow} provides the related information to modify the refresh workflow
* of a {@link CatalogMaterializedTable}.
*
* @param <T> The type of {@link RefreshHandler} used by a specific {@link WorkflowScheduler} to
* locate the refresh workflow in the scheduler service.
*/
@PublicEvolving
public interface ModifyRefreshWorkflow<T extends RefreshHandler> extends RefreshWorkflow {

/**
* Returns the {@link RefreshHandler} of the corresponding {@link WorkflowScheduler}, which
* provides the meta information pointing to the refresh workflow in the scheduler service.
*
* @return the handler that identifies the refresh workflow to modify
*/
T getRefreshHandler();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.workflow;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.CatalogMaterializedTable;

/**
* {@link RefreshWorkflow} is the basic interface that provides the related information to operate
* the refresh workflow of a {@link CatalogMaterializedTable}. The operations on a refresh
* workflow include create, modify, drop, etc.
*
* <p>The interface itself declares no methods.
*
* @see CreateRefreshWorkflow
* @see ModifyRefreshWorkflow
* @see DeleteRefreshWorkflow
*/
@PublicEvolving
public interface RefreshWorkflow {}

0 comments on commit 1378979

Please sign in to comment.