Skip to content

Commit

Permalink
DBZ-2027 Upgrading Postgres JDBC driver to 42.2.12;
Browse files Browse the repository at this point in the history
The driver upgrade mitigates some issues with using this connector with
Postgres on Azure. It comes with some behavioural changes, though:

* column metadata for DECIMAL without scale is returned differently by
the (see pgjdbc/pgjdbc#1767): while it used
to be returned as 0, it's now returned as null. This should be
transparent to DBZ consumers
* snapshots for partitioned tables only export change events on the
partition topics now due to pgjdbc/pgjdbc#1708;
this has an impact on consumers, but I think it's more reasonable than
exporting all change events twice, one partition table and main table
topics
  • Loading branch information
gunnarmorling committed Apr 29, 2020
1 parent a6d1383 commit ccac215
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 7 deletions.
Expand Up @@ -300,7 +300,7 @@ private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table tab
incomingLength);
return true;
}
final int localScale = column.scale().get();
final int localScale = column.scale().orElseGet(() -> 0);
final int incomingScale = message.getTypeMetadata().getScale();
if (localScale != incomingScale) {
logger.info("detected new scale for column '{}', old scale was {}, new scale is {}; refreshing table schema", columnName, localScale,
Expand Down
Expand Up @@ -299,7 +299,7 @@ private SchemaBuilder numericSchema(Column column) {
if (decimalMode == DecimalMode.PRECISE && isVariableScaleDecimal(column)) {
return VariableScaleDecimal.builder();
}
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().get());
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElseGet(() -> 0));
}

private SchemaBuilder hstoreSchema() {
Expand Down Expand Up @@ -840,8 +840,8 @@ else if (data instanceof PgArray) {
}

private boolean isVariableScaleDecimal(final Column column) {
return (column.scale().isPresent() && column.scale().get() == 0 && column.length() == VARIABLE_SCALE_DECIMAL_LENGTH)
|| (!column.scale().isPresent() && column.length() == -1);
return column.length() == VARIABLE_SCALE_DECIMAL_LENGTH &&
column.scale().orElseGet(() -> 0) == 0;
}

public static Optional<SpecialValueDecimal> toSpecialValue(String value) {
Expand Down
Expand Up @@ -112,6 +112,7 @@ public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBu
}
}

@Override
protected List<SchemaAndValueField> schemasAndValuesForCustomConverterTypes() {
return Arrays.asList(new SchemaAndValueField("i",
SchemaBuilder.string().name("io.debezium.postgresql.type.Isbn").build(), "0-393-04002-X"));
Expand Down Expand Up @@ -408,7 +409,7 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {
// then start the producer and validate all records are there
buildNoStreamProducer(TestHelper.defaultConfig());

TestConsumer consumer = testConsumer(1 + 2 * 30); // Every record comes once from partitioned table and from partition
TestConsumer consumer = testConsumer(1 + 30);
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);

Set<Integer> ids = new HashSet<>();
Expand All @@ -434,7 +435,7 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {

// verify each topic contains exactly the number of input records
assertEquals(1, topicCounts.get("test_server.public.first_table").intValue());
assertEquals(30, topicCounts.get("test_server.public.partitioned").intValue());
assertEquals(0, topicCounts.get("test_server.public.partitioned").intValue());
assertEquals(10, topicCounts.get("test_server.public.partitioned_1_100").intValue());
assertEquals(20, topicCounts.get("test_server.public.partitioned_101_200").intValue());

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -67,7 +67,7 @@
<version.confluent.platform>5.4.1</version.confluent.platform>

<!-- Databases -->
<version.postgresql.driver>42.2.9</version.postgresql.driver>
<version.postgresql.driver>42.2.12</version.postgresql.driver>
<version.mysql.server>5.7</version.mysql.server>
<version.mysql.driver>8.0.16</version.mysql.driver>
<version.mysql.binlog>0.19.1</version.mysql.binlog>
Expand Down

0 comments on commit ccac215

Please sign in to comment.