Skip to content

Commit

Permalink
[FLINK-25926][Connectors][JDBC] Update org.postgresql:postgresql to 4…
Browse files Browse the repository at this point in the history
…2.3.3. This required adding support for smallserial and applying changes after refactoring pgjdbc/pgjdbc#1194. PG's bytea[] now uses primitive arrays. This closes apache#18604

Co-authored-by: Sergey Nuyanzin <snuyanzin@gmail.com>
  • Loading branch information
MartijnVisser and snuyanzin committed Apr 7, 2022
1 parent 12620e8 commit 2089716
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 36 deletions.
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-jdbc/pom.xml
Expand Up @@ -36,7 +36,7 @@ under the License.
<packaging>jar</packaging>

<properties>
<postgres.version>42.2.10</postgres.version>
<postgres.version>42.3.3</postgres.version>
<oracle.version>19.3.0.0</oracle.version>
</properties>

Expand Down
Expand Up @@ -49,6 +49,7 @@ public class PostgresTypeMapper implements JdbcDialectTypeMapper {
// float <=> float8
// boolean <=> bool
// decimal <=> numeric
private static final String PG_SMALLSERIAL = "smallserial";
private static final String PG_SERIAL = "serial";
private static final String PG_BIGSERIAL = "bigserial";
private static final String PG_BYTEA = "bytea";
Expand Down Expand Up @@ -102,6 +103,7 @@ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int co
case PG_BYTEA_ARRAY:
return DataTypes.ARRAY(DataTypes.BYTES());
case PG_SMALLINT:
case PG_SMALLSERIAL:
return DataTypes.SMALLINT();
case PG_SMALLINT_ARRAY:
return DataTypes.ARRAY(DataTypes.SMALLINT());
Expand Down
Expand Up @@ -22,13 +22,11 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;

import org.postgresql.jdbc.PgArray;
import org.postgresql.util.PGobject;

import java.lang.reflect.Array;

Expand Down Expand Up @@ -78,38 +76,21 @@ protected JdbcSerializationConverter createNullableExternalConverter(LogicalType
}

private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arrayType) {
// PG's bytea[] is wrapped in PGobject, rather than primitive byte arrays
if (arrayType.getElementType().is(LogicalTypeFamily.BINARY_STRING)) {
final Class<?> elementClass =
LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
final JdbcDeserializationConverter elementConverter =
createNullableInternalConverter(arrayType.getElementType());

return val -> {
PgArray pgArray = (PgArray) val;
Object[] in = (Object[]) pgArray.getArray();
final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
for (int i = 0; i < in.length; i++) {
array[i] =
elementConverter.deserialize(((PGobject) in[i]).getValue().getBytes());
}
return new GenericArrayData(array);
};
} else {
final Class<?> elementClass =
LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
final JdbcDeserializationConverter elementConverter =
createNullableInternalConverter(arrayType.getElementType());
return val -> {
PgArray pgArray = (PgArray) val;
Object[] in = (Object[]) pgArray.getArray();
final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
for (int i = 0; i < in.length; i++) {
array[i] = elementConverter.deserialize(in[i]);
}
return new GenericArrayData(array);
};
}
// Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in
// primitive byte arrays
final Class<?> elementClass =
LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
final JdbcDeserializationConverter elementConverter =
createNullableInternalConverter(arrayType.getElementType());
return val -> {
PgArray pgArray = (PgArray) val;
Object[] in = (Object[]) pgArray.getArray();
final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
for (int i = 0; i < in.length; i++) {
array[i] = elementConverter.deserialize(in[i]);
}
return new GenericArrayData(array);
};
}

// Have its own method so that Postgres can support primitives that super class doesn't support
Expand Down
Expand Up @@ -154,7 +154,7 @@ public void testArrayTypes() {
assertEquals(
"[+I["
+ "[1, 2, 3], "
+ "[[92, 120, 51, 50], [92, 120, 51, 51], [92, 120, 51, 52]], "
+ "[[50], [51], [52]], "
+ "[3, 4, 5], "
+ "[4, 5, 6], "
+ "[5.5, 6.6, 7.7], "
Expand Down

0 comments on commit 2089716

Please sign in to comment.