Change Data Capture (CDC) source
that captures and streams change events from various databases.
Currently, it supports MySQL
, PostgreSQL
, MongoDB
, Oracle
and SQL Server
databases.
Build upon Debezium Embedded Connector, the CDC Source
allows capturing and streaming database changes over different message binders such Apache Kafka, RabbitMQ and all Spring Cloud Stream supporter brokers.
It supports all Debezium configuration properties. Just add the cdc.config.
prefix to the existing Debezium properties. For example to set the Debezium’s connector.class
property use the cdc.config.connector.class
source property instead.
We provide convenient shortcuts for the most frequently used Debezium properties. For example instead of the long cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector
Debezium property you can use our cdc.connector=mysql
shortcut. The table below lists all available shortcuts along with the Debezium properties they represent.
The Debezium properties (e.g. cdc.config.XXX
) always have precedence over the shortcuts!
The CDC Source introduces a new default BackingOffsetStore
configuration, based on the MetadataStore service. Later provides various microservices friendly ways for storing the offset metadata.
Properties grouped by prefix:
- config
-
Spring pass-trough wrapper for debezium configuration properties. All properties with a 'cdc.config.' prefix are native Debezium properties. The prefix is removed, converting them into Debezium io.debezium.config.Configuration. (Map<String, String>, default:
<none>
) - connector
-
Shortcut for the cdc.config.connector.class property. Either of those can be used as long as they do not contradict with each other. (ConnectorType, default:
<none>
, possible values:mysql
,postgres
,mongodb
,oracle
,sqlserver
) - name
-
Unique name for this sourceConnector instance. (String, default:
<none>
) - schema
-
Include the schema's as part of the outbound message. (Boolean, default:
false
)
- add-fields
-
Comma separated list of metadata fields to add to the flattened message. The fields will be prefixed with "__" or "__[<]struct]__", depending on the specification of the struct. (String, default:
<none>
) - add-headers
-
Comma separated list specify a list of metadata fields to add to the header of the flattened message. The fields will be prefixed with "__" or "__[struct]__". (String, default:
<none>
) - delete-handling-mode
-
Options for handling deleted records: (1) none - pass the records through, (2) drop - remove the records and (3) rewrite - add a '__deleted' field to the records. (DeleteHandlingMode, default:
<none>
, possible values:drop
,rewrite
,none
) - drop-tombstones
-
By default Debezium generates tombstone records to enable Kafka compaction on deleted records. The dropTombstones can suppress the tombstone records. (Boolean, default:
true
) - enabled
-
Enable flattening the source record events (https://debezium.io/docs/configuration/event-flattening). (Boolean, default:
true
)
- commit-timeout
-
Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. (Duration, default:
5000ms
) - flush-interval
-
Interval at which to try committing offsets. The default is 1 minute. (Duration, default:
60000ms
) - policy
-
Offset storage commit policy. (OffsetPolicy, default:
<none>
) - storage
-
Kafka connector tracks the number processed records and regularly stores the count (as "offsets") in a preconfigured metadata storage. On restart the connector resumes the reading from the last recorded source offset. (OffsetStorageType, default:
<none>
, possible values:memory
,file
,kafka
,metadata
)
- convert-connect-headers
-
When true the {@link org.apache.kafka.connect.header.Header} are converted into message headers with the {@link org.apache.kafka.connect.header.Header#key()} as name and {@link org.apache.kafka.connect.header.Header#value()}. (Boolean, default:
true
) - offset
-
Serializes the source record's offset metadata into the outbound message header under cdc.offset. (Boolean, default:
false
)
- create-delay
-
Delay between create table retries. (Integer, default:
1
) - create-retries
-
Retry number for create table request. (Integer, default:
25
) - read-capacity
-
Read capacity on the table. (Long, default:
1
) - table
-
Table name for metadata. (String, default:
<none>
) - time-to-live
-
TTL for table entries. (Integer, default:
<none>
) - write-capacity
-
Write capacity on the table. (Long, default:
1
)
- region
-
Unique grouping identifier for messages persisted with this store. (String, default:
DEFAULT
) - table-prefix
-
Prefix for the custom table name. (String, default:
<none>
)
- collection
-
MongoDB collection name for metadata. (String, default:
metadataStore
)
- type
-
Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default:
<none>
, possible values:mongodb
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
- connect-string
-
Zookeeper connect string in form HOST:PORT. (String, default:
127.0.0.1:2181
) - encoding
-
Encoding to use when storing data in Zookeeper. (Charset, default:
UTF-8
) - retry-interval
-
Retry interval for Zookeeper operations in milliseconds. (Integer, default:
1000
) - root
-
Root node - store entries are children of this node. (String, default:
/SpringIntegration-MetadataStore
)
The table below lists all available shortcuts along with the Debezium properties they represent.
Shortcut | Original | Description |
---|---|---|
cdc.connector |
cdc.config.connector.class |
|
cdc.name |
cdc.config.name |
|
cdc.offset.flush-interval |
cdc.config.offset.flush.interval.ms |
|
cdc.offset.commit-timeout |
cdc.config.offset.flush.timeout.ms |
|
cdc.offset.policy |
cdc.config.offset.commit.policy |
|
cdc.offset.storage |
cdc.config.offset.storage |
|
cdc.flattening.drop-tombstones |
cdc.config.drop.tombstones |
|
cdc.flattening.delete-handling-mode |
cdc.config.delete.handling.mode |
|
The CDC Source
uses the Debezium utilities, and currently supports CDC for five datastores: MySQL
, PostgreSQL
, MongoDB
, Oracle
and SQL Server
databases.
The [CdcSourceIntegrationTest](), [CdcDeleteHandlingIntegrationTest]() and [CdcFlatteningIntegrationTest]() integration tests use test databases fixtures, running on the local machine.
We use pre-build debezium docker database images.
The Maven builds create the test databases fixtures with the help of the docker-maven-plugin
.
To run and debug the tests from your IDE you need to deploy the required database images from the command line. Instructions below explains how to run pre-configured test databases form Docker images.
Start the debezium/example-mysql
in a docker:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
Tip
|
(optional) Use docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz'; |
Use following properties to connect the CDC Source to MySQL DB:
cdc.connector=mysql # (1)
cdc.name=my-sql-connector # (2)
cdc.config.database.server.id=85744 # (2)
cdc.config.database.server.name=my-app-connector # (2)
cdc.config.database.user=debezium # (3)
cdc.config.database.password=dbz # (3)
cdc.config.database.hostname=localhost # (3)
cdc.config.database.port=3306 # (3)
cdc.schema=true # (4)
cdc.flattening.enabled=true # (5)
-
Configures the CDC Source to use MySqlConnector. (equivalent to setting
cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector
). -
Metadata used to identify and dispatch the incoming events.
-
Connection to the MySQL server running on
localhost:3306
asdebezium
user. -
Includes the Change Event Value schema in the
SourceRecord
events. -
Enables the CDC Event Flattening.
You can run also the CdcSourceIntegrationTests#CdcMysqlTests
using this mysql configuration.
Start a pre-configured postgres server from the debezium/example-postgres:1.0
Docker image:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0
You can connect to this server like this:
psql -U postgres -h localhost -p 5432
Use following properties to connect the CDC Source to PostgreSQL:
cdc.connector=postgres # (1)
cdc.offset.storage=memory #(2)
cdc.name=my-sql-connector # (3)
cdc.config.database.server.id=85744 # (3)
cdc.config.database.server.name=my-app-connector # (3)
cdc.config.database.user=postgres # (4)
cdc.config.database.password=postgres # (4)
cdc.config.database..dbname=postgres # (4)
cdc.config.database.hostname=localhost # (4)
cdc.config.database.port=5432 # (4)
cdc.schema=true # (5)
cdc.flattening.enabled=true # (6)
-
Configures
CDC Source
to use PostgresConnector. Equivalent for settingcdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector
. -
Configures the Debezium engine to use
memory
(e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store. -
Metadata used to identify and dispatch the incoming events.
-
Connection to the PostgreSQL server running on
localhost:5432
aspostgres
user. -
Includes the Change Event Value schema in the
SourceRecord
events. -
Enables the CDC Event Flattening.
You can run also the CdcSourceIntegrationTests#CdcPostgresTests
using this mysql configuration.
Start a pre-configured mongodb from the debezium/example-mongodb:0.10
Docker image:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:0.10
Initialize the inventory collections
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
In the mongodb
terminal output, search for a log entry like host: "3f95a8a6516e:27017"
:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
Add 127.0.0.1 3f95a8a6516e
entry to your /etc/hosts
Use following properties to connect the CDC Source to MongoDB:
cdc.connector=mongodb # (1)
cdc.offset.storage=memory #(2)
cdc.config.mongodb.hosts=rs0/localhost:27017 # (3)
cdc.config.mongodb.name=dbserver1 # (3)
cdc.config.mongodb.user=debezium # (3)
cdc.config.mongodb.password=dbz # (3)
cdc.config.database.whitelist=inventory # (3)
cdc.config.tasks.max=1 # (4)
cdc.schema=true # (5)
cdc.flattening.enabled=true # (6)
-
Configures
CDC Source
to use MongoDB Connector. This maps intocdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector
. -
Configures the Debezium engine to use
memory
(e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store. -
Connection to the MongoDB running on
localhost:27017
asdebezium
user. -
Includes the Change Event Value schema in the
SourceRecord
events. -
Enables the CDC Event Flattening.
You can run also the CdcSourceIntegrationTests#CdcPostgresTests
using this mysql configuration.
Start a sqlserver
from the debezium/example-postgres:1.0
Docker image:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
Populate with sample data form debezium’s sqlserver tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
Use following properties to connect the CDC Source to SQLServer:
cdc.connector=sqlserver # (1)
cdc.offset.storage=memory #(2)
cdc.name=my-sql-connector # (3)
cdc.config.database.server.id=85744 # (3)
cdc.config.database.server.name=my-app-connector # (3)
cdc.config.database.user=sa # (4)
cdc.config.database.password=Password! # (4)
cdc.config.database..dbname=testDB # (4)
cdc.config.database.hostname=localhost # (4)
cdc.config.database.port=1433 # (4)
-
Configures
CDC Source
to use SqlServerConnector. Equivalent for settingcdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
. -
Configures the Debezium engine to use
memory
(e.g. `cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore) backing offset store. -
Metadata used to identify and dispatch the incoming events.
-
Connection to the SQL Server running on
localhost:1433
assa
user.
You can run also the CdcSourceIntegrationTests#CdcSqlServerTests
using this mysql configuration.
Start Oracle reachable from localhost and set up with the configuration, users and grants described in the Debezium Vagrant set-up
Populate with sample data form Debezium’s Oracle tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
java -jar cdc-debezium-source.jar --cdc.connector=mysql --cdc.name=my-sql-connector --cdc.config.database.server.id=85744 --cdc.config.database.server.name=my-app-connector --cdc.config.database.user=debezium --cdc.config.database.password=dbz --cdc.config.database.hostname=localhost --cdc.config.database.port=3306 --cdc.schema=true --cdc.flattening.enabled=true