Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB2 connector - Feature Preview #661

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

DB2 connector - Feature Preview #661

wants to merge 1 commit into from

Conversation

sravotto
Copy link
Contributor

@sravotto sravotto commented Jan 24, 2024

This change adds a connector to a DB2 database source.
It uses DB2 SQL replication, leveraging staging tables in the source database.
For instructions on how to put tables in capture mode in DB2, so that changes can be sent to the staging tables, see https://debezium.io/documentation/reference/stable/connectors/db2.html#putting-tables-in-capture-mode.

The connector periodically pulls new mutations from the staging tables and inserts them to the target database via the types.MultiAcceptor interface.

Note for reviewers:

  • internal/source/db2 implements the DB2 source connector:
    • queries.go defines the queries used to get the Log Sequence Numbers and change events from DB2.
    • conn.go reads messages from the DB2 staging tables, accumulates them in a batch and commits them to the target.
    • lsn.go implements the stamp.Stamp interface.
  • A new workflow .github/workflows/go-test-db2.yaml sets up a DB2 docker image and a CRDB image. It executes a set of tests to verify that changes are correctly propagated from the source to the target.
  • The code under .github/db2 is taken from https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server. It builds the DB2 docker image used for integration testing.
  • internal/cmd/db2 contains two new command line options.
    • db2 starts a connection to the DB2 server to pull change events from the staging tables.
    • db2install is a convenience command to download and install the DB2 ODBC driver. The ODBC driver is exposed as a Go library via a cgo wrapper.

This change is Reviewable

internal/cmd/db2/install.go Fixed Show fixed Hide fixed
internal/cmd/db2/install.go Fixed Show fixed Hide fixed
internal/cmd/db2/install.go Fixed Show fixed Hide fixed
internal/cmd/db2/install.go Fixed Show fixed Hide fixed
internal/cmd/db2/install.go Fixed Show fixed Hide fixed
@codecov-commenter
Copy link

codecov-commenter commented Jan 24, 2024

Codecov Report

Attention: Patch coverage is 58.97436% with 208 lines in your changes are missing coverage. Please review.

Project coverage is 72.01%. Comparing base (70ac5ff) to head (fef7013).

Files Patch % Lines
internal/source/db2/conn.go 61.53% 93 Missing and 42 partials ⚠️
internal/source/db2/config.go 22.03% 37 Missing and 9 partials ⚠️
internal/source/db2/lsn.go 72.91% 9 Missing and 4 partials ⚠️
internal/source/db2/provider.go 73.33% 8 Missing and 4 partials ⚠️
internal/source/db2/db2.go 0.00% 2 Missing ⚠️

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #661      +/-   ##
==========================================
- Coverage   76.62%   72.01%   -4.61%     
==========================================
  Files         195      201       +6     
  Lines       11640    12147     +507     
==========================================
- Hits         8919     8748     -171     
- Misses       2069     2731     +662     
- Partials      652      668      +16     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@sravotto sravotto force-pushed the sr8_db2 branch 2 times, most recently from 96b424b to fd641e6 Compare January 25, 2024 22:43
@sravotto sravotto force-pushed the sr8_db2 branch 6 times, most recently from c840b12 to 2330f3a Compare January 26, 2024 20:03
@sravotto sravotto changed the title First rough draft for the DB2 connector. First draft for the DB2 connector. Jan 26, 2024
@sravotto sravotto force-pushed the sr8_db2 branch 4 times, most recently from bbb9ef6 to e7ecd02 Compare February 2, 2024 16:19
Copy link
Member

@BramGruneir BramGruneir left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are my notes so far.

Reviewed 10 of 37 files at r1, 4 of 17 files at r4, 2 of 2 files at r5.
Reviewable status: 15 of 45 files reviewed, 38 unresolved discussions (waiting on @bobvawter and @sravotto)


.github/db2/asncdc.c line 1 at r5 (raw file):

#include <stdio.h>

These files need an attribution.


internal/cmd/db2/install.go line 17 at r5 (raw file):

// SPDX-License-Identifier: Apache-2.0

package db2

Maybe only let this work in linux.
I'm on the fence of this just displaying the requirements if the driver isn't found when calling db2 instead of automating the install.


internal/cmd/db2/install.go line 44 at r5 (raw file):

