diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java index 4030abdebb2..53a814df3ef 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java @@ -341,10 +341,10 @@ private Table tableFromFromMessage(List columns, Tabl final PostgresType type = column.getType(); final ColumnEditor columnEditor = Column.editor() .name(column.getName()) - .jdbcType(type.getJdbcId()) + .jdbcType(type.getRootType().getJdbcId()) .type(type.getName()) .optional(column.isOptional()) - .nativeType(type.getOid()); + .nativeType(type.getRootType().getOid()); columnEditor.length(column.getTypeMetadata().getLength()); columnEditor.scale(column.getTypeMetadata().getScale()); return columnEditor.create(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresType.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresType.java index f41fdb14b8a..789bc879c64 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresType.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresType.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.postgresql; +import java.util.List; import java.util.Objects; import org.postgresql.core.Oid; @@ -19,25 +20,31 @@ */ public class PostgresType { - public static final PostgresType UNKNOWN = new PostgresType("unknown", -1, Integer.MIN_VALUE, null); + public static final PostgresType UNKNOWN = new PostgresType("unknown", -1, Integer.MIN_VALUE, null, null, null, null); private final String name; private final int oid; private final int jdbcId; + private final PostgresType 
parentType; private final PostgresType elementType; private final TypeInfo typeInfo; + private final int modifiers; + private final List enumValues; - public PostgresType(String name, int oid, int jdbcId, TypeInfo typeInfo) { - this(name, oid, jdbcId, typeInfo, null); + private PostgresType(String name, int oid, int jdbcId, TypeInfo typeInfo, List enumValues, PostgresType parentType, PostgresType elementType) { + this(name, oid, jdbcId, TypeRegistry.NO_TYPE_MODIFIER, typeInfo, enumValues, parentType, elementType); } - public PostgresType(String name, int oid, int jdbcId, TypeInfo typeInfo, PostgresType elementType) { + private PostgresType(String name, int oid, int jdbcId, int modifiers, TypeInfo typeInfo, List enumValues, PostgresType parentType, PostgresType elementType) { Objects.requireNonNull(name); this.name = name; this.oid = oid; this.jdbcId = jdbcId; - this.elementType = elementType; this.typeInfo = typeInfo; + this.parentType = parentType; + this.elementType = elementType; + this.modifiers = modifiers; + this.enumValues = enumValues; } /** @@ -47,6 +54,24 @@ public boolean isArrayType() { return elementType != null; } + /** + * The type system allows for the creation of user defined types (UDTs) which can be based + * on any existing type. When a type does not extend another type, it is considered to be + * a base or root type in the type hierarchy. 
+ * + * @return true if this type is a base/root type + */ + public boolean isRootType() { + return parentType == null; + } + + /** + * @return true if this type is an enum type + */ + public boolean isEnumType() { + return enumValues != null; + } + /** * * @return symbolic name of the type @@ -79,6 +104,30 @@ public PostgresType getElementType() { return elementType; } + /** + * + * @return the parent postgres type this type is based upon + */ + public PostgresType getParentType() { + return parentType; + } + + /** + * + * @return the postgres type at the top/root level for this type's hierarchy + */ + public PostgresType getRootType() { + PostgresType rootType = this; + while (!rootType.isRootType()) { + rootType = rootType.getParentType(); + } + return rootType; + } + + public List getEnumValues() { + return enumValues; + } + /** * * @return the default length of the type @@ -87,9 +136,23 @@ public int getDefaultLength() { if (typeInfo == null) { return TypeRegistry.UNKNOWN_LENGTH; } - int size = typeInfo.getPrecision(oid, TypeRegistry.NO_TYPE_MODIFIER); + if (parentType != null) { + if (modifiers == TypeRegistry.NO_TYPE_MODIFIER) { + return parentType.getDefaultLength(); + } + else { + int size = typeInfo.getPrecision(parentType.getOid(), modifiers); + if (size == 0) { + size = typeInfo.getDisplaySize(parentType.getOid(), modifiers); + } + if (size != 0 && size != Integer.MAX_VALUE) { + return size; + } + } + } + int size = typeInfo.getPrecision(oid, modifiers); if (size == 0) { - size = typeInfo.getDisplaySize(oid, TypeRegistry.NO_TYPE_MODIFIER); + size = typeInfo.getDisplaySize(oid, modifiers); } return size; } @@ -102,7 +165,15 @@ public int getDefaultScale() { if (typeInfo == null) { return TypeRegistry.UNKNOWN_LENGTH; } - return typeInfo.getScale(oid, TypeRegistry.NO_TYPE_MODIFIER); + if (parentType != null) { + if (modifiers == TypeRegistry.NO_TYPE_MODIFIER) { + return parentType.getDefaultScale(); + } + else { + return 
typeInfo.getScale(parentType.getOid(), modifiers); + } + } + return typeInfo.getScale(oid, modifiers); } /** @@ -179,7 +250,57 @@ public boolean equals(Object obj) { @Override public String toString() { - return "PostgresType [name=" + name + ", oid=" + oid + ", jdbcId=" + jdbcId + ", defaultLength=" + getDefaultLength() - + ", defaultScale=" + getDefaultScale() + ", elementType=" + elementType + "]"; + return "PostgresType [name=" + name + ", oid=" + oid + ", jdbcId=" + jdbcId + ", modifiers=" + modifiers + ", defaultLength=" + getDefaultLength() + + ", defaultScale=" + getDefaultScale() + ", parentType=" + parentType + ", elementType=" + elementType + "]"; + } + + public static class Builder { + private final TypeRegistry typeRegistry; + private final String name; + private final int oid; + private final int jdbcId; + private final int modifiers; + private final TypeInfo typeInfo; + private int parentTypeOid; + private int elementTypeOid; + private List enumValues; + + public Builder(TypeRegistry typeRegistry, String name, int oid, int jdbcId, int modifiers, TypeInfo typeInfo) { + this.typeRegistry = typeRegistry; + this.name = name; + this.oid = oid; + this.jdbcId = jdbcId; + this.modifiers = modifiers; + this.typeInfo = typeInfo; + } + + public Builder parentType(int parentTypeOid) { + this.parentTypeOid = parentTypeOid; + return this; + } + + public Builder elementType(int elementTypeOid) { + this.elementTypeOid = elementTypeOid; + return this; + } + + public Builder enumValues(List enumValues) { + this.enumValues = enumValues; + return this; + } + + public PostgresType build() { + PostgresType parentType = null; + if (parentTypeOid != 0) { + parentType = typeRegistry.get(parentTypeOid); + } + + PostgresType elementType = null; + if (elementTypeOid != 0) { + elementType = typeRegistry.get(elementTypeOid); + } + + return new PostgresType(name, oid, jdbcId, modifiers, typeInfo, enumValues, parentType, elementType); + } } } diff --git 
a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 85e4688b6b2..8d391f5c9a6 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -151,7 +151,6 @@ protected PostgresValueConverter(Charset databaseCharset, DecimalMode decimalMod @Override public SchemaBuilder schemaBuilder(Column column) { int oidValue = column.nativeType(); - switch (oidValue) { case PgOid.BIT: case PgOid.BIT_ARRAY: @@ -277,6 +276,12 @@ else if (oidValue == typeRegistry.citextArrayOid()) { else if (oidValue == typeRegistry.ltreeArrayOid()) { return SchemaBuilder.array(Ltree.builder().optional().build()); } + + final PostgresType resolvedType = typeRegistry.get(oidValue); + if (resolvedType.isEnumType()) { + return io.debezium.data.Enum.builder(Strings.join(",", resolvedType.getEnumValues())); + } + final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column); if (jdbcSchemaBuilder == null) { return includeUnknownDatatypes ? SchemaBuilder.bytes() : null; @@ -309,7 +314,6 @@ private SchemaBuilder hstoreSchema() { @Override public ValueConverter converter(Column column, Field fieldDefn) { int oidValue = column.nativeType(); - switch (oidValue) { case PgOid.BIT: case PgOid.VARBIT: @@ -417,6 +421,7 @@ else if (oidValue == typeRegistry.geometryArrayOid() || oidValue == typeRegistry.hstoreArrayOid()) { return createArrayConverter(column, fieldDefn); } + final ValueConverter jdbcConverter = super.converter(column, fieldDefn); if (jdbcConverter == null) { return includeUnknownDatatypes ? 
data -> convertBinary(column, fieldDefn, data) : null; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java index bc3bef0b205..06e6a54f9c2 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java @@ -5,14 +5,28 @@ */ package io.debezium.connector.postgresql; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; import org.postgresql.core.TypeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.util.Collect; + /** * A registry of types supported by a PostgreSQL instance. Allows lookup of the types according to * type name or OID. 
@@ -39,6 +53,31 @@ public class TypeRegistry { public static final int NO_TYPE_MODIFIER = -1; public static final int UNKNOWN_LENGTH = -1; + // PostgreSQL driver reports user-defined Domain types as Types.DISTINCT + public static final int DOMAIN_TYPE = Types.DISTINCT; + + private static final String CATEGORY_ENUM = "E"; + + private static final String SQL_NON_ARRAY_TYPES = "SELECT t.oid AS oid, t.typname AS name, t.typbasetype AS parentoid, t.typtypmod as modifiers, t.typcategory as category " + + "FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON (t.typnamespace = n.oid) " + + "WHERE n.nspname != 'pg_toast' AND t.typcategory <> 'A'"; + + private static final String SQL_ARRAY_TYPES = "SELECT t.oid AS oid, t.typname AS name, t.typelem AS element, t.typbasetype AS parentoid, t.typtypmod as modifiers, t.typcategory as category " + + "FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON (t.typnamespace = n.oid) " + + "WHERE n.nspname != 'pg_toast' AND t.typcategory = 'A'"; + + private static final String SQL_NON_ARRAY_TYPE_NAME_LOOKUP = "SELECT t.oid as oid, t.typname AS name, t.typbasetype AS parentoid, t.typtypmod AS modifiers, t.typcategory as category " + + "FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON (t.typnamespace = n.oid) " + + "WHERE n.nspname != 'pg_toast' AND t.typcategory <> 'A' AND t.typname = ?"; + + private static final String SQL_NON_ARRAY_TYPE_OID_LOOKUP = "SELECT t.oid as oid, t.typname AS name, t.typbasetype AS parentoid, t.typtypmod AS modifiers, t.typcategory as category " + + "FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON (t.typnamespace = n.oid) " + + "WHERE n.nspname != 'pg_toast' AND t.typcategory <> 'A' AND t.oid = ?"; + + private static final String SQL_ENUM_VALUES_LOOKUP = "select t.enumlabel as enum_value " + + "FROM pg_catalog.pg_enum t " + + "WHERE t.enumtypid=? 
ORDER BY t.enumsortorder"; + private static final Map LONG_TYPE_NAMES = Collections.unmodifiableMap(getLongTypeNames()); private static Map getLongTypeNames() { @@ -61,126 +100,62 @@ private static Map getLongTypeNames() { return longTypeNames; } - /** - * Builder for instances of {@link TypeRegistry}. - */ - public static final class Builder { + private final Map nameToType = new HashMap<>(); + private final Map oidToType = new HashMap<>(); - private final Map nameToType = new HashMap<>(); - private final Map oidToType = new HashMap<>(); + private final PostgresConnection connection; - private int geometryOid = Integer.MIN_VALUE; - private int geographyOid = Integer.MIN_VALUE; - private int citextOid = Integer.MIN_VALUE; - private int hstoreOid = Integer.MIN_VALUE; - private int ltreeOid = Integer.MIN_VALUE; + private int geometryOid = Integer.MIN_VALUE; + private int geographyOid = Integer.MIN_VALUE; + private int citextOid = Integer.MIN_VALUE; + private int hstoreOid = Integer.MIN_VALUE; + private int ltreeOid = Integer.MIN_VALUE; - private int hstoreArrayOid = Integer.MIN_VALUE; - private int geometryArrayOid = Integer.MIN_VALUE; - private int geographyArrayOid = Integer.MIN_VALUE; - private int citextArrayOid = Integer.MIN_VALUE; - private int ltreeArrayOid = Integer.MIN_VALUE; + private int hstoreArrayOid = Integer.MIN_VALUE; + private int geometryArrayOid = Integer.MIN_VALUE; + private int geographyArrayOid = Integer.MIN_VALUE; + private int citextArrayOid = Integer.MIN_VALUE; + private int ltreeArrayOid = Integer.MIN_VALUE; - private Builder() { - } + public TypeRegistry(PostgresConnection connection) { + this.connection = connection; + prime(); + } - /** - * Add a new type - * - * @param type - * - * @return builder instance - */ - public Builder addType(PostgresType type) { - oidToType.put(type.getOid(), type); - nameToType.put(type.getName(), type); + private void addType(PostgresType type) { + oidToType.put(type.getOid(), type); + 
nameToType.put(type.getName(), type); - if (TYPE_NAME_GEOMETRY.equals(type.getName())) { - geometryOid = type.getOid(); - } - else if (TYPE_NAME_GEOGRAPHY.equals(type.getName())) { - geographyOid = type.getOid(); - } - else if (TYPE_NAME_CITEXT.equals(type.getName())) { - citextOid = type.getOid(); - } - else if (TYPE_NAME_HSTORE.equals(type.getName())) { - hstoreOid = type.getOid(); - } - else if (TYPE_NAME_LTREE.equals(type.getName())) { - ltreeOid = type.getOid(); - } - else if (TYPE_NAME_HSTORE_ARRAY.equals(type.getName())) { - hstoreArrayOid = type.getOid(); - } - else if (TYPE_NAME_GEOMETRY_ARRAY.equals(type.getName())) { - geometryArrayOid = type.getOid(); - } - else if (TYPE_NAME_GEOGRAPHY_ARRAY.equals(type.getName())) { - geographyArrayOid = type.getOid(); - } - else if (TYPE_NAME_CITEXT_ARRAY.equals(type.getName())) { - citextArrayOid = type.getOid(); - } - else if (TYPE_NAME_LTREE_ARRAY.equals(type.getName())) { - ltreeArrayOid = type.getOid(); - } - return this; + if (TYPE_NAME_GEOMETRY.equals(type.getName())) { + geometryOid = type.getOid(); } - - /** - * - * @param oid - PostgreSQL OID - * @return type associated with the given OID - */ - public PostgresType get(int oid) { - return oidToType.get(oid); + else if (TYPE_NAME_GEOGRAPHY.equals(type.getName())) { + geographyOid = type.getOid(); } - - /** - * @return initialized type registry - */ - public TypeRegistry build() { - return new TypeRegistry(this); + else if (TYPE_NAME_CITEXT.equals(type.getName())) { + citextOid = type.getOid(); + } + else if (TYPE_NAME_HSTORE.equals(type.getName())) { + hstoreOid = type.getOid(); + } + else if (TYPE_NAME_LTREE.equals(type.getName())) { + ltreeOid = type.getOid(); + } + else if (TYPE_NAME_HSTORE_ARRAY.equals(type.getName())) { + hstoreArrayOid = type.getOid(); + } + else if (TYPE_NAME_GEOMETRY_ARRAY.equals(type.getName())) { + geometryArrayOid = type.getOid(); + } + else if (TYPE_NAME_GEOGRAPHY_ARRAY.equals(type.getName())) { + geographyArrayOid = 
type.getOid(); + } + else if (TYPE_NAME_CITEXT_ARRAY.equals(type.getName())) { + citextArrayOid = type.getOid(); + } + else if (TYPE_NAME_LTREE_ARRAY.equals(type.getName())) { + ltreeArrayOid = type.getOid(); } - } - - public static Builder create(TypeInfo typeInfo) { - return new Builder(); - } - - private final Map nameToType; - private final Map oidToType; - - private final int geometryOid; - private final int geographyOid; - private final int citextOid; - private final int hstoreOid; - private final int ltreeOid; - - private final int hstoreArrayOid; - private final int geometryArrayOid; - private final int geographyArrayOid; - private final int citextArrayOid; - private final int ltreeArrayOid; - - private TypeRegistry(Builder builder) { - - this.nameToType = Collections.unmodifiableMap(builder.nameToType); - this.oidToType = Collections.unmodifiableMap(builder.oidToType); - - this.geometryOid = builder.geometryOid; - this.geographyOid = builder.geographyOid; - this.citextOid = builder.citextOid; - this.hstoreOid = builder.hstoreOid; - this.ltreeOid = builder.ltreeOid; - - this.hstoreArrayOid = builder.hstoreArrayOid; - this.geometryArrayOid = builder.geometryArrayOid; - this.geographyArrayOid = builder.geographyArrayOid; - this.citextArrayOid = builder.citextArrayOid; - this.ltreeArrayOid = builder.ltreeArrayOid; - } /** @@ -191,8 +166,11 @@ private TypeRegistry(Builder builder) { public PostgresType get(int oid) { PostgresType r = oidToType.get(oid); if (r == null) { - LOGGER.warn("Unknown OID {} requested", oid); - r = PostgresType.UNKNOWN; + r = resolveUnknownType(oid); + if (r == null) { + LOGGER.warn("Unknown OID {} requested", oid); + r = PostgresType.UNKNOWN; + } } return r; } @@ -223,8 +201,11 @@ public PostgresType get(String name) { } PostgresType r = nameToType.get(name); if (r == null) { - LOGGER.warn("Unknown type named {} requested", name); - r = PostgresType.UNKNOWN; + r = resolveUnknownType(name); + if (r == null) { + LOGGER.warn("Unknown type 
named {} requested", name); + r = PostgresType.UNKNOWN; + } } return r; } @@ -319,4 +300,305 @@ public int ltreeArrayOid() { public static String normalizeTypeName(String typeName) { return LONG_TYPE_NAMES.getOrDefault(typeName, typeName); } + + /** + * Prime the {@link TypeRegistry} with all existing database types + */ + private void prime() { + Connection pgConnection = null; + try { + pgConnection = connection.connection(); + + final TypeInfo typeInfo = ((BaseConnection) pgConnection).getTypeInfo(); + final SqlTypeMapper sqlTypeMapper = new SqlTypeMapper(pgConnection, typeInfo); + + try (final Statement statement = pgConnection.createStatement()) { + // Read non-array types + try (final ResultSet rs = statement.executeQuery(SQL_NON_ARRAY_TYPES)) { + final List delayResolvedBuilders = new ArrayList<>(); + while (rs.next()) { + // Coerce long to int so large unsigned values are represented as signed + // Same technique is used in TypeInfoCache + final int oid = (int) rs.getLong("oid"); + final int parentTypeOid = (int) rs.getLong("parentoid"); + final int modifiers = (int) rs.getLong("modifiers"); + String typeName = rs.getString("name"); + String category = rs.getString("category"); + + PostgresType.Builder builder = new PostgresType.Builder( + this, + typeName, + oid, + sqlTypeMapper.getSqlType(typeName), + modifiers, + typeInfo); + + if (CATEGORY_ENUM.equals(category)) { + builder = builder.enumValues(resolveEnumValues(pgConnection, oid)); + } + + // If the type does not have a base type, we can build/add immediately. + if (parentTypeOid == 0) { + addType(builder.build()); + continue; + } + + // For types with base type mappings, they need to be delayed. 
+ builder = builder.parentType(parentTypeOid); + delayResolvedBuilders.add(builder); + } + + // Resolve delayed builders + for (PostgresType.Builder builder : delayResolvedBuilders) { + addType(builder.build()); + } + } + + // Read array types + try (final ResultSet rs = statement.executeQuery(SQL_ARRAY_TYPES)) { + final List delayResolvedBuilders = new ArrayList<>(); + while (rs.next()) { + // int2vector and oidvector will not be treated as arrays + final int oid = (int) rs.getLong("oid"); + final int parentTypeOid = (int) rs.getLong("parentoid"); + final int modifiers = (int) rs.getLong("modifiers"); + String typeName = rs.getString("name"); + + PostgresType.Builder builder = new PostgresType.Builder( + this, + typeName, + oid, + sqlTypeMapper.getSqlType(typeName), + modifiers, + typeInfo); + + builder = builder.elementType((int) rs.getLong("element")); + + // If the type does not have a base type, we can build/add immediately + if (parentTypeOid == 0) { + addType(builder.build()); + continue; + } + + // For types with base type mappings, they need to be delayed. 
+ builder = builder.parentType(parentTypeOid); + delayResolvedBuilders.add(builder); + } + + // Resolve delayed builders + for (PostgresType.Builder builder : delayResolvedBuilders) { + addType(builder.build()); + } + } + } + + } + catch (SQLException e) { + if (pgConnection == null) { + throw new ConnectException("Could not create PG connection", e); + } + else { + throw new ConnectException("Could not initialize type registry", e); + } + } + } + + private PostgresType resolveUnknownType(String name) { + try { + LOGGER.trace("Type '{}' not cached, attempting to lookup from database.", name); + final Connection connection = this.connection.connection(); + final TypeInfo typeInfo = ((BaseConnection) connection).getTypeInfo(); + final SqlTypeMapper sqlTypeMapper = new SqlTypeMapper(connection, typeInfo); + + try (final PreparedStatement statement = connection.prepareStatement(SQL_NON_ARRAY_TYPE_NAME_LOOKUP)) { + statement.setString(1, name); + try (final ResultSet rs = statement.executeQuery()) { + while (rs.next()) { + final int oid = (int) rs.getLong("oid"); + final int parentTypeOid = (int) rs.getLong("parentoid"); + final int modifiers = (int) rs.getLong("modifiers"); + String typeName = rs.getString("name"); + String category = rs.getString("category"); + + PostgresType.Builder builder = new PostgresType.Builder( + this, + typeName, + oid, + sqlTypeMapper.getSqlType(typeName), + modifiers, + typeInfo); + + if (CATEGORY_ENUM.equals(category)) { + builder = builder.enumValues(resolveEnumValues(connection, oid)); + } + + PostgresType result = builder.parentType(parentTypeOid).build(); + addType(result); + + return result; + } + } + } + } + catch (SQLException e) { + throw new ConnectException("Database connection failed during resolving unknown type", e); + } + + return null; + } + + private PostgresType resolveUnknownType(int lookupOid) { + try { + LOGGER.trace("Type OID '{}' not cached, attempting to lookup from database.", lookupOid); + final Connection 
connection = this.connection.connection(); + final TypeInfo typeInfo = ((BaseConnection) connection).getTypeInfo(); + final SqlTypeMapper sqlTypeMapper = new SqlTypeMapper(connection, typeInfo); + + try (final PreparedStatement statement = connection.prepareStatement(SQL_NON_ARRAY_TYPE_OID_LOOKUP)) { + statement.setLong(1, lookupOid); + try (final ResultSet rs = statement.executeQuery()) { + while (rs.next()) { + final int oid = (int) rs.getLong("oid"); + final int parentTypeOid = (int) rs.getLong("parentoid"); + final int modifiers = (int) rs.getLong("modifiers"); + String typeName = rs.getString("name"); + String category = rs.getString("category"); + + PostgresType.Builder builder = new PostgresType.Builder( + this, + typeName, + oid, + sqlTypeMapper.getSqlType(typeName), + modifiers, + typeInfo); + + if (CATEGORY_ENUM.equals(category)) { + builder = builder.enumValues(resolveEnumValues(connection, oid)); + } + + PostgresType result = builder.parentType(parentTypeOid).build(); + addType(result); + + return result; + } + } + } + } + catch (SQLException e) { + throw new ConnectException("Database connection failed during resolving unknown type", e); + } + + return null; + } + + private List resolveEnumValues(Connection pgConnection, int enumOid) throws SQLException { + List enumValues = new ArrayList<>(); + try (final PreparedStatement enumStatement = pgConnection.prepareStatement(SQL_ENUM_VALUES_LOOKUP)) { + enumStatement.setInt(1, enumOid); + try (final ResultSet enumRs = enumStatement.executeQuery()) { + while (enumRs.next()) { + enumValues.add(enumRs.getString("enum_value")); + } + } + } + return enumValues.isEmpty() ? null : enumValues; + } + + /** + * Allows to obtain the SQL type corresponding to PG types. This uses a custom statement instead of going through + * {@link PgDatabaseMetaData#getTypeInfo()} as the latter causes N+1 SELECTs, making it very slow on installations + * with many custom types. 
+ * + * @author Gunnar Morling + * @see DBZ-899 + */ + private static class SqlTypeMapper { + + /** + * Based on org.postgresql.jdbc.TypeInfoCache.getSQLType(String). To emulate the original statement's behavior + * (which works for single types only), PG's DISTINCT ON extension is used to just return the first entry should a + * type exist in multiple schemas. + */ + private static final String SQL_TYPE_DETAILS = "SELECT DISTINCT ON (typname) typname, typinput='array_in'::regproc, typtype, sp.r, pg_type.oid " + + " FROM pg_catalog.pg_type " + + " LEFT " + + " JOIN (select ns.oid as nspoid, ns.nspname, r.r " + + " from pg_namespace as ns " + // -- go with older way of unnesting array to be compatible with 8.0 + + " join ( select s.r, (current_schemas(false))[s.r] as nspname " + + " from generate_series(1, array_upper(current_schemas(false), 1)) as s(r) ) as r " + + " using ( nspname ) " + + " ) as sp " + + " ON sp.nspoid = typnamespace " + + " ORDER BY typname, sp.r, pg_type.oid;"; + + private final TypeInfo typeInfo; + private final Set preloadedSqlTypes; + private final Map sqlTypesByPgTypeNames; + + private SqlTypeMapper(Connection db, TypeInfo typeInfo) throws SQLException { + this.typeInfo = typeInfo; + this.preloadedSqlTypes = Collect.unmodifiableSet(typeInfo.getPGTypeNamesWithSQLTypes()); + this.sqlTypesByPgTypeNames = getSqlTypes(db, typeInfo); + } + + public int getSqlType(String typeName) throws SQLException { + boolean isCoreType = preloadedSqlTypes.contains(typeName); + + // obtain core types such as bool, int2 etc. from the driver, as it correctly maps these types to the JDBC + // type codes. Also those values are cached in TypeInfoCache. 
+ if (isCoreType) { + return typeInfo.getSQLType(typeName); + } + if (typeName.endsWith("[]")) { + return Types.ARRAY; + } + // get custom type mappings from the map which was built up with a single query + else { + try { + return sqlTypesByPgTypeNames.get(typeName); + } + catch (Exception e) { + LOGGER.warn("Failed to obtain SQL type information for type {} via custom statement, falling back to TypeInfo#getSQLType()", typeName, e); + return typeInfo.getSQLType(typeName); + } + } + } + + /** + * Builds up a map of SQL (JDBC) types by PG type name; contains only values for non-core types. + */ + private static Map getSqlTypes(Connection db, TypeInfo typeInfo) throws SQLException { + Map sqlTypesByPgTypeNames = new HashMap<>(); + + try (final Statement statement = db.createStatement()) { + try (final ResultSet rs = statement.executeQuery(SQL_TYPE_DETAILS)) { + while (rs.next()) { + int type; + boolean isArray = rs.getBoolean(2); + String typtype = rs.getString(3); + if (isArray) { + type = Types.ARRAY; + } + else if ("c".equals(typtype)) { + type = Types.STRUCT; + } + else if ("d".equals(typtype)) { + type = Types.DISTINCT; + } + else if ("e".equals(typtype)) { + type = Types.VARCHAR; + } + else { + type = Types.OTHER; + } + + sqlTypesByPgTypeNames.put(rs.getString(1), type); + } + } + } + + return sqlTypesByPgTypeNames; + } + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractColumnValue.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractColumnValue.java index dd2e8a806a3..37461d927b6 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractColumnValue.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractColumnValue.java @@ -8,7 +8,6 @@ import java.sql.SQLException; import java.time.Instant; import java.time.LocalDate; -import java.time.LocalTime; import 
java.time.OffsetDateTime; import java.time.OffsetTime; import java.time.ZoneOffset; @@ -21,11 +20,15 @@ import org.postgresql.geometric.PGpath; import org.postgresql.geometric.PGpoint; import org.postgresql.geometric.PGpolygon; +import org.postgresql.jdbc.PgArray; import org.postgresql.util.PGInterval; import org.postgresql.util.PGmoney; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; +import io.debezium.connector.postgresql.PostgresType; +import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.connection.wal2json.DateTimeFormat; /** @@ -41,7 +44,12 @@ public LocalDate asLocalDate() { } @Override - public LocalTime asLocalTime() { + public Object asTime() { + return asString(); + } + + @Override + public Object asLocalTime() { return DateTimeFormat.get().time(asString()); } @@ -83,7 +91,7 @@ public PGcircle asCircle() { } @Override - public PGInterval asInterval() { + public Object asInterval() { try { return new PGInterval(asString()); } @@ -158,4 +166,34 @@ public PGpolygon asPolygon() { throw new ConnectException(e); } } + + @Override + public boolean isArray(PostgresType type) { + return type.isArrayType(); + } + + @Override + public Object asArray(String columnName, PostgresType type, String fullType, PgConnectionSupplier connection) { + try { + final String dataString = asString(); + return new PgArray(connection.get(), type.getOid(), dataString); + } + catch (SQLException e) { + LOGGER.warn("Unexpected exception trying to process PgArray ({}) column '{}', {}", fullType, columnName, e); + } + return null; + } + + @Override + public Object asDefault(TypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, + PgConnectionSupplier connection) { + if (includeUnknownDatatypes) { + // this includes things like PostGIS geometries or other custom types + // leave up to 
the downstream message recipient to deal with + LOGGER.debug("processing column '{}' with unknown data type '{}' as byte array", columnName, fullType); + return asString(); + } + LOGGER.debug("Unknown column type {} for column {} – ignoring", fullType, columnName); + return null; + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index 7625bfaa876..f9095224e58 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -7,23 +7,16 @@ package io.debezium.connector.postgresql.connection; import java.nio.charset.Charset; -import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.sql.Types; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.connect.errors.ConnectException; -import org.postgresql.core.BaseConnection; -import org.postgresql.core.TypeInfo; import org.postgresql.jdbc.PgConnection; -import org.postgresql.jdbc.PgDatabaseMetaData; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.util.PSQLState; import org.slf4j.Logger; @@ -36,9 +29,11 @@ import io.debezium.connector.postgresql.spi.SlotState; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; import io.debezium.relational.TableId; +import io.debezium.relational.Tables; import io.debezium.util.Clock; -import io.debezium.util.Collect; import 
io.debezium.util.Metronome; /** @@ -56,14 +51,6 @@ public class PostgresConnection extends JdbcConnection { org.postgresql.Driver.class.getName(), PostgresConnection.class.getClassLoader()); - private static final String SQL_NON_ARRAY_TYPES = "SELECT t.oid AS oid, t.typname AS name " - + "FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON (t.typnamespace = n.oid) " - + "WHERE n.nspname != 'pg_toast' AND t.typcategory <> 'A'"; - - private static final String SQL_ARRAY_TYPES = "SELECT t.oid AS oid, t.typname AS name, t.typelem AS element " - + "FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON (t.typnamespace = n.oid) " - + "WHERE n.nspname != 'pg_toast' AND t.typcategory = 'A'"; - /** * Obtaining a replication slot may fail if there's a pending transaction. We're retrying to get a slot for 30 min. */ @@ -82,21 +69,7 @@ public class PostgresConnection extends JdbcConnection { */ public PostgresConnection(Configuration config) { super(config, FACTORY, PostgresConnection::validateServerVersion, PostgresConnection::defaultSettings); - - Connection pgConnection = null; - try { - pgConnection = connection(); - typeRegistry = initTypeRegistry(pgConnection); - } - catch (SQLException e) { - if (pgConnection == null) { - throw new ConnectException("Could not create PG connection", e); - } - else { - throw new ConnectException("Could not initialize type registry", e); - } - } - + this.typeRegistry = new TypeRegistry(this); databaseCharset = determineDatabaseCharset(); } @@ -450,152 +423,66 @@ private static void validateServerVersion(Statement statement) throws SQLExcepti @Override protected int resolveNativeType(String typeName) { - return getTypeRegistry().get(typeName).getOid(); + return getTypeRegistry().get(typeName).getRootType().getOid(); } - private static TypeRegistry initTypeRegistry(Connection db) { - final TypeInfo typeInfo = ((BaseConnection) db).getTypeInfo(); - TypeRegistry.Builder typeRegistryBuilder = TypeRegistry.create(typeInfo); - - try 
{ - SqlTypeMapper sqlTypeMapper = new SqlTypeMapper(db, typeInfo); - - try (final Statement statement = db.createStatement()) { - // Read non-array types - try (final ResultSet rs = statement.executeQuery(SQL_NON_ARRAY_TYPES)) { - while (rs.next()) { - // Coerce long to int so large unsigned values are represented as signed - // Same technique is used in TypeInfoCache - final int oid = (int) rs.getLong("oid"); - String typeName = rs.getString("name"); - typeRegistryBuilder.addType(new PostgresType( - typeName, - oid, - sqlTypeMapper.getSqlType(typeName), - typeInfo)); - } - } - - // Read array types - try (final ResultSet rs = statement.executeQuery(SQL_ARRAY_TYPES)) { - while (rs.next()) { - // int2vector and oidvector will not be treated as arrays - final int oid = (int) rs.getLong("oid"); - String typeName = rs.getString("name"); - typeRegistryBuilder.addType(new PostgresType( - typeName, - oid, - sqlTypeMapper.getSqlType(typeName), - typeInfo, typeRegistryBuilder.get((int) rs.getLong("element")))); - } - } - } - } - catch (SQLException e) { - throw new ConnectException("Database connection failed during intializiation of the type registry", e); - } - return typeRegistryBuilder.build(); - } - - public TypeRegistry getTypeRegistry() { - return typeRegistry; + @Override + protected int resolveJdbcType(int metadataJdbcType, int nativeType) { + // Special care needs to be taken for columns that use user-defined domain type data types + // where resolution of the column's JDBC type needs to be that of the root type instead of + // the actual column to properly influence schema building and value conversion. + return getTypeRegistry().get(nativeType).getRootType().getJdbcId(); } - /** - * Allows to obtain the SQL type corresponding to PG types. This uses a custom statement instead of going through - * {@link PgDatabaseMetaData#getTypeInfo()} as the latter causes N+1 SELECTs, making it very slow on installations - * with many custom types. 
- * - * @author Gunnar Morling - * @see DBZ-899 - */ - private static class SqlTypeMapper { - - /** - * Based on org.postgresql.jdbc.TypeInfoCache.getSQLType(String). To emulate the original statement's behavior - * (which works for single types only), PG's DISTINCT ON extension is used to just return the first entry should a - * type exist in multiple schemas. - */ - private static final String SQL_TYPE_DETAILS = "SELECT DISTINCT ON (typname) typname, typinput='array_in'::regproc, typtype, sp.r, pg_type.oid " - + " FROM pg_catalog.pg_type " - + " LEFT " - + " JOIN (select ns.oid as nspoid, ns.nspname, r.r " - + " from pg_namespace as ns " - // -- go with older way of unnesting array to be compatible with 8.0 - + " join ( select s.r, (current_schemas(false))[s.r] as nspname " - + " from generate_series(1, array_upper(current_schemas(false), 1)) as s(r) ) as r " - + " using ( nspname ) " - + " ) as sp " - + " ON sp.nspoid = typnamespace " - + " ORDER BY typname, sp.r, pg_type.oid;"; - - private final TypeInfo typeInfo; - private final Set preloadedSqlTypes; - private final Map sqlTypesByPgTypeNames; - - private SqlTypeMapper(Connection db, TypeInfo typeInfo) throws SQLException { - this.typeInfo = typeInfo; - this.preloadedSqlTypes = Collect.unmodifiableSet(typeInfo.getPGTypeNamesWithSQLTypes()); - this.sqlTypesByPgTypeNames = getSqlTypes(db, typeInfo); - } + @Override + protected Optional readTableColumn(ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter) throws SQLException { + final String columnName = columnMetadata.getString(4); + if (columnFilter == null || columnFilter.matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName)) { + final ColumnEditor column = Column.editor().name(columnName); + column.type(columnMetadata.getString(6)); + + // first source the length/scale from the column metadata provided by the driver + // this may be overridden below if the column type is a user-defined domain type + 
column.length(columnMetadata.getInt(7)); + if (columnMetadata.getObject(9) != null) { + column.scale(columnMetadata.getInt(9)); + } - public int getSqlType(String typeName) throws SQLException { - boolean isCoreType = preloadedSqlTypes.contains(typeName); + column.optional(isNullable(columnMetadata.getInt(11))); + column.position(columnMetadata.getInt(17)); + column.autoIncremented("YES".equalsIgnoreCase(columnMetadata.getString(23))); - // obtain core types such as bool, int2 etc. from the driver, as it correctly maps these types to the JDBC - // type codes. Also those values are cached in TypeInfoCache. - if (isCoreType) { - return typeInfo.getSQLType(typeName); + String autogenerated = null; + try { + autogenerated = columnMetadata.getString(24); } - if (typeName.endsWith("[]")) { - return Types.ARRAY; + catch (SQLException e) { + // ignore, some drivers don't have this index - e.g. Postgres } - // get custom type mappings from the map which was built up with a single query - else { - try { - return sqlTypesByPgTypeNames.get(typeName); - } - catch (Exception e) { - LOGGER.warn("Failed to obtain SQL type information for type {} via custom statement, falling back to TypeInfo#getSQLType()", typeName, e); - return typeInfo.getSQLType(typeName); - } + column.generated("YES".equalsIgnoreCase(autogenerated)); + + // Lookup the column type from the TypeRegistry + // For all types, we need to set the Native and Jdbc types by using the root-type + final PostgresType nativeType = getTypeRegistry().get(column.typeName()); + column.nativeType(nativeType.getRootType().getOid()); + column.jdbcType(nativeType.getRootType().getJdbcId()); + + // For domain types, the postgres driver is unable to traverse a nested unbounded + // hierarchy of types and report the right length/scale of a given type. We use + // the TypeRegistry to accomplish this since it is capable of traversing the type + // hierarchy upward to resolve length/scale regardless of hierarchy depth. 
+ if (TypeRegistry.DOMAIN_TYPE == nativeType.getJdbcId()) { + column.length(nativeType.getDefaultLength()); + column.scale(nativeType.getDefaultScale()); } - } - /** - * Builds up a map of SQL (JDBC) types by PG type name; contains only values for non-core types. - */ - private static Map getSqlTypes(Connection db, TypeInfo typeInfo) throws SQLException { - Map sqlTypesByPgTypeNames = new HashMap<>(); - - try (final Statement statement = db.createStatement()) { - try (final ResultSet rs = statement.executeQuery(SQL_TYPE_DETAILS)) { - while (rs.next()) { - int type; - boolean isArray = rs.getBoolean(2); - String typtype = rs.getString(3); - if (isArray) { - type = Types.ARRAY; - } - else if ("c".equals(typtype)) { - type = Types.STRUCT; - } - else if ("d".equals(typtype)) { - type = Types.DISTINCT; - } - else if ("e".equals(typtype)) { - type = Types.VARCHAR; - } - else { - type = Types.OTHER; - } + return Optional.of(column); + } - sqlTypesByPgTypeNames.put(rs.getString(1), type); - } - } - } + return Optional.empty(); + } - return sqlTypesByPgTypeNames; - } + public TypeRegistry getTypeRegistry() { + return typeRegistry; } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java index af524f965e8..e3b0619882a 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java @@ -8,7 +8,6 @@ import java.time.Instant; import java.time.LocalDate; -import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.OffsetTime; import java.util.List; @@ -16,16 +15,14 @@ import org.postgresql.geometric.PGbox; import org.postgresql.geometric.PGcircle; import org.postgresql.geometric.PGline; -import 
org.postgresql.geometric.PGlseg; import org.postgresql.geometric.PGpath; import org.postgresql.geometric.PGpoint; import org.postgresql.geometric.PGpolygon; -import org.postgresql.util.PGInterval; import org.postgresql.util.PGmoney; import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; import io.debezium.connector.postgresql.PostgresType; -import io.debezium.data.SpecialValueDecimal; +import io.debezium.connector.postgresql.TypeRegistry; /** * An abstract representation of a replication message that is sent by a PostgreSQL logical decoding plugin and @@ -93,7 +90,7 @@ public interface ColumnValue { Double asDouble(); - SpecialValueDecimal asDecimal(); + Object asDecimal(); LocalDate asLocalDate(); @@ -101,7 +98,9 @@ public interface ColumnValue { Instant asInstant(); - LocalTime asLocalTime(); + Object asTime(); + + Object asLocalTime(); OffsetTime asOffsetTimeUtc(); @@ -111,11 +110,11 @@ public interface ColumnValue { PGcircle asCircle(); - PGInterval asInterval(); + Object asInterval(); PGline asLine(); - PGlseg asLseg(); + Object asLseg(); PGmoney asMoney(); @@ -124,6 +123,12 @@ public interface ColumnValue { PGpoint asPoint(); PGpolygon asPolygon(); + + boolean isArray(PostgresType type); + + Object asArray(String columnName, PostgresType type, String fullType, PgConnectionSupplier connection); + + Object asDefault(TypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, PgConnectionSupplier connection); } /** diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessageColumnValueResolver.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessageColumnValueResolver.java index 1c4c760b3fb..b0018fdbd28 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessageColumnValueResolver.java +++ 
b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessageColumnValueResolver.java @@ -5,14 +5,13 @@ */ package io.debezium.connector.postgresql.connection; -import java.sql.SQLException; - -import org.postgresql.jdbc.PgArray; +import org.postgresql.util.PGmoney; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; import io.debezium.connector.postgresql.PostgresType; +import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.connection.ReplicationMessage.ColumnValue; /** @@ -31,24 +30,22 @@ public class ReplicationMessageColumnValueResolver { * @param value the column value * @param connection a postgres connection supplier * @param includeUnknownDatatypes true to include unknown data types, false otherwise + * @param typeRegistry the postgres type registry * @return */ public static Object resolveValue(String columnName, PostgresType type, String fullType, ColumnValue value, final PgConnectionSupplier connection, - boolean includeUnknownDatatypes) { + boolean includeUnknownDatatypes, TypeRegistry typeRegistry) { if (value.isNull()) { // nulls are null return null; } - if (type.isArrayType()) { - try { - final String dataString = value.asString(); - return new PgArray(connection.get(), type.getOid(), dataString); - } - catch (SQLException e) { - LOGGER.warn("Unexpected exception trying to process PgArray ({}) column '{}', {}", fullType, columnName, e); - } - return null; + if (!type.isRootType()) { + return resolveValue(columnName, type.getParentType(), fullType, value, connection, includeUnknownDatatypes, typeRegistry); + } + + if (value.isArray(type)) { + return value.asArray(columnName, type, fullType, connection); } switch (type.getName()) { @@ -110,7 +107,7 @@ public static Object resolveValue(String columnName, PostgresType type, String f return value.asInstant(); case "time": 
- return value.asString(); + return value.asTime(); case "time without time zone": return value.asLocalTime(); @@ -136,7 +133,8 @@ public static Object resolveValue(String columnName, PostgresType type, String f case "lseg": return value.asLseg(); case "money": - return value.asMoney().val; + final Object v = value.asMoney(); + return (v instanceof PGmoney) ? ((PGmoney) v).val : v; case "path": return value.asPath(); case "point": @@ -181,14 +179,6 @@ public static Object resolveValue(String columnName, PostgresType type, String f break; } - if (includeUnknownDatatypes) { - // this includes things like PostGIS geometries or other custom types. - // leave up to the downstream message recipient to deal with. - LOGGER.debug("processing column '{}' with unknown data type '{}' as byte array", columnName, - fullType); - return value.asString(); - } - LOGGER.debug("Unknown column type {} for column {} – ignoring", fullType, columnName); - return null; + return value.asDefault(typeRegistry, type.getOid(), columnName, fullType, includeUnknownDatatypes, connection); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index 3d3f9a29ac8..6fa3c74fc18 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -113,6 +113,7 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, L LOGGER.trace("Message Type: {}", type); switch (type) { case TRUNCATE: + // @formatter:off // For now we plan to gracefully skip TRUNCATE messages. // We may decide in the future that these may be emitted differently, see DBZ-1052. 
// @@ -127,6 +128,7 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, L // It seems the protocol guarantees to send the most up-to-date `R` relation // messages for the tables prior to the `T` truncation message, even if in the // same session a `R` message was followed by an insert/update/delete message. + // @formatter:on case COMMIT: // For now skip these message types so that the LSN associated with the message won't // be flushed back to PostgreSQL. There is a potential LSN assignment concern with @@ -477,8 +479,8 @@ private Table resolveRelationFromMetadata(PgOutputRelationMetaData metadata) { for (ColumnMetaData columnMetadata : metadata.getColumns()) { ColumnEditor editor = io.debezium.relational.Column.editor() .name(columnMetadata.getColumnName()) - .jdbcType(columnMetadata.getPostgresType().getJdbcId()) - .nativeType(columnMetadata.getPostgresType().getOid()) + .jdbcType(columnMetadata.getPostgresType().getRootType().getJdbcId()) + .nativeType(columnMetadata.getPostgresType().getRootType().getOid()) .optional(columnMetadata.isOptional()) .type(columnMetadata.getPostgresType().getName(), columnMetadata.getTypeName()) .length(columnMetadata.getLength()) @@ -563,7 +565,8 @@ private static List resolveColumnsFromStreamTupleData(ByteBuffer buffer, new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, optional, true) { @Override public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { - return PgOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, includeUnknownDatatypes); + return PgOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, includeUnknownDatatypes, + typeRegistry); } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputReplicationMessage.java 
b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputReplicationMessage.java index fc3b89f5cae..71dd2924c79 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputReplicationMessage.java @@ -10,6 +10,7 @@ import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; import io.debezium.connector.postgresql.PostgresType; +import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.connection.ReplicationMessage; import io.debezium.connector.postgresql.connection.ReplicationMessageColumnValueResolver; @@ -90,8 +91,8 @@ public boolean shouldSchemaBeSynchronized() { * @return the value; may be null */ public static Object getValue(String columnName, PostgresType type, String fullType, String rawValue, final PgConnectionSupplier connection, - boolean includeUnknownDataTypes) { + boolean includeUnknownDataTypes, TypeRegistry typeRegistry) { final PgOutputColumnValue columnValue = new PgOutputColumnValue(rawValue); - return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDataTypes); + return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDataTypes, typeRegistry); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoColumnValue.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoColumnValue.java new file mode 100644 index 00000000000..7a6b2a63b67 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoColumnValue.java @@ -0,0 +1,339 @@ +/* + * 
Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.connection.pgproto; + +import java.math.BigDecimal; +import java.nio.charset.Charset; +import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; + +import org.postgresql.geometric.PGpoint; +import org.postgresql.jdbc.PgArray; +import org.postgresql.util.PGmoney; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.postgresql.PgOid; +import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; +import io.debezium.connector.postgresql.PostgresType; +import io.debezium.connector.postgresql.PostgresValueConverter; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.connection.AbstractColumnValue; +import io.debezium.connector.postgresql.connection.wal2json.DateTimeFormat; +import io.debezium.connector.postgresql.proto.PgProto; +import io.debezium.data.SpecialValueDecimal; +import io.debezium.time.Conversions; + +/** + * Replication message column sent by Postgres Decoderbufs + * + * @author Chris Cranford + */ +public class PgProtoColumnValue extends AbstractColumnValue { + + private static final Logger LOGGER = LoggerFactory.getLogger(PgProtoColumnValue.class); + + private PgProto.DatumMessage value; + + public PgProtoColumnValue(PgProto.DatumMessage value) { + this.value = value; + } + + @Override + public PgProto.DatumMessage getRawValue() { + return value; + } + + @Override + public boolean isNull() { + return value.hasDatumMissing(); + } + + @Override + public String asString() { + if (value.hasDatumString()) { + return value.getDatumString(); + 
} + else if (value.hasDatumBytes()) { + return new String(asByteArray(), Charset.forName("UTF-8")); + } + return null; + } + + @Override + public Boolean asBoolean() { + if (value.hasDatumBool()) { + return value.getDatumBool(); + } + + final String s = asString(); + if (s != null) { + if (s.equalsIgnoreCase("t")) { + return Boolean.TRUE; + } + else if (s.equalsIgnoreCase("f")) { + return Boolean.FALSE; + } + } + return null; + } + + @Override + public Integer asInteger() { + if (value.hasDatumInt32()) { + return value.getDatumInt32(); + } + + final String s = asString(); + return s != null ? Integer.valueOf(s) : null; + } + + @Override + public Long asLong() { + if (value.hasDatumInt64()) { + return value.getDatumInt64(); + } + + final String s = asString(); + return s != null ? Long.valueOf(s) : null; + } + + @Override + public Float asFloat() { + if (value.hasDatumFloat()) { + return value.getDatumFloat(); + } + + final String s = asString(); + return s != null ? Float.valueOf(s) : null; + } + + @Override + public Double asDouble() { + if (value.hasDatumDouble()) { + return value.getDatumDouble(); + } + + final String s = asString(); + return s != null ? Double.valueOf(s) : null; + } + + @Override + public Object asDecimal() { + if (value.hasDatumDouble()) { + return value.getDatumDouble(); + } + + final String s = asString(); + if (s != null) { + return PostgresValueConverter.toSpecialValue(s).orElseGet(() -> new SpecialValueDecimal(new BigDecimal(s))); + } + return null; + } + + @Override + public byte[] asByteArray() { + return value.hasDatumBytes() ? value.getDatumBytes().toByteArray() : null; + } + + @Override + public LocalDate asLocalDate() { + if (value.hasDatumInt32()) { + return LocalDate.ofEpochDay((long) value.getDatumInt32()); + } + + final String s = asString(); + return s != null ? 
DateTimeFormat.get().date(s) : null; + } + + @Override + public Object asTime() { + if (value.hasDatumInt64()) { + return Duration.of(value.getDatumInt64(), ChronoUnit.MICROS); + } + + final String s = asString(); + if (s != null) { + return DateTimeFormat.get().time(s); + } + return null; + } + + @Override + public OffsetTime asOffsetTimeUtc() { + if (value.hasDatumDouble()) { + return Conversions.toInstantFromMicros((long) value.getDatumDouble()).atOffset(ZoneOffset.UTC).toOffsetTime(); + } + + final String s = asString(); + return s != null ? DateTimeFormat.get().timeWithTimeZone(s) : null; + } + + @Override + public OffsetDateTime asOffsetDateTimeAtUtc() { + if (value.hasDatumInt64()) { + return Conversions.toInstantFromMicros(value.getDatumInt64()).atOffset(ZoneOffset.UTC); + } + + final String s = asString(); + return s != null ? DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime(s).withOffsetSameInstant(ZoneOffset.UTC) : null; + } + + @Override + public Instant asInstant() { + if (value.hasDatumInt64()) { + return Conversions.toInstantFromMicros(value.getDatumInt64()); + } + + final String s = asString(); + return s != null ? DateTimeFormat.get().timestampToInstant(asString()) : null; + } + + @Override + public Object asLocalTime() { + return asTime(); + } + + @Override + public Object asInterval() { + if (value.hasDatumDouble()) { + return value.getDatumDouble(); + } + + final String s = asString(); + return s != null ? 
super.asInterval() : null; + } + + @Override + public PGmoney asMoney() { + if (value.hasDatumInt64()) { + return new PGmoney(value.getDatumInt64() / 100.0); + } + return super.asMoney(); + } + + @Override + public PGpoint asPoint() { + if (value.hasDatumPoint()) { + PgProto.Point datumPoint = value.getDatumPoint(); + return new PGpoint(datumPoint.getX(), datumPoint.getY()); + } + else if (value.hasDatumBytes()) { + return super.asPoint(); + } + return null; + } + + @Override + public boolean isArray(PostgresType type) { + final int oidValue = type.getOid(); + switch (oidValue) { + case PgOid.INT2_ARRAY: + case PgOid.INT4_ARRAY: + case PgOid.INT8_ARRAY: + case PgOid.TEXT_ARRAY: + case PgOid.NUMERIC_ARRAY: + case PgOid.FLOAT4_ARRAY: + case PgOid.FLOAT8_ARRAY: + case PgOid.BOOL_ARRAY: + case PgOid.DATE_ARRAY: + case PgOid.TIME_ARRAY: + case PgOid.TIMETZ_ARRAY: + case PgOid.TIMESTAMP_ARRAY: + case PgOid.TIMESTAMPTZ_ARRAY: + case PgOid.BYTEA_ARRAY: + case PgOid.VARCHAR_ARRAY: + case PgOid.OID_ARRAY: + case PgOid.BPCHAR_ARRAY: + case PgOid.MONEY_ARRAY: + case PgOid.NAME_ARRAY: + case PgOid.INTERVAL_ARRAY: + case PgOid.CHAR_ARRAY: + case PgOid.VARBIT_ARRAY: + case PgOid.UUID_ARRAY: + case PgOid.XML_ARRAY: + case PgOid.POINT_ARRAY: + case PgOid.JSONB_ARRAY: + case PgOid.JSON_ARRAY: + case PgOid.REF_CURSOR_ARRAY: + case PgOid.INET_ARRAY: + case PgOid.CIDR_ARRAY: + case PgOid.MACADDR_ARRAY: + case PgOid.MACADDR8_ARRAY: + case PgOid.TSRANGE_ARRAY: + case PgOid.TSTZRANGE_ARRAY: + case PgOid.DATERANGE_ARRAY: + case PgOid.INT4RANGE_ARRAY: + case PgOid.NUM_RANGE_ARRAY: + case PgOid.INT8RANGE_ARRAY: + return true; + default: + return false; + } + } + + @Override + public Object asArray(String columnName, PostgresType type, String fullType, PgConnectionSupplier connection) { + // Currently the logical decoding plugin sends unhandled types as a byte array containing the string + // representation (in Postgres) of the array value. 
+ // The approach to decode this is sub-optimal but the only way to improve this is to update the plugin. + // Reasons for it being sub-optimal include: + // 1. It requires a Postgres JDBC connection to deserialize + // 2. The byte-array is a serialised string but we make the assumption its UTF-8 encoded (which it will + // be in most cases) + // 3. For larger arrays and especially 64-bit integers and the like it is less efficient sending string + // representations over the wire. + try { + byte[] data = asByteArray(); + if (data == null) { + return null; + } + String dataString = new String(data, Charset.forName("UTF-8")); + PgArray arrayData = new PgArray(connection.get(), (int) value.getColumnType(), dataString); + Object deserializedArray = arrayData.getArray(); + return Arrays.asList((Object[]) deserializedArray); + } + catch (SQLException e) { + LOGGER.warn("Unexpected exception trying to process PgArray column '{}'", value.getColumnName(), e); + } + return null; + } + + @Override + public Object asDefault(TypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, + PgConnectionSupplier connection) { + final PostgresType type = typeRegistry.get(columnType); + if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() || type.getOid() == typeRegistry.citextOid()) { + return asByteArray(); + } + if (type.getOid() == typeRegistry.hstoreOid()) { + return asByteArray(); + } + if (type.getOid() == typeRegistry.geometryArrayOid() || + type.getOid() == typeRegistry.geographyArrayOid() || + type.getOid() == typeRegistry.citextArrayOid() || + type.getOid() == typeRegistry.hstoreArrayOid()) { + return asArray(columnName, type, fullType, connection); + } + if (type.isEnumType()) { + return asString(); + } + // unknown data type is sent by decoder as binary value + if (includeUnknownDatatypes) { + return asByteArray(); + } + + return null; + } + +} diff --git 
a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java index 9d74d3ae649..d176b4fbcfd 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java @@ -6,38 +6,23 @@ package io.debezium.connector.postgresql.connection.pgproto; -import java.math.BigDecimal; -import java.nio.charset.Charset; -import java.sql.SQLException; -import java.time.Duration; import java.time.Instant; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.errors.ConnectException; -import org.postgresql.geometric.PGpoint; -import org.postgresql.jdbc.PgArray; -import org.postgresql.util.PGInterval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.connector.postgresql.PgOid; import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; import io.debezium.connector.postgresql.PostgresType; -import io.debezium.connector.postgresql.PostgresValueConverter; import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.UnchangedToastedReplicationMessageColumn; import io.debezium.connector.postgresql.connection.AbstractReplicationMessageColumn; import io.debezium.connector.postgresql.connection.ReplicationMessage; +import io.debezium.connector.postgresql.connection.ReplicationMessageColumnValueResolver; import io.debezium.connector.postgresql.proto.PgProto; -import 
io.debezium.data.SpecialValueDecimal; -import io.debezium.time.Conversions; import io.debezium.util.Strings; /** @@ -113,12 +98,14 @@ private List transform(List mes return new UnchangedToastedReplicationMessageColumn(columnName, type, typeInfo.map(PgProto.TypeInfo::getModifier).orElse(null), typeInfo.map(PgProto.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata()); } - return new AbstractReplicationMessageColumn(columnName, type, typeInfo.map(PgProto.TypeInfo::getModifier).orElse(null), + + final String fullType = typeInfo.map(PgProto.TypeInfo::getModifier).orElse(null); + return new AbstractReplicationMessageColumn(columnName, type, fullType, typeInfo.map(PgProto.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata()) { @Override public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { - return PgProtoReplicationMessage.this.getValue(datum, connection, includeUnknownDatatypes); + return PgProtoReplicationMessage.this.getValue(columnName, type, fullType, datum, connection, includeUnknownDatatypes); } @Override @@ -135,203 +122,9 @@ public boolean isLastEventForLsn() { return true; } - /** - * Converts the Protobuf value for a {@link io.debezium.connector.postgresql.proto.PgProto.DatumMessage plugin message} to - * a Java value based on the type of the column from the message. This value will be converted later on if necessary by the - * {@link PostgresValueConverter#converter(Column, Field)} instance to match whatever the Connect schema type expects. - * - * Note that the logic here is tightly coupled (i.e. dependent) on the Postgres plugin logic which writes the actual - * Protobuf messages. 
- * - * @param a supplier to get a connection to Postgres instance for array handling - * @return the value; may be null - */ - public Object getValue(PgProto.DatumMessage datumMessage, PgConnectionSupplier connection, boolean includeUnknownDatatypes) { - if (datumMessage.hasDatumMissing()) { - return UnchangedToastedReplicationMessageColumn.UNCHANGED_TOAST_VALUE; - } - - int columnType = (int) datumMessage.getColumnType(); - switch (columnType) { - case PgOid.BOOL: - return datumMessage.hasDatumBool() ? datumMessage.getDatumBool() : null; - case PgOid.INT2: - case PgOid.INT4: - return datumMessage.hasDatumInt32() ? datumMessage.getDatumInt32() : null; - case PgOid.INT8: - case PgOid.OID: - case PgOid.MONEY: - return datumMessage.hasDatumInt64() ? datumMessage.getDatumInt64() : null; - case PgOid.FLOAT4: - return datumMessage.hasDatumFloat() ? datumMessage.getDatumFloat() : null; - case PgOid.FLOAT8: - return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble() : null; - case PgOid.NUMERIC: - if (datumMessage.hasDatumDouble()) { - // For backwards compatibility only to enable independent upgrade of Postgres plug-in - return datumMessage.getDatumDouble(); - } - else if (datumMessage.hasDatumString()) { - final String s = datumMessage.getDatumString(); - return PostgresValueConverter.toSpecialValue(s).orElseGet(() -> new SpecialValueDecimal(new BigDecimal(s))); - } - return null; - case PgOid.CHAR: - case PgOid.VARCHAR: - case PgOid.BPCHAR: - case PgOid.TEXT: - case PgOid.JSON: - case PgOid.JSONB_OID: - case PgOid.XML: - case PgOid.UUID: - case PgOid.BIT: - case PgOid.VARBIT: - case PgOid.INET_OID: - case PgOid.CIDR_OID: - case PgOid.MACADDR_OID: - case PgOid.MACADDR8_OID: - return datumMessage.hasDatumString() ? datumMessage.getDatumString() : null; - case PgOid.DATE: - return datumMessage.hasDatumInt32() ? 
(long) datumMessage.getDatumInt32() : null; - case PgOid.TIMESTAMP: - if (!datumMessage.hasDatumInt64()) { - return null; - } - // these types are sent by the plugin as LONG - microseconds since Unix Epoch - return Conversions.toInstantFromMicros(datumMessage.getDatumInt64()); - case PgOid.TIMESTAMPTZ: - if (!datumMessage.hasDatumInt64()) { - return null; - } - // these types are sent by the plugin as LONG - microseconds since Unix Epoch - return Conversions.toInstantFromMicros(datumMessage.getDatumInt64()).atOffset(ZoneOffset.UTC); - case PgOid.TIME: - if (!datumMessage.hasDatumInt64()) { - return null; - } - - // these types are sent by the plugin as LONG - microseconds since Unix Epoch - return Duration.of(datumMessage.getDatumInt64(), ChronoUnit.MICROS); - case PgOid.TIMETZ: - if (!datumMessage.hasDatumDouble()) { - return null; - } - // the value is sent as a double microseconds - return Conversions.toInstantFromMicros((long) datumMessage.getDatumDouble()) - .atOffset(ZoneOffset.UTC) - .toOffsetTime(); - case PgOid.INTERVAL: - // these are sent as doubles by the plugin since their storage is larger than 8 bytes - try { - return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble() - : datumMessage.hasDatumString() ? new PGInterval(datumMessage.getDatumString()) : null; - } - catch (SQLException e) { - throw new ConnectException("Could not convert interval value"); - } - // the plugin will send back a TZ formatted string - case PgOid.BYTEA: - return datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null; - case PgOid.POINT: { - PgProto.Point datumPoint = datumMessage.getDatumPoint(); - return new PGpoint(datumPoint.getX(), datumPoint.getY()); - } - case PgOid.TSRANGE_OID: - case PgOid.TSTZRANGE_OID: - case PgOid.DATERANGE_OID: - case PgOid.INT4RANGE_OID: - case PgOid.NUM_RANGE_OID: - case PgOid.INT8RANGE_OID: - return datumMessage.hasDatumBytes() ? 
new String(datumMessage.getDatumBytes().toByteArray(), Charset.forName("UTF-8")) : null; - case PgOid.INT2_ARRAY: - case PgOid.INT4_ARRAY: - case PgOid.INT8_ARRAY: - case PgOid.TEXT_ARRAY: - case PgOid.NUMERIC_ARRAY: - case PgOid.FLOAT4_ARRAY: - case PgOid.FLOAT8_ARRAY: - case PgOid.BOOL_ARRAY: - case PgOid.DATE_ARRAY: - case PgOid.TIME_ARRAY: - case PgOid.TIMETZ_ARRAY: - case PgOid.TIMESTAMP_ARRAY: - case PgOid.TIMESTAMPTZ_ARRAY: - case PgOid.BYTEA_ARRAY: - case PgOid.VARCHAR_ARRAY: - case PgOid.OID_ARRAY: - case PgOid.BPCHAR_ARRAY: - case PgOid.MONEY_ARRAY: - case PgOid.NAME_ARRAY: - case PgOid.INTERVAL_ARRAY: - case PgOid.CHAR_ARRAY: - case PgOid.VARBIT_ARRAY: - case PgOid.UUID_ARRAY: - case PgOid.XML_ARRAY: - case PgOid.POINT_ARRAY: - case PgOid.JSONB_ARRAY: - case PgOid.JSON_ARRAY: - case PgOid.REF_CURSOR_ARRAY: - case PgOid.INET_ARRAY: - case PgOid.CIDR_ARRAY: - case PgOid.MACADDR_ARRAY: - case PgOid.MACADDR8_ARRAY: - case PgOid.TSRANGE_ARRAY: - case PgOid.TSTZRANGE_ARRAY: - case PgOid.DATERANGE_ARRAY: - case PgOid.INT4RANGE_ARRAY: - case PgOid.NUM_RANGE_ARRAY: - case PgOid.INT8RANGE_ARRAY: - return getArray(datumMessage, connection, columnType); - - case PgOid.UNSPECIFIED: - return null; - - default: - PostgresType type = typeRegistry.get(columnType); - if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() || type.getOid() == typeRegistry.citextOid()) { - return datumMessage.getDatumBytes().toByteArray(); - } - if (type.getOid() == typeRegistry.hstoreOid()) { - return datumMessage.getDatumBytes().toByteArray(); - } - if (type.getOid() == typeRegistry.geometryArrayOid() || - type.getOid() == typeRegistry.geographyArrayOid() || - type.getOid() == typeRegistry.citextArrayOid() || - type.getOid() == typeRegistry.hstoreArrayOid()) { - return getArray(datumMessage, connection, columnType); - } - // unknown data type is sent by decoder as binary value - if (includeUnknownDatatypes && datumMessage.hasDatumBytes()) { - return 
datumMessage.getDatumBytes().toByteArray(); - } - - return null; - } - } - - private Object getArray(PgProto.DatumMessage datumMessage, PgConnectionSupplier connection, int columnType) { - // Currently the logical decoding plugin sends unhandled types as a byte array containing the string - // representation (in Postgres) of the array value. - // The approach to decode this is sub-optimal but the only way to improve this is to update the plugin. - // Reasons for it being sub-optimal include: - // 1. It requires a Postgres JDBC connection to deserialize - // 2. The byte-array is a serialised string but we make the assumption its UTF-8 encoded (which it will - // be in most cases) - // 3. For larger arrays and especially 64-bit integers and the like it is less efficient sending string - // representations over the wire. - try { - byte[] data = datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null; - if (data == null) { - return null; - } - String dataString = new String(data, Charset.forName("UTF-8")); - PgArray arrayData = new PgArray(connection.get(), columnType, dataString); - Object deserializedArray = arrayData.getArray(); - return Arrays.asList((Object[]) deserializedArray); - } - catch (SQLException e) { - LOGGER.warn("Unexpected exception trying to process PgArray column '{}'", datumMessage.getColumnName(), e); - } - return null; + public Object getValue(String columnName, PostgresType type, String fullType, PgProto.DatumMessage datumMessage, final PgConnectionSupplier connection, + boolean includeUnknownDatatypes) { + final PgProtoColumnValue columnValue = new PgProtoColumnValue(datumMessage); + return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDatatypes, typeRegistry); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonColumnValue.java 
b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonColumnValue.java index 293b4aaf9b2..268bc0ad15b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonColumnValue.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonColumnValue.java @@ -40,17 +40,41 @@ public String asString() { @Override public Boolean asBoolean() { - return value.asBoolean(); + if (value.isBoolean()) { + return value.asBoolean(); + } + else if (value.isString()) { + return "t".equalsIgnoreCase(value.asString()); + } + else { + return null; + } } @Override public Integer asInteger() { - return value.asInteger(); + if (value.isNumber()) { + return value.asInteger(); + } + else if (value.isString()) { + return Integer.valueOf(value.asString()); + } + else { + return null; + } } @Override public Long asLong() { - return value.asLong(); + if (value.isNumber()) { + return value.asLong(); + } + else if (value.isString()) { + return Long.valueOf(value.asString()); + } + else { + return null; + } } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java index b348a8616ef..a6d8e7213ed 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java @@ -168,7 +168,7 @@ private String parseType(String columnName, String typeWithModifiers) { public Object getValue(String columnName, PostgresType type, String fullType, Value rawValue, final PgConnectionSupplier connection, boolean includeUnknownDatatypes) { 
final Wal2JsonColumnValue columnValue = new Wal2JsonColumnValue(rawValue); - return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDatatypes); + return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDatatypes, typeRegistry); } @Override diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index ea0aff5bc88..a2658c021d9 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -6,6 +6,7 @@ package io.debezium.connector.postgresql; +import static io.debezium.connector.postgresql.TestHelper.PK_FIELD; import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -60,6 +61,7 @@ import io.debezium.data.Bits; import io.debezium.data.Json; import io.debezium.data.SchemaUtil; +import io.debezium.data.SpecialValueDecimal; import io.debezium.data.Uuid; import io.debezium.data.VariableScaleDecimal; import io.debezium.data.VerifyRecord; @@ -68,6 +70,7 @@ import io.debezium.data.geometry.Geometry; import io.debezium.data.geometry.Point; import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.junit.TestLogger; import io.debezium.relational.TableId; import io.debezium.time.Date; @@ -775,6 +778,84 @@ protected List schemasAndValuesForCustomTypes() { } + protected List schemasAndValuesForDomainAliasTypes(boolean streaming) { + final ByteBuffer boxByteBuffer = ByteBuffer.wrap("(1.0,1.0),(0.0,0.0)".getBytes()); + final ByteBuffer 
circleByteBuffer = ByteBuffer.wrap("<(10.0,4.0),10.0>".getBytes()); + final ByteBuffer lineByteBuffer = ByteBuffer.wrap("{-1.0,0.0,0.0}".getBytes()); + final ByteBuffer lsegByteBuffer = ByteBuffer.wrap("[(0.0,0.0),(0.0,1.0)]".getBytes()); + final ByteBuffer pathByteBuffer = ByteBuffer.wrap("((0.0,0.0),(0.0,1.0),(0.0,2.0))".getBytes()); + final ByteBuffer polygonByteBuffer = ByteBuffer.wrap("((0.0,0.0),(0.0,1.0),(1.0,0.0),(0.0,0.0))".getBytes()); + + return Arrays.asList( + new SchemaAndValueField(PK_FIELD, SchemaBuilder.INT32_SCHEMA, 1), + new SchemaAndValueField("bit_base", Bits.builder(3).build(), new byte[]{ 5, 0 }), + new SchemaAndValueField("bit_alias", Bits.builder(3).build(), new byte[]{ 5, 0 }), + new SchemaAndValueField("smallint_base", SchemaBuilder.INT16_SCHEMA, (short) 1), + new SchemaAndValueField("smallint_alias", SchemaBuilder.INT16_SCHEMA, (short) 1), + new SchemaAndValueField("integer_base", SchemaBuilder.INT32_SCHEMA, 1), + new SchemaAndValueField("integer_alias", SchemaBuilder.INT32_SCHEMA, 1), + new SchemaAndValueField("bigint_base", SchemaBuilder.INT64_SCHEMA, 1000L), + new SchemaAndValueField("bigint_alias", SchemaBuilder.INT64_SCHEMA, 1000L), + new SchemaAndValueField("real_base", SchemaBuilder.FLOAT32_SCHEMA, 3.14f), + new SchemaAndValueField("real_alias", SchemaBuilder.FLOAT32_SCHEMA, 3.14f), + new SchemaAndValueField("float8_base", SchemaBuilder.FLOAT64_SCHEMA, 3.14), + new SchemaAndValueField("float8_alias", SchemaBuilder.FLOAT64_SCHEMA, 3.14), + new SchemaAndValueField("numeric_base", SpecialValueDecimal.builder(DecimalMode.DOUBLE, 4, 2).build(), 1234.12), + new SchemaAndValueField("numeric_alias", SpecialValueDecimal.builder(DecimalMode.DOUBLE, 4, 2).build(), 1234.12), + new SchemaAndValueField("bool_base", SchemaBuilder.BOOLEAN_SCHEMA, true), + new SchemaAndValueField("bool_alias", SchemaBuilder.BOOLEAN_SCHEMA, true), + new SchemaAndValueField("string_base", SchemaBuilder.STRING_SCHEMA, "hello"), + new 
SchemaAndValueField("string_alias", SchemaBuilder.STRING_SCHEMA, "hello"), + new SchemaAndValueField("date_base", Date.builder().build(), Date.toEpochDay(LocalDate.parse("2019-10-02"), null)), + new SchemaAndValueField("date_alias", Date.builder().build(), Date.toEpochDay(LocalDate.parse("2019-10-02"), null)), + new SchemaAndValueField("time_base", MicroTime.builder().build(), LocalTime.parse("01:02:03").toNanoOfDay() / 1_000), + new SchemaAndValueField("time_alias", MicroTime.builder().build(), LocalTime.parse("01:02:03").toNanoOfDay() / 1_000), + new SchemaAndValueField("timetz_base", ZonedTime.builder().build(), "01:02:03.123789Z"), + new SchemaAndValueField("timetz_alias", ZonedTime.builder().build(), "01:02:03.123789Z"), + new SchemaAndValueField("timestamp_base", MicroTimestamp.builder().build(), asEpochMicros("2019-10-02T01:02:03.123456")), + new SchemaAndValueField("timestamp_alias", MicroTimestamp.builder().build(), asEpochMicros("2019-10-02T01:02:03.123456")), + new SchemaAndValueField("timestamptz_base", ZonedTimestamp.builder().build(), "2019-10-02T11:51:30.123456Z"), + new SchemaAndValueField("timestamptz_alias", ZonedTimestamp.builder().build(), "2019-10-02T11:51:30.123456Z"), + new SchemaAndValueField("timewottz_base", MicroTime.builder().build(), LocalTime.parse("01:02:03").toNanoOfDay() / 1_000), + new SchemaAndValueField("timewottz_alias", MicroTime.builder().build(), LocalTime.parse("01:02:03").toNanoOfDay() / 1_000), + new SchemaAndValueField("interval_base", MicroDuration.builder().build(), + MicroDuration.durationMicros(1, 2, 3, 4, 5, 6, MicroDuration.DAYS_PER_MONTH_AVG)), + new SchemaAndValueField("interval_alias", MicroDuration.builder().build(), + MicroDuration.durationMicros(1, 2, 3, 4, 5, 6, MicroDuration.DAYS_PER_MONTH_AVG)), + new SchemaAndValueField("box_base", SchemaBuilder.BYTES_SCHEMA, boxByteBuffer), + new SchemaAndValueField("box_alias", SchemaBuilder.BYTES_SCHEMA, boxByteBuffer), + new SchemaAndValueField("circle_base", 
SchemaBuilder.BYTES_SCHEMA, circleByteBuffer), + new SchemaAndValueField("circle_alias", SchemaBuilder.BYTES_SCHEMA, circleByteBuffer), + new SchemaAndValueField("line_base", SchemaBuilder.BYTES_SCHEMA, lineByteBuffer), + new SchemaAndValueField("line_alias", SchemaBuilder.BYTES_SCHEMA, lineByteBuffer), + new SchemaAndValueField("lseg_base", SchemaBuilder.BYTES_SCHEMA, lsegByteBuffer), + new SchemaAndValueField("lseg_alias", SchemaBuilder.BYTES_SCHEMA, lsegByteBuffer), + new SchemaAndValueField("path_base", SchemaBuilder.BYTES_SCHEMA, pathByteBuffer), + new SchemaAndValueField("path_alias", SchemaBuilder.BYTES_SCHEMA, pathByteBuffer), + new SchemaAndValueField("point_base", Point.builder().build(), Point.createValue(Point.builder().build(), 1, 1)), + new SchemaAndValueField("point_alias", Point.builder().build(), Point.createValue(Point.builder().build(), 1, 1)), + new SchemaAndValueField("polygon_base", SchemaBuilder.BYTES_SCHEMA, polygonByteBuffer), + new SchemaAndValueField("polygon_alias", SchemaBuilder.BYTES_SCHEMA, polygonByteBuffer), + new SchemaAndValueField("char_base", SchemaBuilder.STRING_SCHEMA, "a"), + new SchemaAndValueField("char_alias", SchemaBuilder.STRING_SCHEMA, "a"), + new SchemaAndValueField("text_base", SchemaBuilder.STRING_SCHEMA, "Hello World"), + new SchemaAndValueField("text_alias", SchemaBuilder.STRING_SCHEMA, "Hello World"), + new SchemaAndValueField("json_base", Json.builder().build(), "{\"key\": \"value\"}"), + new SchemaAndValueField("json_alias", Json.builder().build(), "{\"key\": \"value\"}"), + new SchemaAndValueField("xml_base", Xml.builder().build(), "Hello"), + new SchemaAndValueField("xml_alias", Xml.builder().build(), "Hello"), + new SchemaAndValueField("uuid_base", Uuid.builder().build(), "40e6215d-b5c6-4896-987c-f30f3678f608"), + new SchemaAndValueField("uuid_alias", Uuid.builder().build(), "40e6215d-b5c6-4896-987c-f30f3678f608"), + new SchemaAndValueField("varbit_base", Bits.builder(3).build(), new byte[]{ 5, 0 }), + new 
SchemaAndValueField("varbit_alias", Bits.builder(3).build(), new byte[]{ 5, 0 }), + new SchemaAndValueField("inet_base", SchemaBuilder.STRING_SCHEMA, "192.168.0.1"), + new SchemaAndValueField("inet_alias", SchemaBuilder.STRING_SCHEMA, "192.168.0.1"), + new SchemaAndValueField("cidr_base", SchemaBuilder.STRING_SCHEMA, "192.168.0.0/24"), + new SchemaAndValueField("cidr_alias", SchemaBuilder.STRING_SCHEMA, "192.168.0.0/24"), + new SchemaAndValueField("macaddr_base", SchemaBuilder.STRING_SCHEMA, "08:00:2b:01:02:03"), + new SchemaAndValueField("macaddr_alias", SchemaBuilder.STRING_SCHEMA, "08:00:2b:01:02:03")); + } + protected List schemasAndValuesForTable(String insertTableStatement) { switch (insertTableStatement) { case INSERT_NUMERIC_TYPES_STMT: diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index f42c3e9dd4a..dac4e0851de 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -13,7 +13,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -22,7 +27,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.fest.assertions.Assertions; @@ -35,6 
+42,8 @@ import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.connector.postgresql.junit.SkipTestDependingOnDatabaseVersionRule; import io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan; +import io.debezium.data.Bits; +import io.debezium.data.Enum; import io.debezium.data.Envelope; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; @@ -56,6 +65,7 @@ public class RecordsSnapshotProducerIT extends AbstractRecordsProducerTest { @Before public void before() throws Exception { + TestHelper.dropDefaultReplicationSlot(); TestHelper.dropAllSchemas(); TestHelper.executeDDL("init_postgis.ddl"); TestHelper.executeDDL("postgres_create_tables.ddl"); @@ -480,6 +490,278 @@ public void shouldNotSnapshotMaterializedViews() throws Exception { consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); } + @Test + @FixFor("DBZ-1413") + public void shouldSnapshotDomainTypeWithPropagatedSourceTypeAttributes() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.execute("CREATE DOMAIN float83 AS numeric(8,3) DEFAULT 0.0;"); + TestHelper.execute("CREATE DOMAIN money2 AS MONEY DEFAULT 0.0;"); + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, salary money, salary2 money2, a numeric(8,3), area float83, PRIMARY KEY(pk));"); + TestHelper.execute("INSERT INTO alias_table (salary, salary2, a, area) values (7.25, 8.25, 12345.123, 12345.123);"); + + buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with("column.propagate.source.type", "public.alias_table.*")); + + final TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + List expected = Arrays.asList( + new SchemaAndValueField("salary", Decimal.builder(2).optional() + 
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE)) + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") + .build(), BigDecimal.valueOf(7.25)), + new SchemaAndValueField("salary2", Decimal.builder(2).optional() + .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY2") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE)) + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") + .build(), BigDecimal.valueOf(8.25)), + new SchemaAndValueField("a", SchemaBuilder.float64().optional() + .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMERIC") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8") + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "3") + .build(), 12345.123), + new SchemaAndValueField("area", SchemaBuilder.float64().optional() + .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "FLOAT83") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8") + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "3") + .build(), 12345.123)); + + consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", expected))); + } + + @Test + @FixFor("DBZ-1413") + public void shouldSnapshotDomainAliasWithProperModifiers() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);"); + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value varbit2 NOT NULL, PRIMARY KEY(pk));"); + TestHelper.execute("INSERT INTO alias_table (value) values (B'101');"); + + buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with("column.propagate.source.type", "public.alias_table.value")); + + final TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + List expected = 
Collections.singletonList( + new SchemaAndValueField("value", Bits.builder(3) + .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "VARBIT2") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "3") + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") + .build(), new byte[]{ 5, 0 })); + + consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", expected))); + } + + @Test + @FixFor("DBZ-1413") + public void shouldSnapshotDomainTypesLikeBaseTypes() throws Exception { + TestHelper.dropAllSchemas(); + + // Construct domain types + // note: skipped macaddr8 as that is only supported on PG10+ but was manually tested + TestHelper.execute("CREATE DOMAIN bit2 AS BIT(3);"); + TestHelper.execute("CREATE DOMAIN smallint2 AS smallint;"); + TestHelper.execute("CREATE DOMAIN integer2 as integer;"); + TestHelper.execute("CREATE DOMAIN bigint2 as bigint;"); + TestHelper.execute("CREATE DOMAIN real2 as real;"); + TestHelper.execute("CREATE DOMAIN bool2 AS BOOL DEFAULT false;"); + TestHelper.execute("CREATE DOMAIN float82 as float8;"); + TestHelper.execute("CREATE DOMAIN numeric2 as numeric(6,2);"); + TestHelper.execute("CREATE DOMAIN string2 AS varchar(25) DEFAULT NULL;"); + TestHelper.execute("CREATE DOMAIN date2 AS date;"); + TestHelper.execute("CREATE DOMAIN time2 as time;"); + TestHelper.execute("CREATE DOMAIN timetz2 as timetz;"); + TestHelper.execute("CREATE DOMAIN timestamp2 as timestamp;"); + TestHelper.execute("CREATE DOMAIN timestamptz2 AS timestamptz;"); + TestHelper.execute("CREATE DOMAIN timewotz2 as time without time zone;"); + TestHelper.execute("CREATE DOMAIN box2 as box;"); + TestHelper.execute("CREATE DOMAIN circle2 as circle;"); + TestHelper.execute("CREATE DOMAIN interval2 as interval;"); + TestHelper.execute("CREATE DOMAIN line2 as line;"); + TestHelper.execute("CREATE DOMAIN lseg2 as lseg;"); + TestHelper.execute("CREATE DOMAIN path2 as path;"); + TestHelper.execute("CREATE DOMAIN point2 as point;"); + TestHelper.execute("CREATE 
DOMAIN polygon2 as polygon;"); + TestHelper.execute("CREATE DOMAIN char2 as char;"); + TestHelper.execute("CREATE DOMAIN text2 as text;"); + TestHelper.execute("CREATE DOMAIN json2 as json;"); + TestHelper.execute("CREATE DOMAIN xml2 as xml;"); + TestHelper.execute("CREATE DOMAIN uuid2 as uuid;"); + TestHelper.execute("CREATE DOMAIN varbit2 as varbit(3);"); + TestHelper.execute("CREATE DOMAIN inet2 as inet;"); + TestHelper.execute("CREATE DOMAIN cidr2 as cidr;"); + TestHelper.execute("CREATE DOMAIN macaddr2 as macaddr;"); + + // Create table + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL" + + ", bit_base bit(3) NOT NULL, bit_alias bit2 NOT NULL" + + ", smallint_base smallint NOT NULL, smallint_alias smallint2 NOT NULL" + + ", integer_base integer NOT NULL, integer_alias integer2 NOT NULL" + + ", bigint_base bigint NOT NULL, bigint_alias bigint2 NOT NULL" + + ", real_base real NOT NULL, real_alias real2 NOT NULL" + + ", float8_base float8 NOT NULL, float8_alias float82 NOT NULL" + + ", numeric_base numeric(6,2) NOT NULL, numeric_alias numeric2 NOT NULL" + + ", bool_base bool NOT NULL, bool_alias bool2 NOT NULL" + + ", string_base varchar(25) NOT NULL, string_alias string2 NOT NULL" + + ", date_base date NOT NULL, date_alias date2 NOT NULL" + + ", time_base time NOT NULL, time_alias time2 NOT NULL" + + ", timetz_base timetz NOT NULL, timetz_alias timetz2 NOT NULL" + + ", timestamp_base timestamp NOT NULL, timestamp_alias timestamp2 NOT NULL" + + ", timestamptz_base timestamptz NOT NULL, timestamptz_alias timestamptz2 NOT NULL" + + ", timewottz_base time without time zone NOT NULL, timewottz_alias timewotz2 NOT NULL" + + ", box_base box NOT NULL, box_alias box2 NOT NULL" + + ", circle_base circle NOT NULL, circle_alias circle2 NOT NULL" + + ", interval_base interval NOT NULL, interval_alias interval2 NOT NULL" + + ", line_base line NOT NULL, line_alias line2 NOT NULL" + + ", lseg_base lseg NOT NULL, lseg_alias lseg2 NOT NULL" + + ", path_base path NOT NULL, 
path_alias path2 NOT NULL" + + ", point_base point NOT NULL, point_alias point2 NOT NULL" + + ", polygon_base polygon NOT NULL, polygon_alias polygon2 NOT NULL" + + ", char_base char NOT NULL, char_alias char2 NOT NULL" + + ", text_base text NOT NULL, text_alias text2 NOT NULL" + + ", json_base json NOT NULL, json_alias json2 NOT NULL" + + ", xml_base xml NOT NULL, xml_alias xml2 NOT NULL" + + ", uuid_base UUID NOT NULL, uuid_alias uuid2 NOT NULL" + + ", varbit_base varbit(3) NOT NULL, varbit_alias varbit2 NOT NULL" + + ", inet_base inet NOT NULL, inet_alias inet2 NOT NULL" + + ", cidr_base cidr NOT NULL, cidr_alias cidr2 NOT NULL" + + ", macaddr_base macaddr NOT NULL, macaddr_alias macaddr2 NOT NULL" + + ", PRIMARY KEY(pk));"); + + // Insert the one row we want to snapshot + TestHelper.execute("INSERT INTO alias_table (" + + "bit_base, bit_alias, " + + "smallint_base, smallint_alias, " + + "integer_base, integer_alias, " + + "bigint_base, bigint_alias, " + + "real_base, real_alias, " + + "float8_base, float8_alias, " + + "numeric_base, numeric_alias, " + + "bool_base, bool_alias, " + + "string_base, string_alias, " + + "date_base, date_alias, " + + "time_base, time_alias, " + + "timetz_base, timetz_alias, " + + "timestamp_base, timestamp_alias, " + + "timestamptz_base, timestamptz_alias, " + + "timewottz_base, timewottz_alias, " + + "box_base, box_alias, " + + "circle_base, circle_alias, " + + "interval_base, interval_alias, " + + "line_base, line_alias, " + + "lseg_base, lseg_alias, " + + "path_base, path_alias, " + + "point_base, point_alias, " + + "polygon_base, polygon_alias, " + + "char_base, char_alias, " + + "text_base, text_alias, " + + "json_base, json_alias, " + + "xml_base, xml_alias, " + + "uuid_base, uuid_alias, " + + "varbit_base, varbit_alias, " + + "inet_base, inet_alias, " + + "cidr_base, cidr_alias, " + + "macaddr_base, macaddr_alias " + + ") VALUES (" + + "B'101', B'101', " + + "1, 1, " + + "1, 1, " + + "1000, 1000, " + + "3.14, 3.14, " + + 
"3.14, 3.14, " + + "1234.12, 1234.12, " + + "true, true, " + + "'hello', 'hello', " + + "'2019-10-02', '2019-10-02', " + + "'01:02:03', '01:02:03', " + + "'01:02:03.123789Z', '01:02:03.123789Z', " + + "'2019-10-02T01:02:03.123456', '2019-10-02T01:02:03.123456', " + + "'2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, '2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, " + + "'01:02:03', '01:02:03', " + + "'(0,0),(1,1)', '(0,0),(1,1)', " + + "'10,4,10', '10,4,10', " + + "'1 year 2 months 3 days 4 hours 5 minutes 6 seconds', '1 year 2 months 3 days 4 hours 5 minutes 6 seconds', " + + "'(0,0),(0,1)', '(0,0),(0,1)', " + + "'((0,0),(0,1))', '((0,0),(0,1))', " + + "'((0,0),(0,1),(0,2))', '((0,0),(0,1),(0,2))', " + + "'(1,1)', '(1,1)', " + + "'((0,0),(0,1),(1,0),(0,0))', '((0,0),(0,1),(1,0),(0,0))', " + + "'a', 'a', " + + "'Hello World', 'Hello World', " + + "'{\"key\": \"value\"}', '{\"key\": \"value\"}', " + + "XML('Hello'), XML('Hello'), " + + "'40e6215d-b5c6-4896-987c-f30f3678f608', '40e6215d-b5c6-4896-987c-f30f3678f608', " + + "B'101', B'101', " + + "'192.168.0.1', '192.168.0.1', " + + "'192.168/24', '192.168/24', " + + "'08:00:2b:01:02:03', '08:00:2b:01:02:03' " + + ");"); + + buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table")); + + final TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + final List expected = schemasAndValuesForDomainAliasTypes(false); + consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", expected))); + } + + @FixFor("DBZ-1413") + public void shouldSnapshotNestedDomainAliasTypeModifiersNotPropagated() throws Exception { + TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);"); + TestHelper.execute("CREATE DOMAIN varbit2b 
AS varbit2;"); + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value varbit2b NOT NULL, PRIMARY KEY (pk));"); + TestHelper.execute("INSERT INTO alias_table (value) values (B'101');"); + + buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)); + + final TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + List expected = Collections.singletonList( + new SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{ 5, 0 })); + + consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", expected))); + } + + @Test + @FixFor("DBZ-920") + public void shouldSnapshotEnumAsKnownType() throws Exception { + TestHelper.execute("CREATE TYPE test_type AS ENUM ('V1', 'V2');"); + TestHelper.execute("CREATE TABLE enum_table (pk SERIAL, value test_type NOT NULL, primary key(pk));"); + TestHelper.execute("INSERT INTO enum_table (value) values ('V1');"); + + // Specifically enable `column.propagate.source.type` here to validate later that the actual + // type, length, and scale values are resolved correctly when paired with Enum types. 
+ buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.TABLE_WHITELIST, "public.enum_table") + .with("column.propagate.source.type", "public.enum_table.value")); + + final TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + List expected = Collections.singletonList( + new SchemaAndValueField("value", Enum.builder("V1,V2") + .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "TEST_TYPE") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE)) + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") + .build(), "V1")); + + consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.enum_table", expected))); + } + private void buildNoStreamProducer(Configuration.Builder config) { start(PostgresConnector.class, config .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) @@ -497,4 +779,9 @@ private void buildWithStreamProducer(Configuration.Builder config) { .build()); assertConnectorIsRunning(); } + + private long asEpochMicros(String timestamp) { + Instant instant = LocalDateTime.parse(timestamp).atOffset(ZoneOffset.UTC).toInstant(); + return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1_000; + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index ad3c80ab3b0..e7bbef7662f 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -18,6 +18,8 @@ import java.math.BigDecimal; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import 
java.util.Arrays; import java.util.Collections; import java.util.List; @@ -29,6 +31,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -48,15 +51,20 @@ import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule; import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs; import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot; +import io.debezium.data.Bits; +import io.debezium.data.Enum; import io.debezium.data.Envelope; +import io.debezium.data.SpecialValueDecimal; import io.debezium.data.VariableScaleDecimal; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; import io.debezium.heartbeat.Heartbeat; +import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.junit.ConditionalFail; import io.debezium.junit.ShouldFailWhen; import io.debezium.junit.logging.LogInterceptor; +import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.util.Stopwatch; @@ -842,7 +850,7 @@ record = consumer.remove(); public void shouldReceiveNumericTypeAsDouble() throws Exception { TestHelper.executeDDL("postgres_create_tables.ddl"); - startConnector(config -> config.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.DOUBLE)); + startConnector(config -> config.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)); assertInsert(INSERT_NUMERIC_DECIMAL_TYPES_STMT, 1, schemasAndValuesForDoubleEncodedNumericTypes()); } @@ -852,7 +860,7 @@ public void shouldReceiveNumericTypeAsDouble() throws Exception { public void 
shouldReceiveNumericTypeAsString() throws Exception { TestHelper.executeDDL("postgres_create_tables.ddl"); - startConnector(config -> config.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.STRING)); + startConnector(config -> config.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING)); assertInsert(INSERT_NUMERIC_DECIMAL_TYPES_STMT, 1, schemasAndValuesForStringEncodedNumericTypes()); } @@ -1458,6 +1466,340 @@ public void shouldGracefullySkipTruncateMessages() throws Exception { assertTrue(consumer.isEmpty()); } + @Test + @FixFor("DBZ-1413") + public void shouldStreamChangesForDataTypeAlias() throws Exception { + TestHelper.execute("CREATE DOMAIN money2 AS money DEFAULT 0.0;"); + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, salary2 money2, PRIMARY KEY(pk));"); + + startConnector(config -> config + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table"), + false); + + waitForStreamingToStart(); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO alias_table (data, salary, salary2) values ('hello', 7.25, 8.25);"); + + SourceRecord rec = assertRecordInserted("public.alias_table", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "alias_table"); + + List expected = Arrays.asList( + new SchemaAndValueField("pk", SchemaBuilder.INT32_SCHEMA, 1), + new SchemaAndValueField("data", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "hello"), + new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), new BigDecimal(7.25)), + new SchemaAndValueField("salary2", Decimal.builder(2).optional().build(), new BigDecimal(8.25))); + + assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER); + 
assertThat(consumer.isEmpty()).isTrue(); + } + + @Test + @FixFor("DBZ-1413") + public void shouldStreamChangesForDomainAliasAlterTable() throws Exception { + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, PRIMARY KEY(pk));"); + startConnector(config -> config + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table") + .with("column.propagate.source.type", "public.alias_table.salary3"), + false); + + waitForStreamingToStart(); + + // Now that streaming has started, alter the table schema + TestHelper.execute("CREATE DOMAIN money2 AS money DEFAULT 0.0;"); + TestHelper.execute("CREATE DOMAIN money3 AS numeric(8,3) DEFAULT 0.0;"); + TestHelper.execute("ALTER TABLE alias_table ADD COLUMN salary2 money2 NOT NULL;"); + TestHelper.execute("ALTER TABLE alias_table ADD COLUMN salary3 money3 NOT NULL;"); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO alias_table (data, salary, salary2, salary3) values ('hello', 7.25, 8.25, 123.456);"); + + SourceRecord rec = assertRecordInserted("public.alias_table", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "alias_table"); + + List expected = Arrays.asList( + new SchemaAndValueField("pk", SchemaBuilder.INT32_SCHEMA, 1), + new SchemaAndValueField("data", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "hello"), + new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), new BigDecimal(7.25)), + new SchemaAndValueField("salary2", Decimal.builder(2).build(), new BigDecimal(8.25)), + new SchemaAndValueField("salary3", SchemaBuilder.float64() + .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY3") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8") + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "3") + .build(), 123.456)); + + 
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER); + assertThat(consumer.isEmpty()).isTrue(); + } + + @Test + @FixFor("DBZ-1413") + public void shouldStreamDomainAliasWithProperModifiers() throws Exception { + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, PRIMARY KEY(pk));"); + startConnector(config -> config + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table"), + false); + + waitForStreamingToStart(); + + TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);"); + TestHelper.execute("ALTER TABLE public.alias_table ADD COLUMN value varbit2 NOT NULL;"); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO public.alias_table (value) VALUES (B'101');"); + + SourceRecord rec = assertRecordInserted("public.alias_table", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "alias_table"); + + List expected = Arrays.asList( + new SchemaAndValueField(PK_FIELD, SchemaBuilder.INT32_SCHEMA, 1), + new SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{ 5, 0 })); + + assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER); + assertThat(consumer.isEmpty()).isTrue(); + } + + @Test + @FixFor("DBZ-1413") + public void shouldStreamValuesForDomainTypeOfDomainType() throws Exception { + TestHelper.execute("CREATE DOMAIN numeric82 as numeric(8,2);"); + TestHelper.execute("CREATE DOMAIN numericex as numeric82;"); + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value numericex, PRIMARY KEY (pk));"); + startConnector(config -> config + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + 
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table") + .with("column.propagate.source.type", "public.alias_table.value"), false); + + waitForStreamingToStart(); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO alias_table (value) values (123.45);"); + + SourceRecord rec = assertRecordInserted("public.alias_table", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "alias_table"); + + List expected = Arrays.asList( + new SchemaAndValueField(PK_FIELD, SchemaBuilder.INT32_SCHEMA, 1), + new SchemaAndValueField("value", SpecialValueDecimal.builder(DecimalMode.DOUBLE, 8, 2) + .optional() + .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMERICEX") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8") + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "2") + .build(), 123.45)); + + assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER); + assertThat(consumer.isEmpty()).isTrue(); + } + + @Test + @FixFor("DBZ-1413") + public void shouldStreamValuesForAliasLikeBaseTypes() throws Exception { + TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, PRIMARY KEY (pk));"); + startConnector(config -> config + .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table"), + false); + + waitForStreamingToStart(); + + // note: skipped macaddr8 as that is only supported on PG10+ but was manually tested + TestHelper.execute("CREATE DOMAIN bit2 AS BIT(3);"); + TestHelper.execute("CREATE DOMAIN smallint2 AS smallint;"); + TestHelper.execute("CREATE DOMAIN integer2 as integer;"); + TestHelper.execute("CREATE DOMAIN bigint2 as bigint;"); + TestHelper.execute("CREATE DOMAIN real2 as real;"); + TestHelper.execute("CREATE DOMAIN bool2 AS BOOL DEFAULT false;"); + TestHelper.execute("CREATE DOMAIN float82 as float8;"); + 
TestHelper.execute("CREATE DOMAIN numeric2 as numeric(6,2);"); + TestHelper.execute("CREATE DOMAIN string2 AS varchar(25) DEFAULT NULL;"); + TestHelper.execute("CREATE DOMAIN date2 AS date;"); + TestHelper.execute("CREATE DOMAIN time2 as time;"); + TestHelper.execute("CREATE DOMAIN timetz2 as timetz;"); + TestHelper.execute("CREATE DOMAIN timestamp2 as timestamp;"); + TestHelper.execute("CREATE DOMAIN timestamptz2 AS timestamptz;"); + TestHelper.execute("CREATE DOMAIN timewotz2 as time without time zone;"); + TestHelper.execute("CREATE DOMAIN box2 as box;"); + TestHelper.execute("CREATE DOMAIN circle2 as circle;"); + TestHelper.execute("CREATE DOMAIN interval2 as interval;"); + TestHelper.execute("CREATE DOMAIN line2 as line;"); + TestHelper.execute("CREATE DOMAIN lseg2 as lseg;"); + TestHelper.execute("CREATE DOMAIN path2 as path;"); + TestHelper.execute("CREATE DOMAIN point2 as point;"); + TestHelper.execute("CREATE DOMAIN polygon2 as polygon;"); + TestHelper.execute("CREATE DOMAIN char2 as char;"); + TestHelper.execute("CREATE DOMAIN text2 as text;"); + TestHelper.execute("CREATE DOMAIN json2 as json;"); + TestHelper.execute("CREATE DOMAIN xml2 as xml;"); + TestHelper.execute("CREATE DOMAIN uuid2 as uuid;"); + TestHelper.execute("CREATE DOMAIN varbit2 as varbit(3);"); + TestHelper.execute("CREATE DOMAIN inet2 as inet;"); + TestHelper.execute("CREATE DOMAIN cidr2 as cidr;"); + TestHelper.execute("CREATE DOMAIN macaddr2 as macaddr;"); + + TestHelper.execute("ALTER TABLE alias_table " + + "ADD COLUMN bit_base bit(3) NOT NULL, ADD COLUMN bit_alias bit2 NOT NULL, " + + "ADD COLUMN smallint_base smallint NOT NULL, ADD COLUMN smallint_alias smallint2 NOT NULL, " + + "ADD COLUMN integer_base integer NOT NULL, ADD COLUMN integer_alias integer2 NOT NULL, " + + "ADD COLUMN bigint_base bigint NOT NULL, ADD COLUMN bigint_alias bigint2 NOT NULL, " + + "ADD COLUMN real_base real NOT NULL, ADD COLUMN real_alias real2 NOT NULL, " + + "ADD COLUMN float8_base float8 NOT NULL, ADD 
COLUMN float8_alias float82 NOT NULL, " + + "ADD COLUMN numeric_base numeric(6,2) NOT NULL, ADD COLUMN numeric_alias numeric2 NOT NULL, " + + "ADD COLUMN bool_base bool NOT NULL, ADD COLUMN bool_alias bool2 NOT NULL, " + + "ADD COLUMN string_base varchar(25) NOT NULL, ADD COLUMN string_alias string2 NOT NULL, " + + "ADD COLUMN date_base date NOT NULL, ADD COLUMN date_alias date2 NOT NULL, " + + "ADD COLUMN time_base time NOT NULL, ADD COLUMN time_alias time2 NOT NULL, " + + "ADD COLUMN timetz_base timetz NOT NULL, ADD COLUMN timetz_alias timetz2 NOT NULL, " + + "ADD COLUMN timestamp_base timestamp NOT NULL, ADD COLUMN timestamp_alias timestamp2 NOT NULL, " + + "ADD COLUMN timestamptz_base timestamptz NOT NULL, ADD COLUMN timestamptz_alias timestamptz2 NOT NULL, " + + "ADD COLUMN timewottz_base time without time zone NOT NULL, ADD COLUMN timewottz_alias timewotz2 NOT NULL, " + + "ADD COLUMN box_base box NOT NULL, ADD COLUMN box_alias box2 NOT NULL, " + + "ADD COLUMN circle_base circle NOT NULL, ADD COLUMN circle_alias circle2 NOT NULL, " + + "ADD COLUMN interval_base interval NOT NULL, ADD COLUMN interval_alias interval2 NOT NULL, " + + "ADD COLUMN line_base line NOT NULL, ADD COLUMN line_alias line2 NOT NULL, " + + "ADD COLUMN lseg_base lseg NOT NULL, ADD COLUMN lseg_alias lseg2 NOT NULL, " + + "ADD COLUMN path_base path NOT NULL, ADD COLUMN path_alias path2 NOT NULL, " + + "ADD COLUMN point_base point NOT NULL, ADD COLUMN point_alias point2 NOT NULL, " + + "ADD COLUMN polygon_base polygon NOT NULL, ADD COLUMN polygon_alias polygon2 NOT NULL, " + + "ADD COLUMN char_base char NOT NULL, ADD COLUMN char_alias char2 NOT NULL, " + + "ADD COLUMN text_base text NOT NULL, ADD COLUMN text_alias text2 NOT NULL, " + + "ADD COLUMN json_base json NOT NULL, ADD COLUMN json_alias json2 NOT NULL, " + + "ADD COLUMN xml_base xml NOT NULL, ADD COLUMN xml_alias xml2 NOT NULL, " + + "ADD COLUMN uuid_base UUID NOT NULL, ADD COLUMN uuid_alias uuid2 NOT NULL, " + + "ADD COLUMN varbit_base 
varbit(3) NOT NULL, ADD COLUMN varbit_alias varbit2 NOT NULL," + + "ADD COLUMN inet_base inet NOT NULL, ADD COLUMN inet_alias inet2 NOT NULL, " + + "ADD COLUMN cidr_base cidr NOT NULL, ADD COLUMN cidr_alias cidr2 NOT NULL, " + + "ADD COLUMN macaddr_base macaddr NOT NULL, ADD COLUMN macaddr_alias macaddr2 NOT NULL"); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO alias_table (" + + "bit_base, bit_alias, " + + "smallint_base, smallint_alias, " + + "integer_base, integer_alias, " + + "bigint_base, bigint_alias, " + + "real_base, real_alias, " + + "float8_base, float8_alias, " + + "numeric_base, numeric_alias, " + + "bool_base, bool_alias, " + + "string_base, string_alias, " + + "date_base, date_alias, " + + "time_base, time_alias, " + + "timetz_base, timetz_alias, " + + "timestamp_base, timestamp_alias, " + + "timestamptz_base, timestamptz_alias, " + + "timewottz_base, timewottz_alias, " + + "box_base, box_alias, " + + "circle_base, circle_alias, " + + "interval_base, interval_alias, " + + "line_base, line_alias, " + + "lseg_base, lseg_alias, " + + "path_base, path_alias, " + + "point_base, point_alias, " + + "polygon_base, polygon_alias, " + + "char_base, char_alias, " + + "text_base, text_alias, " + + "json_base, json_alias, " + + "xml_base, xml_alias, " + + "uuid_base, uuid_alias, " + + "varbit_base, varbit_alias, " + + "inet_base, inet_alias, " + + "cidr_base, cidr_alias, " + + "macaddr_base, macaddr_alias " + + ") VALUES (" + + "B'101', B'101', " + + "1, 1, " + + "1, 1, " + + "1000, 1000, " + + "3.14, 3.14, " + + "3.14, 3.14, " + + "1234.12, 1234.12, " + + "true, true, " + + "'hello', 'hello', " + + "'2019-10-02', '2019-10-02', " + + "'01:02:03', '01:02:03', " + + "'01:02:03.123789Z', '01:02:03.123789Z', " + + "'2019-10-02T01:02:03.123456', '2019-10-02T01:02:03.123456', " + + "'2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, '2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, " + + "'01:02:03', '01:02:03', " + + "'(0,0),(1,1)', '(0,0),(1,1)', " + + 
"'10,4,10', '10,4,10', " + + "'1 year 2 months 3 days 4 hours 5 minutes 6 seconds', '1 year 2 months 3 days 4 hours 5 minutes 6 seconds', " + + "'(0,0),(0,1)', '(0,0),(0,1)', " + + "'((0,0),(0,1))', '((0,0),(0,1))', " + + "'((0,0),(0,1),(0,2))', '((0,0),(0,1),(0,2))', " + + "'(1,1)', '(1,1)', " + + "'((0,0),(0,1),(1,0),(0,0))', '((0,0),(0,1),(1,0),(0,0))', " + + "'a', 'a', " + + "'Hello World', 'Hello World', " + + "'{\"key\": \"value\"}', '{\"key\": \"value\"}', " + + "XML('Hello'), XML('Hello'), " + + "'40e6215d-b5c6-4896-987c-f30f3678f608', '40e6215d-b5c6-4896-987c-f30f3678f608', " + + "B'101', B'101', " + + "'192.168.0.1', '192.168.0.1', " + + "'192.168/24', '192.168/24', " + + "'08:00:2b:01:02:03', '08:00:2b:01:02:03' " + + ");"); + + SourceRecord rec = assertRecordInserted("public.alias_table", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "alias_table"); + + assertRecordSchemaAndValues(schemasAndValuesForDomainAliasTypes(true), rec, Envelope.FieldName.AFTER); + assertThat(consumer.isEmpty()).isTrue(); + } + + @Test + @FixFor("DBZ-920") + public void shouldStreamEnumAsKnownType() throws Exception { + // Specifically enable `column.propagate.source.type` here to validate later that the actual + // type, length, and scale values are resolved correctly when paired with Enum types. 
+ TestHelper.execute("CREATE TABLE enum_table (pk SERIAL, PRIMARY KEY (pk));"); + startConnector(config -> config + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with("column.propagate.source.type", "public.enum_table.value") + .with(PostgresConnectorConfig.TABLE_WHITELIST, "public.enum_table"), false); + + waitForStreamingToStart(); + + // We create the enum type after streaming started to simulate some future schema change + TestHelper.execute("CREATE TYPE test_type AS ENUM ('V1','V2');"); + TestHelper.execute("ALTER TABLE enum_table ADD COLUMN value test_type NOT NULL"); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO enum_table (value) VALUES ('V1');"); + + SourceRecord rec = assertRecordInserted("public.enum_table", PK_FIELD, 1); + assertSourceInfo(rec, "postgres", "public", "enum_table"); + + List expected = Arrays.asList( + new SchemaAndValueField(PK_FIELD, Schema.INT32_SCHEMA, 1), + new SchemaAndValueField("value", Enum.builder("V1,V2") + .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "TEST_TYPE") + .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, String.valueOf(Integer.MAX_VALUE)) + .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") + .build(), "V1")); + + assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER); + assertThat(consumer.isEmpty()).isTrue(); + } + + private long asEpochMicros(String timestamp) { + Instant instant = LocalDateTime.parse(timestamp).atOffset(ZoneOffset.UTC).toInstant(); + return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1_000; + } + private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart) throws Exception { if (tablesBeforeStart) { diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index 5fe044dab04..63b2f17d9e0 
100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -969,6 +969,17 @@ protected int resolveNativeType(String typeName) { return Column.UNSET_INT_VALUE; } + /** + * Resolves the supplied metadata JDBC type to a final JDBC type. + * + * @param metadataJdbcType the JDBC type from the underlying driver's metadata lookup + * @param nativeType the database native type or -1 for unknown + * @return the resolved JDBC type + */ + protected int resolveJdbcType(int metadataJdbcType, int nativeType) { + return metadataJdbcType; + } + /** * Create definitions for each tables in the database, given the catalog name, schema pattern, table filter, and * column filter. @@ -1055,7 +1066,6 @@ protected Optional readTableColumn(ResultSet columnMetadata, Table final String columnName = columnMetadata.getString(4); if (columnFilter == null || columnFilter.matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName)) { final ColumnEditor column = Column.editor().name(columnName); - column.jdbcType(columnMetadata.getInt(5)); column.type(columnMetadata.getString(6)); column.length(columnMetadata.getInt(7)); if (columnMetadata.getObject(9) != null) { @@ -1074,6 +1084,7 @@ protected Optional readTableColumn(ResultSet columnMetadata, Table column.generated("YES".equalsIgnoreCase(autogenerated)); column.nativeType(resolveNativeType(column.typeName())); + column.jdbcType(resolveJdbcType(columnMetadata.getInt(5), column.nativeType())); return Optional.of(column); } @@ -1203,7 +1214,7 @@ public static void columnsFor(ResultSet resultSet, Consumer consumer) th } } - private static boolean isNullable(int jdbcNullable) { + protected static boolean isNullable(int jdbcNullable) { return jdbcNullable == ResultSetMetaData.columnNullable || jdbcNullable == ResultSetMetaData.columnNullableUnknown; } diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java 
b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java index 10cfcec8664..c1e0c42a44c 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java @@ -246,8 +246,8 @@ public SchemaBuilder schemaBuilder(Column column) { return SchemaBuilder.bytes(); // Unhandled types - case Types.ARRAY: case Types.DISTINCT: + case Types.ARRAY: case Types.JAVA_OBJECT: case Types.OTHER: case Types.REF: @@ -257,7 +257,6 @@ public SchemaBuilder schemaBuilder(Column column) { break; } return null; - } @Override @@ -341,8 +340,8 @@ public ValueConverter converter(Column column, Field fieldDefn) { return (data) -> convertRowId(column, fieldDefn, data); // Unhandled types - case Types.ARRAY: case Types.DISTINCT: + case Types.ARRAY: case Types.JAVA_OBJECT: case Types.OTHER: case Types.REF: diff --git a/documentation/modules/ROOT/pages/connectors/postgresql.adoc b/documentation/modules/ROOT/pages/connectors/postgresql.adoc index 0dc68e0f6b8..eaf69136033 100644 --- a/documentation/modules/ROOT/pages/connectors/postgresql.adoc +++ b/documentation/modules/ROOT/pages/connectors/postgresql.adoc @@ -1018,6 +1018,11 @@ The _semantic type_ describes how the Kafka Connect schema captures the _meaning |n/a |Contains the string representation of a date range. It always has an exclusive upper-bound. +|`ENUM` +|`STRING` +|`io.debezium.data.Enum` +|Contains the string representation of the PostgreSQL ENUM value. The set of allowed values are maintained in the schema parameter named `allowed`. + |======================= Other data type mappings are described in the following sections. @@ -1254,6 +1259,21 @@ However, when `hstore.handling.mode` configuration property is set to `json`, th |======================= +[[domain-types]] +==== PostgreSQL Domain Types + +PostgreSQL also supports the notion of user-defined types that are based upon other underlying types. 
+When such column types are used, Debezium exposes the column's representation based on the full type hierarchy. + +[IMPORTANT] +==== +Special consideration should be taken when monitoring columns that use domain types. + +When a column is defined using a domain type that extends one of the default database types and the domain type defines a custom length/scale, the generated schema will inherit that defined length/scale. + +When a column is defined using a domain type that extends another domain type that defines a custom length/scale, the generated schema will **not** inherit the defined length/scale because of the PostgreSQL driver's column metadata implementation. +==== + [[postgis-types]] [[network-address-types]]