diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml index 55fbfdf2d413c..efef548b33809 100644 --- a/flink-connectors/flink-connector-jdbc/pom.xml +++ b/flink-connectors/flink-connector-jdbc/pom.xml @@ -36,7 +36,7 @@ under the License. jar - 42.2.10 + 42.3.3 19.3.0.0 diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java index 644586ba13574..12a94a569111f 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java @@ -49,6 +49,8 @@ public class PostgresTypeMapper implements JdbcDialectTypeMapper { // float <=> float8 // boolean <=> bool // decimal <=> numeric + private static final String PG_SMALLSERIAL = "smallserial"; + private static final String PG_SMALLSERIAL_ARRAY = "_smallserial"; private static final String PG_SERIAL = "serial"; private static final String PG_BIGSERIAL = "bigserial"; private static final String PG_BYTEA = "bytea"; @@ -102,7 +104,9 @@ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int co case PG_BYTEA_ARRAY: return DataTypes.ARRAY(DataTypes.BYTES()); case PG_SMALLINT: + case PG_SMALLSERIAL: return DataTypes.SMALLINT(); + case PG_SMALLSERIAL_ARRAY: case PG_SMALLINT_ARRAY: return DataTypes.ARRAY(DataTypes.SMALLINT()); case PG_INTEGER: diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java index 60b936963bb77..02d20cc764d65 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java @@ -91,7 +91,10 @@ private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arra final Object[] array = (Object[]) Array.newInstance(elementClass, in.length); for (int i = 0; i < in.length; i++) { array[i] = - elementConverter.deserialize(((PGobject) in[i]).getValue().getBytes()); + elementConverter.deserialize( + in[i] instanceof byte[] + ? in[i] + : ((PGobject) in[i]).getValue().getBytes()); } return new GenericArrayData(array); }; diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java index 020b7da021582..8abaaa234f5a4 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java @@ -154,7 +154,7 @@ public void testArrayTypes() { assertEquals( "[+I[" + "[1, 2, 3], " - + "[[92, 120, 51, 50], [92, 120, 51, 51], [92, 120, 51, 52]], " + + "[[50], [51], [52]], " + "[3, 4, 5], " + "[4, 5, 6], " + "[5.5, 6.6, 7.7], "