// Because of license restrictions, the driver cannot be under
// source control. It needs to be downloaded when used.
func Install() *cobra.Command {

Add a dry run here so you can just see what file it's trying to download.


internal/source/db2/conn.go line 70 at r5 (raw file):

) error {
	var startBatch time.Time
	var mutInBatch int

mutBatchSize


internal/source/db2/conn.go line 99 at r5 (raw file):

			mutInBatch = 0
			startBatch = time.Now()
			if err != nil {

move up


internal/source/db2/conn.go line 133 at r5 (raw file):

				return errors.Errorf("unable to retrieve primary keys for %s", sourceTable)
			}
			var err error

move these two down to where they're used


internal/source/db2/conn.go line 165 at r5 (raw file):

			log.Tracef("%s %s %s %v\n", ev.op, sourceTable.Raw(), targetTable.Raw(), key)
			script.AddMeta("db2", targetTable, &mut)
			err = batch.OnData(ctx, script.SourceName(targetTable), targetTable, []types.Mutation{mut})

wrap in if ... ; err != nil {


internal/source/db2/conn.go line 170 at r5 (raw file):

			}
		default:
			log.Warnf("unknown op %v", ev.op)

maybe error?


internal/source/db2/conn.go line 185 at r5 (raw file):

	}
	defer func() {
		log.Warn("db2.ReadInto done. Closing database connection")

this should be info, maybe just debug


internal/source/db2/conn.go line 187 at r5 (raw file):

		log.Warn("db2.ReadInto done. Closing database connection")
		err := db.Close()
		log.Info("Db closed.", err)

if there is an error, log as an error!
Otherwise, as info


internal/source/db2/conn.go line 193 at r5 (raw file):

		return errors.New("missing lsn")
	}
	previousLsn, _ := cp.(*lsn)

check for safety?


