
HPI information integration project SoSe 2022

This repository provides a code base for the information integration course in the summer semester of 2022. Below you can find the documentation for setting up the project.

Prerequisites

Architecture

Components

RB website

The Registerbekanntmachung website contains announcements concerning entries made into the companies, cooperatives, and partnerships registers within the electronic information and communication system. You can search for announcements, and each announcement can be requested through the link below by passing the query parameters rb_id and land_abk. For instance, we chose the state Rheinland-Pfalz (rp) with the announcement ID 56267, the new entry of the company BioNTech.

export STATE="rp" 
export RB_ID="56267"
curl -X GET  "https://www.handelsregisterbekanntmachungen.de/skripte/hrb.php?rb_id=$RB_ID&land_abk=$STATE"

RB crawler

The Registerbekanntmachung crawler (rb_crawler) sends a GET request to the link above with the parameters (rb_id and land_abk) passed to it and extracts the information from the response.

We use Protocol buffers to define our schema. The crawler uses the generated model class (i.e., Announcement class) from the protobuf schema. We will explain further how you can generate this class using the protobuf compiler.

The compiler generates the Announcement class with the fields defined in the schema. The crawler fills an Announcement object with the data extracted from the website, serializes it to bytes so that Kafka can handle it, and produces it to the rb-announcements topic.

After that, it increments the rb_id value and sends another GET request. This process continues until the end of the announcements is reached, and the crawler will stop automatically.
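
The crawl loop can be pictured roughly as follows. This is a minimal sketch only: the extract_announcement helper is hypothetical, the end-of-announcements check is an assumption, and the producer setup uses the confluent-kafka package; the actual rb_crawler implementation may differ.

import requests
from confluent_kafka import Producer

BASE_URL = "https://www.handelsregisterbekanntmachungen.de/skripte/hrb.php"


def crawl(rb_id: int, state: str) -> None:
    # Assumption: a plain producer against a local broker; the project may
    # instead use a Schema Registry-aware protobuf serializer.
    producer = Producer({"bootstrap.servers": "localhost:9092"})
    while True:
        response = requests.get(BASE_URL, params={"rb_id": rb_id, "land_abk": state})
        if not response.text.strip():  # assumption: an empty page marks the end
            break
        # extract_announcement is a hypothetical parser that fills the generated
        # protobuf Announcement object with the extracted fields.
        announcement = extract_announcement(response.text)
        producer.produce(
            topic="rb-announcements",
            key=f"{state}_{rb_id}",
            value=announcement.SerializeToString(),  # protobuf object -> bytes
        )
        rb_id += 1  # move on to the next announcement
    producer.flush()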

rb-announcements topic

The rb-announcements topic holds all the announcements produced by the rb_crawler. Each message in a Kafka topic consists of a key and a value.

The key type of this topic is String. The rb_crawler generates the key as a combination of the land_abk and the rb_id. For the rb_id and land_abk from the example above, the key looks like this: rp_56267.

The value of the message contains more information like event_name, event_date, and more. Therefore, the value type is complex and needs a schema definition.

rb-corporates topic

This topic contains the information about corporates extracted from the Registerbekanntmachung announcements. The key is a self-generated hash of the corporate name, and the value is a complex schema type.

rb-persons topic

This topic contains the information about persons extracted from the Registerbekanntmachung announcements. The key is a self-generated hash of the source (i.e., rb), first name, last name, and corporate name. The value is a complex schema type.
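
One plausible way to derive such keys is to hash the identifying fields, as in the sketch below. The choice of MD5 and the field separator are assumptions; the actual project may use a different hash function or concatenation.

import hashlib


def corporate_key(corporate_name: str) -> str:
    # Hash of the corporate name, used as the key for rb-corporates.
    return hashlib.md5(corporate_name.encode("utf-8")).hexdigest()


def person_key(source: str, first_name: str, last_name: str, corporate_name: str) -> str:
    # Hash of source (e.g., "rb"), first name, last name, and corporate name,
    # used as the key for rb-persons.
    raw = "_".join([source, first_name, last_name, corporate_name])
    return hashlib.md5(raw.encode("utf-8")).hexdigest()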

BaFin website

The Federal Financial Supervisory Authority (BaFin) brings together under one roof the supervision of banks and financial services providers, insurance undertakings, and securities trading. The website also contains registered announcements of managers’ transactions pursuant to Article 19 of the MAR. These announcements describe the details of transactions that an executive director (manager) of a company made in the company's shares. The website holds the transaction information in a one-year time window. At the time of this writing, the first announcement has a message ID of 18794. You can find the message ID on this page.

BaFin crawler

This crawler extracts the information from the BaFin portal and fills the model objects with the extracted data. Moreover, it serializes the objects and produces each of them to the desired topic.

The crawler is initialized with a message_id at the beginning of the crawl and sends a request to the portal URL of BaFin. This process is demonstrated in the script below:

export MESSAGE_ID="18794"
curl -X GET "https://portal.mvp.bafin.de/database/DealingsInfo/ergebnisListe.do?cmd=loadEmittentenAction&meldepflichtigerId=$MESSAGE_ID"

After retrieving the HTML of the page, the crawler extracts the BaFin-ID in the table and sends another request to retrieve the detailed transaction information. This is demonstrated with a shell script:

export MESSAGE_ID="18794"
export BAFIN_ID=40002082
curl -X GET "https://portal.mvp.bafin.de/database/DealingsInfo/transaktionListe.do?cmd=loadTransaktionenAction&emittentBafinId=$BAFIN_ID&meldungId=$MESSAGE_ID&KeepThis=true&TB_iframe=true&modal=true"

The crawler extracts the information from the HTML response and produces it to each topic. It then increments the message_id and repeats the process.
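
Put together, the two-step request flow looks roughly like the sketch below. It assumes the requests and BeautifulSoup libraries and a hypothetical extract_bafin_id helper; the actual bafin_crawler may be structured differently.

import requests
from bs4 import BeautifulSoup

LIST_URL = "https://portal.mvp.bafin.de/database/DealingsInfo/ergebnisListe.do"
DETAIL_URL = "https://portal.mvp.bafin.de/database/DealingsInfo/transaktionListe.do"


def fetch_transactions(message_id: int) -> BeautifulSoup:
    # Step 1: load the result list and extract the BaFin-ID from the table.
    listing = requests.get(
        LIST_URL,
        params={"cmd": "loadEmittentenAction", "meldepflichtigerId": message_id},
    )
    bafin_id = extract_bafin_id(BeautifulSoup(listing.text, "html.parser"))  # hypothetical parser

    # Step 2: request the detailed transaction information for that BaFin-ID.
    details = requests.get(
        DETAIL_URL,
        params={
            "cmd": "loadTransaktionenAction",
            "emittentBafinId": bafin_id,
            "meldungId": message_id,
        },
    )
    return BeautifulSoup(details.text, "html.parser")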

bafin_trades topic

This topic contains all the information about the transactions a person made. The key is a self-generated string from the message_id and the BaFin-ID. The value is a complex schema type.

bafin_corporates topic

The bafin_corporates topic contains all the information about a corporate. The key is a self-generated hash from the corporate name. The value is a complex schema type.

bafin_persons topic

This topic contains the extracted information about the persons who made a transaction. The key is a self-generated hash from the source (i.e., BaFin), first name, last name, and corporate name. The value is a complex schema type.

Kafka Connect

Kafka Connect is a tool to move large data sets into (source) and out of (sink) Kafka. Here we only use the sink connector, which consumes data from a Kafka topic into a secondary index such as Elasticsearch.

We use the Elasticsearch Sink Connector to move the data from the corporate-events topic into Elasticsearch.

Setup

This project uses Poetry as a build tool. To install all the dependencies, just run poetry install.

This project uses Protobuf for serializing and deserializing objects. You can find the schemas under the proto folder. Furthermore, you must generate the Python code for the model classes from the proto files. To do so, run the generate-proto.sh script. This script uses the Protobuf compiler (protoc) to generate the model classes under the bakdata folder.

Run

Infrastructure

Use docker-compose up -d to start all the services: Zookeeper, Kafka, Schema Registry, Redpanda Console, Kafka Connect, and Elasticsearch. Depending on your system, it takes a couple of minutes before the services are up and running. You can use a tool like lazydocker to check the status of the services.

NOTE: Kafka Connect's start time on Apple silicon is more than 5 minutes! You can start using Kafka Connect once the status of the container is running (healthy).

Kafka Connect

After all the services are up and running, you need to configure Kafka Connect to use the Elasticsearch or the Neo4j sink connector. The config file is a JSON-formatted file. We provide the sink configurations for the different topics under the connect folder.

You can find more information about the configuration properties for the Elasticsearch sink and the Neo4j sink connector on their respective official documentation pages.

To start a connector, you must push its JSON config file to Kafka Connect. You can use the UI dashboard in Redpanda Console or the provided bash script. You can remove a connector by deleting it through Redpanda Console or by calling the deletion API in the provided bash script.
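
For reference, pushing a connector config boils down to a POST against Kafka Connect's REST API. The sketch below assumes the REST API is exposed on the default port 8083 and uses an illustrative file name from the connect folder.

import json
import requests

CONNECT_URL = "http://localhost:8083/connectors"

# Illustrative file name; use the actual config file from the connect folder.
with open("connect/elastic-sink.json") as config_file:
    config = json.load(config_file)

# Register the connector.
response = requests.post(CONNECT_URL, json=config)
response.raise_for_status()
print(response.json())

# A connector can later be removed with a DELETE request:
# requests.delete(f"{CONNECT_URL}/<connector-name>")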

RB crawler

You can start the crawler with the command below:

poetry run python -m rb_crawler.main --id $RB_ID --state $STATE

The --id option is an integer, which determines the initial event in the Handelsregisterbekanntmachungen to be crawled.

The --state option takes a string (one of the state codes listed in the usage below). This string defines the state the crawler should start from.

You can use the --help option to see the usage:

Usage: main.py [OPTIONS]

Options:
  -i, --id INTEGER                The rb_id to initialize the crawl from
  -s, --state [bw|by|be|br|hb|hh|he|mv|ni|nw|rp|sl|sn|st|sh|th]
                                  The state ISO code
  --help                          Show this message and exit.

BaFin crawler

You can start the crawler with the command below:

poetry run python -m bafin_crawler.main --id $MESSAGE_ID

The --id option is an integer, which determines the initial event in the BaFin portal to be crawled.

You can use the --help option to see the usage:

Usage: main.py [OPTIONS]

Options:
  -i, --id INTEGER  The message_id to initialize the crawl from
  --help            Show this message and exit.

Query data

Redpanda Console

Redpanda Console is a web application that helps you manage and debug your Kafka workloads effortlessly. You can create, update, and delete Kafka resources like Topics and Kafka Connect configs. You can open Redpanda Console in your browser under http://localhost:8080.

Elasticsearch

To query the data from Elasticsearch, you can use Elasticsearch's Query DSL. For example:

curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
    "query": {
        "match": {
            <field>
        }
    }
}
'

<field> is the field you wish to search. For example:

"first_name":"Sussane"

Teardown

You can stop and remove all the resources by running:

docker-compose down
