-
Notifications
You must be signed in to change notification settings - Fork 10
/
FlinkCommitsToKafka.java
101 lines (88 loc) · 3.89 KB
/
FlinkCommitsToKafka.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package com.ververica.platform;
import static org.apache.flink.table.api.Expressions.$;
import com.ververica.platform.entities.Commit;
import com.ververica.platform.io.source.JGitCommitSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Flink job that reads commits from the apache/flink Git repository (branch {@code master}, via
 * {@link JGitCommitSource}) and writes commit metadata as JSON records to a Kafka topic.
 */
public class FlinkCommitsToKafka {

  public static final String APACHE_FLINK_REPOSITORY = "https://github.com/apache/flink.git";
  public static final String APACHE_FLINK_BRANCH = "refs/heads/master";

  /**
   * Value for the Kafka producer's {@code max.request.size} (20 MiB). Kafka's default is about
   * 1 MB; presumably raised so that commits with large {@code filesChanged} arrays still fit into a
   * single record — TODO confirm.
   */
  private static final int KAFKA_MAX_REQUEST_SIZE_BYTES = 20 * 1024 * 1024;

  /**
   * Builds and submits the job. Recognized options (all optional):
   *
   * <ul>
   *   <li>{@code --kafka-server}: bootstrap servers (default {@code kafka.vvp.svc})
   *   <li>{@code --kafka-topic}: target topic (default {@code flink-commits})
   *   <li>{@code --kafka-security-protocol}, {@code --kafka-sasl-mechanism}, {@code
   *       --kafka-sasl-jaas-config}: forwarded as Kafka client properties only when set
   *   <li>{@code --poll-interval-ms}: delay passed to the commit source (default 10000)
   *   <li>{@code --ignore-commits-before}: passed unchanged to {@link JGitCommitSource}
   * </ul>
   *
   * @param args command-line arguments, parsed with {@link ParameterTool#fromArgs(String[])}
   */
  public static void main(String[] args) {
    ParameterTool params = ParameterTool.fromArgs(args);

    // Sink
    String kafkaServer = params.get("kafka-server", "kafka.vvp.svc");
    String kafkaTopic = params.get("kafka-topic", "flink-commits");
    String kafkaSecurityProtocol = params.get("kafka-security-protocol", null);
    String kafkaSaslMechanism = params.get("kafka-sasl-mechanism", null);
    String kafkaSaslJaasConfig = params.get("kafka-sasl-jaas-config", null);

    // Source
    long delayBetweenQueries = params.getLong("poll-interval-ms", 10_000L);
    String ignoreCommitsBefore = params.get("ignore-commits-before", null);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    env.getConfig().enableObjectReuse();

    DataStream<Commit> commits =
        env.addSource(getGitCommitSource(delayBetweenQueries, ignoreCommitsBefore))
            .name("flink-commit-source")
            .uid("flink-commit-source");

    tableEnv.executeSql(
        buildCommitsTableDdl(
            kafkaTopic,
            kafkaServer,
            kafkaSecurityProtocol,
            kafkaSaslMechanism,
            kafkaSaslJaasConfig));

    tableEnv
        .fromDataStream(
            commits,
            $("author"),
            $("authorDate"),
            $("authorEmail"),
            $("commitDate"),
            $("committer"),
            $("committerEmail"),
            $("filesChanged"),
            $("sha1"),
            $("shortInfo"))
        .executeInsert("commits");
  }

  /**
   * Builds the {@code CREATE TABLE commits} DDL for the Kafka sink.
   *
   * <p>Every option value is embedded as a Flink SQL string literal with single quotes escaped, so
   * values such as a JAAS config containing {@code '} cannot break the statement.
   *
   * @param kafkaTopic target topic
   * @param kafkaServer bootstrap servers
   * @param kafkaSecurityProtocol {@code security.protocol}, or {@code null} to omit the option
   * @param kafkaSaslMechanism {@code sasl.mechanism}, or {@code null} to omit the option
   * @param kafkaSaslJaasConfig {@code sasl.jaas.config}, or {@code null} to omit the option
   * @return the complete DDL statement
   */
  private static String buildCommitsTableDdl(
      String kafkaTopic,
      String kafkaServer,
      String kafkaSecurityProtocol,
      String kafkaSaslMechanism,
      String kafkaSaslJaasConfig) {
    StringBuilder ddl =
        new StringBuilder()
            .append("CREATE TABLE commits (\n")
            .append("`author` STRING,\n")
            .append("`authorDate` TIMESTAMP(3),\n")
            .append("`authorEmail` STRING,\n")
            .append("`commitDate` TIMESTAMP(3),\n")
            .append("`committer` STRING,\n")
            .append("`committerEmail` STRING,\n")
            .append(
                "`filesChanged` ARRAY<ROW<filename STRING, linesAdded INT, linesChanged INT,"
                    + " linesRemoved INT>>,\n")
            .append("`sha1` STRING,\n")
            .append("`shortInfo` STRING\n")
            .append(") WITH (\n");

    appendOption(ddl, "connector", "kafka");
    appendOption(ddl, "topic", kafkaTopic);
    appendOption(ddl, "properties.bootstrap.servers", kafkaServer);
    // Optional security settings are only emitted when explicitly configured.
    if (kafkaSecurityProtocol != null) {
      appendOption(ddl, "properties.security.protocol", kafkaSecurityProtocol);
    }
    if (kafkaSaslMechanism != null) {
      appendOption(ddl, "properties.sasl.mechanism", kafkaSaslMechanism);
    }
    if (kafkaSaslJaasConfig != null) {
      appendOption(ddl, "properties.sasl.jaas.config", kafkaSaslJaasConfig);
    }
    appendOption(
        ddl, "properties.max.request.size", String.valueOf(KAFKA_MAX_REQUEST_SIZE_BYTES));

    // Last option carries no trailing comma.
    return ddl.append("'format' = 'json'\n").append(")").toString();
  }

  /** Appends {@code 'key' = 'value',\n} to a WITH clause, escaping the value as a SQL literal. */
  private static void appendOption(StringBuilder ddl, String key, String value) {
    ddl.append('\'')
        .append(key)
        .append("' = '")
        .append(escapeSqlStringLiteral(value))
        .append("',\n");
  }

  /** Escapes a value for use inside a single-quoted SQL string literal by doubling quotes. */
  private static String escapeSqlStringLiteral(String value) {
    return value.replace("'", "''");
  }

  /**
   * Creates the commit source for {@link #APACHE_FLINK_REPOSITORY} on {@link
   * #APACHE_FLINK_BRANCH}.
   *
   * @param delayBetweenQueries delay passed to the source (milliseconds, per {@code
   *     --poll-interval-ms})
   * @param ignoreCommitsBefore passed unchanged to {@link JGitCommitSource}; may be {@code null}
   * @return a new source instance
   */
  private static JGitCommitSource getGitCommitSource(
      final long delayBetweenQueries, final String ignoreCommitsBefore) {
    return new JGitCommitSource(
        APACHE_FLINK_REPOSITORY, APACHE_FLINK_BRANCH, ignoreCommitsBefore, delayBetweenQueries);
  }
}