diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml
index 55fbfdf2d413c7..efef548b338090 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -36,7 +36,7 @@ under the License.
jar
- 42.2.10
+ 42.3.3
19.3.0.0
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java
index 644586ba135741..b6c478ab268223 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresTypeMapper.java
@@ -49,6 +49,7 @@ public class PostgresTypeMapper implements JdbcDialectTypeMapper {
// float <=> float8
// boolean <=> bool
// decimal <=> numeric
+ private static final String PG_SMALLSERIAL = "smallserial";
private static final String PG_SERIAL = "serial";
private static final String PG_BIGSERIAL = "bigserial";
private static final String PG_BYTEA = "bytea";
@@ -102,6 +103,7 @@ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int co
case PG_BYTEA_ARRAY:
return DataTypes.ARRAY(DataTypes.BYTES());
case PG_SMALLINT:
+ case PG_SMALLSERIAL:
return DataTypes.SMALLINT();
case PG_SMALLINT_ARRAY:
return DataTypes.ARRAY(DataTypes.SMALLINT());
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java
index 60b936963bb773..265476ac1bec3a 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java
@@ -22,13 +22,11 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.postgresql.jdbc.PgArray;
-import org.postgresql.util.PGobject;
import java.lang.reflect.Array;
@@ -78,38 +76,21 @@ protected JdbcSerializationConverter createNullableExternalConverter(LogicalType
}
private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arrayType) {
- // PG's bytea[] is wrapped in PGobject, rather than primitive byte arrays
- if (arrayType.getElementType().is(LogicalTypeFamily.BINARY_STRING)) {
- final Class> elementClass =
- LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
- final JdbcDeserializationConverter elementConverter =
- createNullableInternalConverter(arrayType.getElementType());
-
- return val -> {
- PgArray pgArray = (PgArray) val;
- Object[] in = (Object[]) pgArray.getArray();
- final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
- for (int i = 0; i < in.length; i++) {
- array[i] =
- elementConverter.deserialize(((PGobject) in[i]).getValue().getBytes());
- }
- return new GenericArrayData(array);
- };
- } else {
- final Class> elementClass =
- LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
- final JdbcDeserializationConverter elementConverter =
- createNullableInternalConverter(arrayType.getElementType());
- return val -> {
- PgArray pgArray = (PgArray) val;
- Object[] in = (Object[]) pgArray.getArray();
- final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
- for (int i = 0; i < in.length; i++) {
- array[i] = elementConverter.deserialize(in[i]);
- }
- return new GenericArrayData(array);
- };
- }
+ // Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in
+ // primitive byte arrays
+ final Class> elementClass =
+ LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+ final JdbcDeserializationConverter elementConverter =
+ createNullableInternalConverter(arrayType.getElementType());
+ return val -> {
+ PgArray pgArray = (PgArray) val;
+ Object[] in = (Object[]) pgArray.getArray();
+ final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
+ for (int i = 0; i < in.length; i++) {
+ array[i] = elementConverter.deserialize(in[i]);
+ }
+ return new GenericArrayData(array);
+ };
}
// Have its own method so that Postgres can support primitives that super class doesn't support
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index 020b7da021582f..8abaaa234f5a4c 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -154,7 +154,7 @@ public void testArrayTypes() {
assertEquals(
"[+I["
+ "[1, 2, 3], "
- + "[[92, 120, 51, 50], [92, 120, 51, 51], [92, 120, 51, 52]], "
+ + "[[50], [51], [52]], "
+ "[3, 4, 5], "
+ "[4, 5, 6], "
+ "[5.5, 6.6, 7.7], "