Skip to content
This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed May 11, 2021
2 parents 92655c8 + 12ee9ca commit f44ad5e
Show file tree
Hide file tree
Showing 44 changed files with 4,062 additions and 63 deletions.
5 changes: 5 additions & 0 deletions distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<artifactId>flink-connector-netty_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-pinot_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kudu.client.CreateTableOptions;

import java.io.Serializable;
import java.util.Objects;

/**
* Describes which table should be used in sources and sinks along with specifications
Expand Down Expand Up @@ -103,4 +104,21 @@ public CreateTableOptions getCreateTableOptions() {
}
return createTableOptionsFactory.getCreateTableOptions();
}

/**
 * Hash code derived solely from the table {@code name}, consistent with
 * {@link #equals(Object)}, which also compares only the name.
 */
@Override
public int hashCode() {
    return Objects.hash(name);
}

/**
 * Two {@code KuduTableInfo} instances are considered equal when their table
 * names match; no other state participates in the comparison (mirrors
 * {@link #hashCode()}).
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    // Exact-class check keeps equality symmetric across subclasses.
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    return Objects.equals(name, ((KuduTableInfo) o).name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ private KuduClient obtainClient() {
/**
 * Opens a new {@link KuduSession} on the shared client and copies every
 * tuning knob from the {@code writerConfig} onto it: flush mode, operation
 * timeout (millis), mutation buffer capacity, flush interval, and the two
 * row-error suppression flags.
 *
 * <p>NOTE(review): call order follows the original; presumably the flush mode
 * must be set before buffer/interval tuning — confirm against the Kudu client
 * docs before reordering.
 */
private KuduSession obtainSession() {
    KuduSession session = client.newSession();
    session.setFlushMode(writerConfig.getFlushMode());
    session.setTimeoutMillis(writerConfig.getOperationTimeout());
    session.setMutationBufferSpace(writerConfig.getMaxBufferSize());
    session.setFlushInterval(writerConfig.getFlushInterval());
    session.setIgnoreAllDuplicateRows(writerConfig.isIgnoreDuplicate());
    session.setIgnoreAllNotFoundRows(writerConfig.isIgnoreNotFound());
    return session;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import org.apache.flink.annotation.PublicEvolving;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.kudu.client.AsyncKuduClient;

import java.io.Serializable;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.kudu.client.SessionConfiguration.FlushMode;
Expand All @@ -34,13 +36,28 @@ public class KuduWriterConfig implements Serializable {

private final String masters;
private final FlushMode flushMode;
private final long operationTimeout;
// Assigned exactly once in the constructor and exposed only through getters,
// so marked final for immutability, consistent with masters/flushMode above.
private final int maxBufferSize;
private final int flushInterval;
private final boolean ignoreNotFound;
private final boolean ignoreDuplicate;

private KuduWriterConfig(
String masters,
FlushMode flushMode) {
FlushMode flushMode,
long operationTimeout,
int maxBufferSize,
int flushInterval,
boolean ignoreNotFound,
boolean ignoreDuplicate) {

this.masters = checkNotNull(masters, "Kudu masters cannot be null");
this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null");
this.operationTimeout = operationTimeout;
this.maxBufferSize = maxBufferSize;
this.flushInterval = flushInterval;
this.ignoreNotFound = ignoreNotFound;
this.ignoreDuplicate = ignoreDuplicate;
}

public String getMasters() {
Expand All @@ -51,6 +68,26 @@ public FlushMode getFlushMode() {
return flushMode;
}

/** @return per-session operation timeout in milliseconds. */
public long getOperationTimeout() {
    return operationTimeout;
}

/** @return maximum number of buffered mutations per Kudu session. */
public int getMaxBufferSize() {
    return maxBufferSize;
}

/** @return background flush interval in milliseconds. */
public int getFlushInterval() {
    return flushInterval;
}

/** @return whether rows that are not found should be silently ignored. */
public boolean isIgnoreNotFound() {
    return ignoreNotFound;
}

/** @return whether duplicate rows should be silently ignored. */
public boolean isIgnoreDuplicate() {
    return ignoreDuplicate;
}

@Override
public String toString() {
return new ToStringBuilder(this)
Expand All @@ -65,6 +102,16 @@ public String toString() {
public static class Builder {
private String masters;
private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
// Reference from AsyncKuduClientBuilder defaultOperationTimeoutMs.
private long timeout = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
// Reference from AsyncKuduSession mutationBufferMaxOps 1000.
private int maxBufferSize = 1000;
// Reference from AsyncKuduSession flushIntervalMillis 1000.
private int flushInterval = 1000;
// Reference from AsyncKuduSession ignoreAllNotFoundRows false.
private boolean ignoreNotFound = false;
// Reference from AsyncKuduSession ignoreAllDuplicateRows false.
private boolean ignoreDuplicate = false;

private Builder(String masters) {
this.masters = masters;
Expand All @@ -87,10 +134,72 @@ public Builder setStrongConsistency() {
return setConsistency(FlushMode.AUTO_FLUSH_SYNC);
}

/**
 * Sets the maximum number of mutations buffered per session.
 *
 * @param maxBufferSize mutation buffer capacity (default 1000)
 * @return this builder for chaining
 */
public Builder setMaxBufferSize(int maxBufferSize) {
    this.maxBufferSize = maxBufferSize;
    return this;
}

/**
 * Sets the background flush interval.
 *
 * @param flushInterval interval in milliseconds (default 1000)
 * @return this builder for chaining
 */
public Builder setFlushInterval(int flushInterval) {
    this.flushInterval = flushInterval;
    return this;
}

/**
 * Sets the per-operation timeout.
 *
 * @param timeout timeout in milliseconds (default from AsyncKuduClient)
 * @return this builder for chaining
 */
public Builder setOperationTimeout(long timeout) {
    this.timeout = timeout;
    return this;
}

/**
 * Controls whether not-found rows are ignored instead of reported as errors.
 *
 * @param ignoreNotFound true to ignore missing rows (default false)
 * @return this builder for chaining
 */
public Builder setIgnoreNotFound(boolean ignoreNotFound) {
    this.ignoreNotFound = ignoreNotFound;
    return this;
}

/**
 * Controls whether duplicate rows are ignored instead of reported as errors.
 *
 * @param ignoreDuplicate true to ignore duplicates (default false)
 * @return this builder for chaining
 */
public Builder setIgnoreDuplicate(boolean ignoreDuplicate) {
    this.ignoreDuplicate = ignoreDuplicate;
    return this;
}

public KuduWriterConfig build() {
return new KuduWriterConfig(
masters,
flushMode);
flushMode,
timeout,
maxBufferSize,
flushInterval,
ignoreNotFound,
ignoreDuplicate);
}

/**
 * Hash over every configurable field, consistent with
 * {@link #equals(Object)}. Same arguments in the same order as before, so
 * the produced value is unchanged.
 */
@Override
public int hashCode() {
    return Objects.hash(
            masters,
            flushMode,
            timeout,
            maxBufferSize,
            flushInterval,
            ignoreNotFound,
            ignoreDuplicate);
}

/**
 * Builders are equal when every configurable field matches.
 *
 * <p>Primitive fields ({@code timeout}, {@code maxBufferSize},
 * {@code flushInterval}, {@code ignoreNotFound}, {@code ignoreDuplicate})
 * are compared with {@code ==} instead of {@link Objects#equals(Object,
 * Object)}, which previously autoboxed each primitive on every comparison.
 * Null-safe {@code Objects.equals} is kept only for the reference fields.
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    Builder that = (Builder) o;
    return timeout == that.timeout
            && maxBufferSize == that.maxBufferSize
            && flushInterval == that.flushInterval
            && ignoreNotFound == that.ignoreNotFound
            && ignoreDuplicate == that.ignoreDuplicate
            && Objects.equals(masters, that.masters)
            && Objects.equals(flushMode, that.flushMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,28 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.*;
import static org.apache.flink.table.descriptors.Rowtime.*;
import static org.apache.flink.table.descriptors.Schema.*;
import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
import static org.apache.flink.util.Preconditions.checkNotNull;

public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
Expand All @@ -49,6 +66,11 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
// Table-layout property keys.
public static final String KUDU_HASH_COLS = "kudu.hash-columns";
public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns";
public static final String KUDU_REPLICAS = "kudu.replicas";
// Writer-tuning property keys, mapped onto KuduWriterConfig.Builder in createTableSink.
public static final String KUDU_MAX_BUFFER_SIZE = "kudu.max-buffer-size";
public static final String KUDU_FLUSH_INTERVAL = "kudu.flush-interval";
public static final String KUDU_OPERATION_TIMEOUT = "kudu.operation-timeout";
public static final String KUDU_IGNORE_NOT_FOUND = "kudu.ignore-not-found";
public static final String KUDU_IGNORE_DUPLICATE = "kudu.ignore-duplicate";
// Connector type identifier.
public static final String KUDU = "kudu";

@Override
Expand All @@ -65,6 +87,11 @@ public List<String> supportedProperties() {
properties.add(KUDU_MASTERS);
properties.add(KUDU_HASH_COLS);
properties.add(KUDU_PRIMARY_KEY_COLS);
properties.add(KUDU_MAX_BUFFER_SIZE);
properties.add(KUDU_FLUSH_INTERVAL);
properties.add(KUDU_OPERATION_TIMEOUT);
properties.add(KUDU_IGNORE_NOT_FOUND);
properties.add(KUDU_IGNORE_DUPLICATE);
// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
Expand Down Expand Up @@ -123,17 +150,31 @@ private KuduTableSource createTableSource(String tableName, TableSchema schema,
public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) {
validateTable(table);
String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
return createTableSink(tableName, table.getSchema(), table.getProperties());
return createTableSink(tableName, table.getSchema(), table.toProperties());
}

/**
 * Builds a {@link KuduTableSink} from the connector properties.
 *
 * <p>Required: {@code kudu.masters}. The optional writer-tuning keys
 * (operation timeout, flush interval, buffer size, ignore-not-found,
 * ignore-duplicate) are applied to the {@link KuduWriterConfig.Builder} only
 * when present, so the builder's defaults remain authoritative.
 *
 * @param tableName Kudu table to write to
 * @param schema    logical Flink table schema
 * @param props     raw connector properties
 * @return configured sink for the given table
 */
private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
    DescriptorProperties properties = new DescriptorProperties();
    properties.putProperties(props);
    String masterAddresses = props.get(KUDU_MASTERS);
    TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
    KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema, props);

    KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder
            .setMasters(masterAddresses);

    // Method references replace the previous one-arg lambdas and the
    // intermediate Optional locals; behavior is unchanged.
    properties.getOptionalLong(KUDU_OPERATION_TIMEOUT).ifPresent(configBuilder::setOperationTimeout);
    properties.getOptionalInt(KUDU_FLUSH_INTERVAL).ifPresent(configBuilder::setFlushInterval);
    properties.getOptionalInt(KUDU_MAX_BUFFER_SIZE).ifPresent(configBuilder::setMaxBufferSize);
    properties.getOptionalBoolean(KUDU_IGNORE_NOT_FOUND).ifPresent(configBuilder::setIgnoreNotFound);
    properties.getOptionalBoolean(KUDU_IGNORE_DUPLICATE).ifPresent(configBuilder::setIgnoreDuplicate);

    return new KuduTableSink(configBuilder, tableInfo, physicalSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.types.Row;

import java.util.Objects;

public class KuduTableSink implements UpsertStreamTableSink<Row> {

private final KuduWriterConfig.Builder writerConfigBuilder;
Expand Down Expand Up @@ -68,4 +70,23 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform

@Override
public TableSchema getTableSchema() { return flinkSchema; }

/**
 * Sinks are equal when their writer-config builder, Flink schema, and Kudu
 * table info are all equal, matching the fields used by {@link #hashCode()}.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    // Exact-class comparison, as in the rest of this connector's equals methods.
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    KuduTableSink other = (KuduTableSink) o;
    return writerConfigBuilder.equals(other.writerConfigBuilder)
            && flinkSchema.equals(other.flinkSchema)
            && tableInfo.equals(other.tableInfo);
}

/**
 * Hash over the same three fields compared in {@link #equals(Object)}:
 * writer-config builder, Flink schema, and table info.
 */
@Override
public int hashCode() {
    return Objects.hash(writerConfigBuilder, flinkSchema, tableInfo);
}
}

0 comments on commit f44ad5e

Please sign in to comment.