Skip to content

Commit

Permalink
DBZ-2317 Backporting "DBZ-2027 Upgrading Postgres JDBC driver to 42.2.12"

Browse files Browse the repository at this point in the history

The driver upgrade mitigates some issues with using this connector with
Postgres on Azure. It comes with some behavioural changes, though:

* column metadata for DECIMAL without scale is returned differently by
the driver (see pgjdbc/pgjdbc#1767): while it used
to be returned as 0, it's now returned as null. This should be
transparent to DBZ consumers
* snapshots for partitioned tables only export change events on the
partition topics now due to pgjdbc/pgjdbc#1708;
this has an impact on consumers, but I think it's more reasonable than
exporting all change events twice, once on the partition table topics
and once on the main table topic
  • Loading branch information
gunnarmorling committed Jul 15, 2020
1 parent 8ef778e commit 31e8bfc
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
Expand Up @@ -300,7 +300,7 @@ private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table tab
incomingLength);
return true;
}
final int localScale = column.scale().get();
final int localScale = column.scale().orElseGet(() -> 0);
final int incomingScale = message.getTypeMetadata().getScale();
if (localScale != incomingScale) {
logger.info("detected new scale for column '{}', old scale was {}, new scale is {}; refreshing table schema", columnName, localScale,
Expand Down
Expand Up @@ -297,7 +297,7 @@ private SchemaBuilder numericSchema(Column column) {
if (decimalMode == DecimalMode.PRECISE && isVariableScaleDecimal(column)) {
return VariableScaleDecimal.builder();
}
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().get());
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElseGet(() -> 0));
}

private SchemaBuilder hstoreSchema() {
Expand Down Expand Up @@ -840,8 +840,8 @@ else if (data instanceof PgArray) {
}

private boolean isVariableScaleDecimal(final Column column) {
return (column.scale().isPresent() && column.scale().get() == 0 && column.length() == VARIABLE_SCALE_DECIMAL_LENGTH)
|| (!column.scale().isPresent() && column.length() == -1);
return column.length() == VARIABLE_SCALE_DECIMAL_LENGTH &&
column.scale().orElseGet(() -> 0) == 0;
}

public static Optional<SpecialValueDecimal> toSpecialValue(String value) {
Expand Down
Expand Up @@ -112,6 +112,7 @@ public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBu
}
}

@Override
protected List<SchemaAndValueField> schemasAndValuesForCustomConverterTypes() {
return Arrays.asList(new SchemaAndValueField("i",
SchemaBuilder.string().name("io.debezium.postgresql.type.Isbn").build(), "0-393-04002-X"));
Expand Down Expand Up @@ -408,7 +409,7 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {
// then start the producer and validate all records are there
buildNoStreamProducer(TestHelper.defaultConfig());

TestConsumer consumer = testConsumer(1 + 2 * 30); // Every record comes once from partitioned table and from partition
TestConsumer consumer = testConsumer(1 + 30);
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);

Set<Integer> ids = new HashSet<>();
Expand All @@ -434,7 +435,7 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {

// verify each topic contains exactly the number of input records
assertEquals(1, topicCounts.get("test_server.public.first_table").intValue());
assertEquals(30, topicCounts.get("test_server.public.partitioned").intValue());
assertEquals(0, topicCounts.get("test_server.public.partitioned").intValue());
assertEquals(10, topicCounts.get("test_server.public.partitioned_1_100").intValue());
assertEquals(20, topicCounts.get("test_server.public.partitioned_101_200").intValue());

Expand Down

0 comments on commit 31e8bfc

Please sign in to comment.