ververica/lab-flink-repository-analytics

Apache Flink Community Data Analytics with Apache Flink

This project contains a couple of tools to analyze data around the Apache Flink community, including the commit history of the Flink repository, the mailing lists, and pull requests.

The project consists of several sub-projects, but the analytics fall into two categories:

  1. The original Flink Repository Analytics (via DataStream API), and
  2. SQL analytics on any of the available data above.

Flink Repository Analytics with the DataStream API

This Apache Flink application analyzes the commit history of the Apache Flink open source project to determine each component's activity over time. It pulls data live from the GitHub API via a custom Flink source and writes it to Elasticsearch.

It is the running example for a blog post series on Ververica Platform Community Edition.

The entrypoint class is com.ververica.platform.FlinkCommitProgram in the commit-analytics sub-project.

Configuration

This application accepts the following command line arguments of the form --key value:

| Parameter | Description | Default |
| --- | --- | --- |
| es-host | Elasticsearch host | elasticsearch-master-headless.vvp.svc |
| es-port | Elasticsearch port | 9200 |
| start-date | The application will process the commit history of Apache Flink starting from this date. Examples: 2019-01, 2019-01-01, 2020-01-01T10:30Z, 2020-01-01T10:30:00Z, 2020-01-01T10:30:00.000Z | now |
| enable-new-feature | A feature flag for enabling a new job feature | unset |
| poll-interval-ms | Minimum pause between polling commits via the GitHub API (in milliseconds) | 1000 |
| checkpointing-interval-ms | Apache Flink checkpointing interval (in milliseconds) | 10000 |
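
The start-date examples above mix three granularities (year-month, date, and date-time with a UTC offset). The following is a minimal sketch of how such values could be parsed with java.time. StartDateSketch is a hypothetical helper, not the project's code, and it assumes that values without a time component mean midnight UTC.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

/** Hypothetical helper (not taken from the project) for the start-date formats listed above. */
final class StartDateSketch {

    /**
     * Returns the instant denoted by a start-date value.
     * Assumption: values without a time component are interpreted as midnight UTC.
     */
    static Instant parse(String value, Instant now) {
        if (value == null || value.equals("now")) {
            return now; // the documented default
        }
        try {
            // 2020-01-01T10:30Z, 2020-01-01T10:30:00Z, 2020-01-01T10:30:00.000Z
            return OffsetDateTime.parse(value).toInstant();
        } catch (DateTimeParseException ignored) {
            // not a date-time with offset, try the coarser formats
        }
        try {
            // 2019-01-01
            return LocalDate.parse(value).atStartOfDay(ZoneOffset.UTC).toInstant();
        } catch (DateTimeParseException ignored) {
            // not a full date, try year-month
        }
        // 2019-01 (throws DateTimeParseException for anything else)
        return YearMonth.parse(value).atDay(1).atStartOfDay(ZoneOffset.UTC).toInstant();
    }
}
```

Trying the most specific format first keeps the fallbacks unambiguous; an unrecognized value surfaces as a DateTimeParseException instead of silently falling back to the default.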

SQL-based analytics for the Flink Repository, Mailing Lists, and Pull Requests

This functionality is split into two sub-projects:

  • import to copy Flink Repository information, Mailing Lists contents, and Pull Requests information into Kafka
  • sql-functions to offer a few helpers useful for a variety of SQL queries

Import

The import sub-project contains three jobs that import data from public sources around the development of Apache Flink: the Flink repository, the mailing lists, and pull requests.

These jobs leverage source implementations in the DataStream API and use the Table API to write the created elements to Kafka.

Configuration

Each application accepts the following command line arguments of the form --key value:

| Parameter | Description | Default |
| --- | --- | --- |
| kafka-server | Kafka bootstrap server | kafka.vvp.svc |
| kafka-topic | Kafka topic to write to (FlinkMailingListToKafka will use this as the prefix) | flink-commits |
| start-date | The application will process its input starting from this date. Examples: 2019-01, 2019-01-01, 2020-01-01T10:30Z, 2020-01-01T10:30:00Z, 2020-01-01T10:30:00.000Z | now |
| poll-interval-ms | Minimum pause between polling input data after reaching the current date and time (in milliseconds) | 10000 |
| checkpointing-interval-ms | Apache Flink checkpointing interval (in milliseconds) | 10000 |
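
To illustrate how the parameters and defaults above combine, here is a minimal sketch that overlays command line arguments on the documented defaults. It assumes arguments arrive as "--key value" pairs and uses only the standard library. ImportArgsSketch is a hypothetical name and not how the import jobs actually read their configuration (Flink's ParameterTool offers comparable parsing).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the jobs' argument handling; standard library only. */
final class ImportArgsSketch {

    /** Defaults copied from the parameter table above. */
    static Map<String, String> defaults() {
        Map<String, String> d = new HashMap<>();
        d.put("kafka-server", "kafka.vvp.svc");
        d.put("kafka-topic", "flink-commits");
        d.put("start-date", "now");
        d.put("poll-interval-ms", "10000");
        d.put("checkpointing-interval-ms", "10000");
        return d;
    }

    /** Parses "--key value" pairs (an assumed argument form) and overlays them on the defaults. */
    static Map<String, String> parse(String... args) {
        Map<String, String> result = defaults();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected '--key value' at position " + i);
            }
            result.put(args[i].substring(2), args[i + 1]);
        }
        return result;
    }
}
```

For example, ImportArgsSketch.parse("--kafka-topic", "flink-pulls") changes only the topic and leaves every other parameter at its default.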

Output Table Definitions

The Kafka tables written by the import sub-project can be used directly in SQL queries with the table definitions below. Point them to your Kafka server(s) and adjust other properties as needed, e.g. Kafka topic names or watermark definitions:

Flink Commits
CREATE TABLE `flink_commits` (
  `author` STRING,
  `authorDate` TIMESTAMP(3),
  `authorEmail` STRING,
  `commitDate` TIMESTAMP(3),
  `committer` STRING,
  `committerEmail` STRING,
  `filesChanged` ARRAY<ROW<filename STRING, linesAdded INT, linesChanged INT, linesRemoved INT>>,
  `sha1` STRING,
  `shortInfo` STRING,
  WATERMARK FOR `commitDate` AS `commitDate` - INTERVAL '1' DAY
)
COMMENT 'Commits on the master branch of github.com/apache/flink'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-commits',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
Flink Pull Requests
CREATE TABLE `flink_pulls` (
  `closedAt` TIMESTAMP(3),
  `commentsCount` INT,
  `createdAt` TIMESTAMP(3),
  `creator` STRING,
  `creatorEmail` STRING,
  `description` STRING,
  `labels` ARRAY<STRING>,
  `mergeCommit` STRING,
  `mergedAt` TIMESTAMP(3),
  `number` INT,
  `state` STRING,
  `title` STRING,
  `updatedAt` TIMESTAMP(3),
  WATERMARK FOR `createdAt` AS `createdAt` - INTERVAL '7' DAY
)
COMMENT 'Pull requests opened for the master branch of github.com/apache/flink'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-pulls',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
Flink Mailing Lists
CREATE TABLE `flink_ml_dev` (
  `date` TIMESTAMP(3),
  `fromEmail` STRING,
  `fromRaw` STRING,
  `htmlBody` STRING,
  `subject` STRING,
  `textBody` STRING,
  WATERMARK FOR `date` AS `date` - INTERVAL '1' DAY
)
COMMENT 'Email summary of all messages sent to dev@flink.apache.org'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-mail-dev',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

CREATE TABLE `flink_ml_user` (
  `date` TIMESTAMP(3),
  `fromEmail` STRING,
  `fromRaw` STRING,
  `htmlBody` STRING,
  `subject` STRING,
  `textBody` STRING,
  WATERMARK FOR `date` AS `date` - INTERVAL '1' DAY
)
COMMENT 'Email summary of all messages sent to user@flink.apache.org'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-mail-user',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

CREATE TABLE `flink_ml_user_zh` (
  `date` TIMESTAMP(3),
  `fromEmail` STRING,
  `fromRaw` STRING,
  `htmlBody` STRING,
  `subject` STRING,
  `textBody` STRING,
  WATERMARK FOR `date` AS `date` - INTERVAL '1' DAY
)
COMMENT 'Email summary of all messages sent to user-zh@flink.apache.org'
WITH (
  'connector' = 'kafka',
  'topic' = 'flink-mail-user-zh',
  'properties.bootstrap.servers' = '<kafka-server>',
  'properties.group.id' = 'flink-analytics',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

SQL Functions

The sql-functions sub-project contains a few user-defined functions that you can use to simplify your SQL queries when analyzing the repository. You can find them in the com.ververica.platform.sql.functions package.

Community Data Analytics Examples

The following SQL statements are just examples of what you can explore with the available data; there is much more to find out. A few more examples were presented in the Flink Forward Global 2020 talk "A Year in Flink - Flink SQL Live Coding".

Number of Distinct Users per Year on the Mailing List

SELECT
  TUMBLE_END(`date`, INTERVAL '365' DAY(3)) as windowEnd,
  COUNT(DISTINCT fromEmail) AS numUsers
FROM flink_ml_user
GROUP BY TUMBLE(`date`, INTERVAL '365' DAY(3));
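
Note that the query above uses fixed windows of 365 days, not calendar years. The sketch below shows how the end of such a window can be computed, assuming tumbling windows are aligned to the Unix epoch (the usual Flink behavior when no offset is configured). TumbleWindowSketch is a hypothetical helper written for illustration only.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper illustrating epoch-aligned tumbling windows. */
final class TumbleWindowSketch {

    /**
     * Returns the exclusive end of the fixed-size window that contains ts.
     * Assumption: windows start at multiples of the window size since 1970-01-01T00:00:00Z.
     */
    static Instant tumbleEnd(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        // floorDiv keeps the alignment correct for timestamps before the epoch as well
        long windowStart = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs;
        return Instant.ofEpochMilli(windowStart + sizeMs);
    }
}
```

Under this assumption, a message dated 2020-06-15 falls into the window ending on 2020-12-19, so the yearly buckets drift away from January 1 as leap days accumulate.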

Emails on the User Mailing List with no Reply within 30 Days

SELECT
  SESSION_END(`date`, INTERVAL '30' DAY) AS windowEnd,
  NormalizeEmailThread(subject) AS thread,
  COUNT(*) as numMessagesInThread
FROM flink_ml_user
WHERE `date` > (CURRENT_TIMESTAMP - INTERVAL '1' YEAR)
GROUP BY SESSION(`date`, INTERVAL '30' DAY), NormalizeEmailThread(subject)
HAVING COUNT(*) < 2;
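
The query above relies on session windows: messages of one thread that follow each other within 30 days end up in the same session, so a session holding a single message is an email without a reply. The following sketch reproduces that grouping for the timestamps of one thread in plain Java. SessionGapSketch is a hypothetical helper, and the handling of a gap of exactly 30 days is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating gap-based session grouping for one email thread. */
final class SessionGapSketch {

    /**
     * Splits ascending timestamps into sessions. A new session starts when the
     * pause since the previous message exceeds the gap (boundary handling assumed).
     */
    static List<List<Instant>> sessions(List<Instant> sorted, Duration gap) {
        List<List<Instant>> result = new ArrayList<>();
        Instant previous = null;
        for (Instant ts : sorted) {
            if (previous == null || Duration.between(previous, ts).compareTo(gap) > 0) {
                result.add(new ArrayList<>()); // open a new session
            }
            result.get(result.size() - 1).add(ts);
            previous = ts;
        }
        return result;
    }

    /** Counts sessions with fewer than two messages, mirroring HAVING COUNT(*) < 2. */
    static long unanswered(List<Instant> sorted, Duration gap) {
        return sessions(sorted, gap).stream().filter(s -> s.size() < 2).count();
    }
}
```

For a thread with messages on 2020-01-01, 2020-01-10, and 2020-03-15, the first two form one session and the third stands alone, so one message counts as unanswered.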

Commit Activity per Month and Flink Component

This is basically the SQL version of the Flink Repository Analytics with the DataStream API introduced above.

SELECT
  TUMBLE_END(commitDate, INTERVAL '30' DAY) AS windowEnd,
  GetSourceComponent(filename),
  SUM(linesChanged) AS linesChanged
FROM flink_commits CROSS JOIN UNNEST(filesChanged) AS t
WHERE commitDate > (CURRENT_TIMESTAMP - INTERVAL '1' YEAR)
GROUP BY TUMBLE(commitDate, INTERVAL '30' DAY), GetSourceComponent(filename)
HAVING SUM(linesChanged) > 1000;
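
CROSS JOIN UNNEST in the query above turns each commit row into one row per entry of its filesChanged array before aggregating. The sketch below mirrors that flattening and the per-component sum in plain Java, leaving out the time window and the HAVING filter. UnnestSketch and its component method are hypothetical: the real GetSourceComponent logic is not shown here, so the first path segment is used as a stand-in.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of CROSS JOIN UNNEST followed by a grouped SUM. */
final class UnnestSketch {

    /** One element of the filesChanged array (only the fields used here). */
    static final class FileChange {
        final String filename;
        final int linesChanged;

        FileChange(String filename, int linesChanged) {
            this.filename = filename;
            this.linesChanged = linesChanged;
        }
    }

    /** HYPOTHETICAL stand-in for GetSourceComponent: the first path segment of the file name. */
    static String component(String filename) {
        int slash = filename.indexOf('/');
        return slash < 0 ? filename : filename.substring(0, slash);
    }

    /** Sums linesChanged per component across all commits. */
    static Map<String, Integer> linesChangedPerComponent(List<List<FileChange>> commits) {
        Map<String, Integer> totals = new TreeMap<>();
        for (List<FileChange> filesChanged : commits) { // one list per commit row
            for (FileChange f : filesChanged) {         // UNNEST: one row per array element
                totals.merge(component(f.filename), f.linesChanged, Integer::sum);
            }
        }
        return totals;
    }
}
```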

Created Jira Tickets per Month and Jira Component

SELECT
  TUMBLE_END(`date`, INTERVAL '30' DAY) as windowEnd,
  component,
  COUNT(*) as createdTickets
FROM flink_ml_dev
  CROSS JOIN UNNEST(GetJiraTicketComponents(textBody)) AS c (component)
WHERE `date` > (CURRENT_TIMESTAMP - INTERVAL '1' YEAR)
  AND IsJiraTicket(fromRaw)
  AND GetJiraTicketComponents(textBody) IS NOT NULL
GROUP BY TUMBLE(`date`, INTERVAL '30' DAY), component
HAVING COUNT(*) > 10;

Helper Utilities

A couple of other sub-projects mainly serve as helper utilities providing common functionality:

  • common offers generic helper classes and all entities used throughout the whole project.
  • source-github contains implementations of sources interacting with the GitHub API.
  • source-mbox contains a source implementation that uses Apache James Mime4J to parse mbox archives.