internal/source/db2/conn.go line 202 at r5 (raw file):

		if nextLsn.Less(previousLsn) || nextLsn.Equal(previousLsn) || nextLsn.Equal(lsnZero()) {
			select {
			case <-time.After(1 * time.Second):

Add config option for this, call it sleep when no results found


internal/source/db2/conn.go line 224 at r5 (raw file):

		tables, err := c.getTables(ctx, db, c.config.SourceSchema)
		if err != nil {
			log.Error("getTables failed", err)

double error logging?


internal/source/db2/conn.go line 230 at r5 (raw file):

			err := c.fetchColumnMetadata(ctx, db, &t)
			if err != nil {
				log.Error("fetchColumnMetadata failed", err)

these lines seem redundant


internal/source/db2/conn.go line 239 at r5 (raw file):

			count, err := c.postMutations(ctx, db, tbl, lsnRange, ch)
			if err != nil {
				log.Error("postMutations failed", err)

double errors


internal/source/db2/lsn.go line 29 at r5 (raw file):

)

// lsn represents the offset, in bytes, of a log record from the beginning of a database log file

Please write out the full name for lsn here.


internal/source/db2/lsn.go line 31 at r5 (raw file):

// lsn represents the offset, in bytes, of a log record from the beginning of a database log file
type lsn struct {
	Value []byte

Why []byte? Do you check the length?


internal/source/db2/lsn.go line 61 at r5 (raw file):

}

type lsnMemo struct {

What's a lsnMemo vs an lsn?


internal/source/db2/lsn_test.go line 67 at r5 (raw file):

	}{
		{"zero", zero, zero, true},
		{"zero_one", zero, "00000000000000000000000000000001", false},

might as well makes these "000..001" and "FFF...FFF" constants.


internal/source/db2/queries.go line 50 at r5 (raw file):

	`

	// get mutations

get tables with mutations

also add a comment about the two different options


internal/source/db2/queries.go line 87 at r5 (raw file):

		OPCODE,
		cdc.*
		FROM  # cdc WHERE   IBMSNAP_COMMITSEQ > ? AND IBMSNAP_COMMITSEQ <= ? 

convert to templates please


internal/source/db2/queries.go line 102 at r5 (raw file):

)

type cdcTable struct {

Add comments please


internal/source/db2/queries.go line 122 at r5 (raw file):

//go:generate go run golang.org/x/tools/cmd/stringer -type=operation

type operation int

also more comments here please


internal/source/db2/queries.go line 135 at r5 (raw file):

// Open a new connection to the source database.
func (c *conn) Open() (*sql.DB, error) {

move these to conn please


internal/source/db2/queries.go line 159 at r5 (raw file):

	// TODO (silvano): memoize, use templates
	table := ident.NewTable(ident.MustSchema(ident.New(cdcTable.cdOwner)), ident.New(cdcTable.cdTable))
	query := strings.ReplaceAll(changeQuery, "#", table.Raw())

ugh


internal/source/db2/queries.go line 168 at r5 (raw file):

	cols, _ := rows.ColumnTypes()
	// the first four columns are metadata (opcode, commit_seq, intent_seq, op_id)
	// the remaining columns contain the

finish comment, why is it safe to throw out the error?


internal/source/db2/queries.go line 172 at r5 (raw file):

	for rows.Next() {
		count++
		mut := len(cols)

check to make sure len(mut) > 5


internal/source/db2/queries.go line 190 at r5 (raw file):

			return count, errors.Errorf("invalid operation  %T", res[0])
		}
		msg.op = operation(op)

Also check that it's a valid operation for us


internal/source/db2/queries.go line 240 at r5 (raw file):

		var v []byte
		err = rows.Scan(&v)
		if err == nil {

wrap this in a help function so you don't repeat the code


internal/source/db2/queries.go line 268 at r5 (raw file):

			// we can check the staging tables directly.
			if len(v) == 0 {
				return c.getMaxLsn(ctx, db)

this should be part of the sql, or this can be racy


internal/source/db2/queries.go line 320 at r5 (raw file):

}

func queryTables(ctx *stopper.Context, db *sql.DB, schema ident.Schema) (*sql.Rows, error) {

comment please... or
just inline this.

@sravotto sravotto force-pushed the sr8_db2 branch 5 times, most recently from 460ae82 to e4f67b7 Compare February 16, 2024 14:22
Copy link
Contributor Author

@sravotto sravotto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take another look - I merged all the changes from the main branch, so there are significant changes to use the new Batch/MultiAcceptor apis.

Reviewable status: 2 of 46 files reviewed, 38 unresolved discussions (waiting on @bobvawter, @BramGruneir, and @Github-advanced-security[bot])


.github/db2/asncdc.c line 1 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

These files need an attribution.

Done.


internal/cmd/db2/install.go line 86 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Writable file handle closed without error handling

File handle may be writable as a result of data flow from a call to OpenFile and closing it may result in data loss upon failure, which is not handled explicitly.

Show more details

Done.


internal/cmd/db2/install.go line 131 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Arbitrary file access during archive extraction ("Zip Slip")

Unsanitized archive entry, which may contain '..', is used in a file system operation.
Unsanitized archive entry, which may contain '..', is used in a file system operation.

Show more details

Done.


internal/cmd/db2/install.go line 138 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Arbitrary file write extracting an archive containing symbolic links

Unresolved path from an archive header, which may point outside the archive root, is used in symlink creation.

Show more details

Done.


internal/cmd/db2/install.go line 151 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Arbitrary file write extracting an archive containing symbolic links

Unresolved path from an archive header, which may point outside the archive root, is used in symlink creation.

Show more details

Done.


internal/cmd/db2/install.go line 190 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Arbitrary file access during archive extraction ("Zip Slip")

Unsanitized archive entry, which may contain '..', is used in a file system operation.
Unsanitized archive entry, which may contain '..', is used in a file system operation.

Show more details

Done.


internal/cmd/db2/install.go line 225 at r2 (raw file):

Previously, github-advanced-security[bot] wrote…

Arbitrary file access during archive extraction ("Zip Slip")

Unsanitized archive entry, which may contain '..', is used in a file system operation.
Unsanitized archive entry, which may contain '..', is used in a file system operation.

Show more details

Done.


internal/cmd/db2/install.go line 147 at r3 (raw file):

Previously, github-advanced-security[bot] wrote…

Arbitrary file write extracting an archive containing symbolic links

Unresolved path from an archive header, which may point outside the archive root, is used in symlink creation.

Show more details

Done.


internal/cmd/db2/install.go line 17 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Maybe only let this work in linux.
I'm on the fence of this just displaying the requirements if the driver isn't found when calling db2 instead of automating the install.

I'll leave it for unix/mac - I made the subcommand hidden as well. I think it's useful for testing, we can revisit at later time.


internal/cmd/db2/install.go line 44 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Add a dry run here so you can just see what file it's trying to download.

Done.


internal/source/db2/conn.go line 70 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

mutBatchSize

Done. With the new code we have batch.Size() so not longer needed.


internal/source/db2/conn.go line 99 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

move up

Done.


internal/source/db2/conn.go line 133 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

move these two down to where they're used

Done.


internal/source/db2/conn.go line 165 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

wrap in if ... ; err != nil {

Done.


internal/source/db2/conn.go line 170 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

maybe error?

Done.


internal/source/db2/conn.go line 185 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

this should be info, maybe just debug

Done.


internal/source/db2/conn.go line 187 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

if there is an error, log as an error!
Otherwise, as info

Done.


internal/source/db2/conn.go line 193 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

check for safety?

Done. Changed to use a notify.Var


internal/source/db2/conn.go line 202 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Add config option for this, call it sleep when no results found

Done.


internal/source/db2/conn.go line 224 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

double error logging?

Done.


internal/source/db2/conn.go line 230 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

these lines seem redundant

Done.


internal/source/db2/conn.go line 239 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

double errors

Done.


internal/source/db2/lsn.go line 29 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Please write out the full name for lsn here.

Done.


internal/source/db2/lsn.go line 31 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Why []byte? Do you check the length?

Done.


internal/source/db2/lsn.go line 61 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

What's a lsnMemo vs an lsn?

lsnMemo is needed for marshalling the lsn into a JSON object.


internal/source/db2/lsn_test.go line 67 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

might as well makes these "000..001" and "FFF...FFF" constants.

Done.


internal/source/db2/queries.go line 50 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

get tables with mutations

also add a comment about the two different options

Done.


internal/source/db2/queries.go line 87 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

convert to templates please

Done.


internal/source/db2/queries.go line 102 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Add comments please

I moved to conn and cleanup. we don't really need all these fields.


internal/source/db2/queries.go line 122 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

also more comments here please

Done.


internal/source/db2/queries.go line 135 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

move these to conn please

Done.


internal/source/db2/queries.go line 159 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

ugh

Done.


internal/source/db2/queries.go line 168 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

finish comment, why is it safe to throw out the error?

Done.


internal/source/db2/queries.go line 172 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

check to make sure len(mut) > 5

Done.


internal/source/db2/queries.go line 190 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Also check that it's a valid operation for us

Done.


internal/source/db2/queries.go line 240 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

wrap this in a help function so you don't repeat the code

Done.


internal/source/db2/queries.go line 268 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

this should be part of the sql, or this can be racy

Done.


internal/source/db2/queries.go line 320 at r5 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

comment please... or
just inline this.

Done.

@sravotto sravotto changed the title First draft for the DB2 connector. DB2 connector - Feature Preview Feb 26, 2024
@sravotto sravotto marked this pull request as ready for review February 26, 2024 19:42
This change adds a connector to a DB2 database source.
It uses DB2 SQL replication, leveraging staging tables in the source database.
For instructions on how to put table in capture mode in DB2, so that changes can
be sent to the staging tables, see
https://debezium.io/documentation/reference/stable/connectors/db2.html#putting-tables-in-capture-mode.

The connector periodically pulls new mutations from the staging tables and inserts them
to the target database via the types.MultiAcceptor interface.
Copy link
Member

@bobvawter bobvawter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 2 of 46 files reviewed, 48 unresolved discussions (waiting on @BramGruneir, @Github-advanced-security[bot], and @sravotto)


Makefile line 2 at r9 (raw file):

PWD = $(shell pwd)

This should be a shell script.


Makefile line 18 at r9 (raw file):

drivers/clidriver: 
	go run . db2install --dest drivers

This makes the CI system depend upon somebody else's download server. If we can't redistribute the driver package, we can at least maintain a private copy within test environment so we're not adding unnecessary load to another company's hosting infrastructure.


.github/db2/inventory.sql line 2 at r9 (raw file):

-- Create and populate our products using a single insert with many rows

Attribution?


.github/workflows/go-test-db2.yaml line 1 at r9 (raw file):

# Copyright 2023 The Cockroach Authors

Why is this separate from go-test.yaml?


.github/workflows/go-tests.yaml line 46 at r9 (raw file):

      - name: Copyright headers
        if: ${{ !cancelled() }}
        run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v  -check -ignore ".github/db2/**" -ignore '**/testdata/**/*.sql' .

Why are the db2 support files hidden in .github ?


internal/cmd/db2/install.go line 17 at r5 (raw file):

Previously, sravotto (silvano) wrote…

I'll leave it for unix/mac - I made the subcommand hidden as well. I think it's useful for testing, we can revisit at later time.

The download command is harmful for automated testing unless the download source is under our control.


internal/cmd/db2/install.go line 37 at r9 (raw file):

const (
	defaultURL  = "https://public.dhe.ibm.com/ibmdl/export/pub/software/data/db2/drivers/odbc_cli/"

Are these drivers freely re-distributable? If so, we should maintain our own mirror so the build doesn't depend on the internet. Also, if they're not freely distributable, I've been told that automating a download like this can be unfavorably viewed as de-facto redistribution, especially if there are end-user license agreements.


internal/cmd/db2/install.go line 120 at r9 (raw file):

	switch runtime.GOOS {
	case "darwin":
		pkgName = "macos64_odbc_cli.tar.gz"

Is this also arch-specific?


internal/source/db2/conn.go line 252 at r9 (raw file):

		err := c.db.Close()
		if err != nil {
			log.Errorf("Error closing the database. %q", err)

This isn't the standard for logging errors since it will throw away stack info. Use log.WithError(err).Error("message")


internal/source/db2/conn.go line 430 at r9 (raw file):

		c.config.user,
		c.config.password)
	log.Debugf("DB2 connection: HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=...",

log.Debugf("DB2 connection: %s", con)


internal/source/db2/conn.go line 476 at r9 (raw file):

// populateTableMetadata fetches the column metadata for a given table
// and caches it.

What happens during schema changes?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants