From 724f487007ed6012aed6a33884d5152b66c9efb5 Mon Sep 17 00:00:00 2001 From: MartijnVisser Date: Thu, 24 Mar 2022 11:21:53 +0100 Subject: [PATCH] [FLINK-25926][Connectors][JDBC] Update org.postgresql:postgresql to 42.3.3. This required adding support for smallserial and applying changes after refactoring pgjdbc/pgjdbc#1194. PG's bytea[] now uses primitive arrays. This closes #18604 Co-authored-by: Sergey Nuyanzin --- flink-connectors/flink-connector-jdbc/pom.xml | 2 +- .../jdbc/dialect/psql/PostgresTypeMapper.java | 2 + .../converter/PostgresRowConverter.java | 49 ++++++------------- .../jdbc/catalog/PostgresCatalogITCase.java | 2 +- 4 files changed, 19 insertions(+), 36 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml index 55fbfdf2d413c7..efef548b338090 100644 --- a/flink-connectors/flink-connector-jdbc/pom.xml +++ b/flink-connectors/flink-connector-jdbc/pom.xml @@ -36,7 +36,7 @@ under the License. jar - 42.2.10 + 42.3.3 19.3.0.0 diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java index 644586ba135741..b6c478ab268223 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java @@ -49,6 +49,7 @@ public class PostgresTypeMapper implements JdbcDialectTypeMapper { // float <=> float8 // boolean <=> bool // decimal <=> numeric + private static final String PG_SMALLSERIAL = "smallserial"; private static final String PG_SERIAL = "serial"; private static final String PG_BIGSERIAL = "bigserial"; private static final String PG_BYTEA = "bytea"; @@ -102,6 +103,7 @@ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int co case PG_BYTEA_ARRAY: return DataTypes.ARRAY(DataTypes.BYTES()); case PG_SMALLINT: + case PG_SMALLSERIAL: return DataTypes.SMALLINT(); case PG_SMALLINT_ARRAY: return DataTypes.ARRAY(DataTypes.SMALLINT()); diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java index 60b936963bb773..265476ac1bec3a 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java @@ -22,13 +22,11 @@ import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; import org.postgresql.jdbc.PgArray; -import org.postgresql.util.PGobject; import java.lang.reflect.Array; @@ -78,38 +76,21 @@ protected JdbcSerializationConverter createNullableExternalConverter(LogicalType } private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arrayType) { - // PG's bytea[] is wrapped in PGobject, rather than primitive byte arrays - if (arrayType.getElementType().is(LogicalTypeFamily.BINARY_STRING)) { - final Class elementClass = - LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); - final JdbcDeserializationConverter elementConverter = - createNullableInternalConverter(arrayType.getElementType()); - - return val -> { - PgArray pgArray = (PgArray) val; - Object[] in = (Object[]) pgArray.getArray(); - final Object[] array = (Object[]) Array.newInstance(elementClass, in.length); - for (int i = 0; i < in.length; i++) { - array[i] = - elementConverter.deserialize(((PGobject) in[i]).getValue().getBytes()); - } - return new GenericArrayData(array); - }; - } else { - final Class elementClass = - LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); - final JdbcDeserializationConverter elementConverter = - createNullableInternalConverter(arrayType.getElementType()); - return val -> { - PgArray pgArray = (PgArray) val; - Object[] in = (Object[]) pgArray.getArray(); - final Object[] array = (Object[]) Array.newInstance(elementClass, in.length); - for (int i = 0; i < in.length; i++) { - array[i] = elementConverter.deserialize(in[i]); - } - return new GenericArrayData(array); - }; - } + // Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in + // primitive byte arrays + final Class elementClass = + LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); + final JdbcDeserializationConverter elementConverter = + createNullableInternalConverter(arrayType.getElementType()); + return val -> { + PgArray pgArray = (PgArray) val; + Object[] in = (Object[]) pgArray.getArray(); + final Object[] array = (Object[]) Array.newInstance(elementClass, in.length); + for (int i = 0; i < in.length; i++) { + array[i] = elementConverter.deserialize(in[i]); + } + return new GenericArrayData(array); + }; } // Have its own method so that Postgres can support primitives that super class doesn't support diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java index 020b7da021582f..8abaaa234f5a4c 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java @@ -154,7 +154,7 @@ public void testArrayTypes() { assertEquals( "[+I[" + "[1, 2, 3], " - + "[[92, 120, 51, 50], [92, 120, 51, 51], [92, 120, 51, 52]], " + + "[[50], [51], [52]], " + "[3, 4, 5], " + "[4, 5, 6], " + "[5.5, 6.6, 7.7], "