Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-1413 Support PostgreSQL Domain and Enum types #1079

Merged
merged 13 commits into from Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -341,10 +341,10 @@ private Table tableFromFromMessage(List<ReplicationMessage.Column> columns, Tabl
final PostgresType type = column.getType();
final ColumnEditor columnEditor = Column.editor()
.name(column.getName())
.jdbcType(type.getJdbcId())
.jdbcType(type.getRootType().getJdbcId())
.type(type.getName())
.optional(column.isOptional())
.nativeType(type.getOid());
.nativeType(type.getRootType().getOid());
columnEditor.length(column.getTypeMetadata().getLength());
columnEditor.scale(column.getTypeMetadata().getScale());
return columnEditor.create();
Expand Down
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.connector.postgresql;

import java.util.List;
import java.util.Objects;

import org.postgresql.core.Oid;
Expand All @@ -19,25 +20,31 @@
*/
public class PostgresType {

public static final PostgresType UNKNOWN = new PostgresType("unknown", -1, Integer.MIN_VALUE, null);
public static final PostgresType UNKNOWN = new PostgresType("unknown", -1, Integer.MIN_VALUE, null, null, null, null);

private final String name;
private final int oid;
private final int jdbcId;
private final PostgresType parentType;
private final PostgresType elementType;
private final TypeInfo typeInfo;
private final int modifiers;
private final List<String> enumValues;

public PostgresType(String name, int oid, int jdbcId, TypeInfo typeInfo) {
this(name, oid, jdbcId, typeInfo, null);
private PostgresType(String name, int oid, int jdbcId, TypeInfo typeInfo, List<String> enumValues, PostgresType parentType, PostgresType elementType) {
this(name, oid, jdbcId, TypeRegistry.NO_TYPE_MODIFIER, typeInfo, enumValues, parentType, elementType);
}

public PostgresType(String name, int oid, int jdbcId, TypeInfo typeInfo, PostgresType elementType) {
private PostgresType(String name, int oid, int jdbcId, int modifiers, TypeInfo typeInfo, List<String> enumValues, PostgresType parentType, PostgresType elementType) {
Objects.requireNonNull(name);
this.name = name;
this.oid = oid;
this.jdbcId = jdbcId;
this.elementType = elementType;
this.typeInfo = typeInfo;
this.parentType = parentType;
this.elementType = elementType;
this.modifiers = modifiers;
this.enumValues = enumValues;
}

/**
Expand All @@ -47,6 +54,24 @@ public boolean isArrayType() {
return elementType != null;
}

/**
* The type system allows for the creation of user defined types (UDTs) which can be based
* on any existing type. When a type does not extend another type, it is considered to be
* a base or root type in the type hierarchy.
*
* @return true if this type is a base/root type
*/
public boolean isRootType() {
return parentType == null;
}

/**
* @return true if this type is an enum type
*/
public boolean isEnumType() {
return enumValues != null;
}

/**
*
* @return symbolic name of the type
Expand Down Expand Up @@ -79,6 +104,30 @@ public PostgresType getElementType() {
return elementType;
}

/**
*
* @return the parent postgres type this type is based upon
*/
public PostgresType getParentType() {
return parentType;
}

/**
*
* @return the postgres type at the top/root level for this type's hierarchy
*/
public PostgresType getRootType() {
PostgresType rootType = this;
while (!rootType.isRootType()) {
rootType = rootType.getParentType();
}
return rootType;
}

public List<String> getEnumValues() {
return enumValues;
}

/**
*
* @return the default length of the type
Expand All @@ -87,9 +136,23 @@ public int getDefaultLength() {
if (typeInfo == null) {
return TypeRegistry.UNKNOWN_LENGTH;
}
int size = typeInfo.getPrecision(oid, TypeRegistry.NO_TYPE_MODIFIER);
if (parentType != null) {
if (modifiers == TypeRegistry.NO_TYPE_MODIFIER) {
return parentType.getDefaultLength();
}
else {
int size = typeInfo.getPrecision(parentType.getOid(), modifiers);
if (size == 0) {
size = typeInfo.getDisplaySize(parentType.getOid(), modifiers);
}
if (size != 0 && size != Integer.MAX_VALUE) {
return size;
}
}
}
int size = typeInfo.getPrecision(oid, modifiers);
if (size == 0) {
size = typeInfo.getDisplaySize(oid, TypeRegistry.NO_TYPE_MODIFIER);
size = typeInfo.getDisplaySize(oid, modifiers);
}
return size;
}
Expand All @@ -102,7 +165,15 @@ public int getDefaultScale() {
if (typeInfo == null) {
return TypeRegistry.UNKNOWN_LENGTH;
}
return typeInfo.getScale(oid, TypeRegistry.NO_TYPE_MODIFIER);
if (parentType != null) {
if (modifiers == TypeRegistry.NO_TYPE_MODIFIER) {
return parentType.getDefaultScale();
}
else {
return typeInfo.getScale(parentType.getOid(), modifiers);
}
}
return typeInfo.getScale(oid, modifiers);
}

/**
Expand Down Expand Up @@ -179,7 +250,57 @@ public boolean equals(Object obj) {

@Override
public String toString() {
return "PostgresType [name=" + name + ", oid=" + oid + ", jdbcId=" + jdbcId + ", defaultLength=" + getDefaultLength()
+ ", defaultScale=" + getDefaultScale() + ", elementType=" + elementType + "]";
return "PostgresType [name=" + name + ", oid=" + oid + ", jdbcId=" + jdbcId + ", modifiers=" + modifiers + ", defaultLength=" + getDefaultLength()
+ ", defaultScale=" + getDefaultScale() + ", parentType=" + parentType + ", elementType=" + elementType + "]";
}

public static class Builder {
gunnarmorling marked this conversation as resolved.
Show resolved Hide resolved
private final TypeRegistry typeRegistry;
private final String name;
private final int oid;
private final int jdbcId;
private final int modifiers;
private final TypeInfo typeInfo;
private int parentTypeOid;
private int elementTypeOid;
private List<String> enumValues;

public Builder(TypeRegistry typeRegistry, String name, int oid, int jdbcId, int modifiers, TypeInfo typeInfo) {
this.typeRegistry = typeRegistry;
this.name = name;
this.oid = oid;
this.jdbcId = jdbcId;
this.modifiers = modifiers;
this.typeInfo = typeInfo;
}

public Builder parentType(int parentTypeOid) {
this.parentTypeOid = parentTypeOid;
return this;
}

public Builder elementType(int elementTypeOid) {
this.elementTypeOid = elementTypeOid;
return this;
}

public Builder enumValues(List<String> enumValues) {
this.enumValues = enumValues;
return this;
}

public PostgresType build() {
PostgresType parentType = null;
if (parentTypeOid != 0) {
parentType = typeRegistry.get(parentTypeOid);
}

PostgresType elementType = null;
if (elementTypeOid != 0) {
elementType = typeRegistry.get(elementTypeOid);
}

return new PostgresType(name, oid, jdbcId, modifiers, typeInfo, enumValues, parentType, elementType);
}
}
}
Expand Up @@ -151,7 +151,6 @@ protected PostgresValueConverter(Charset databaseCharset, DecimalMode decimalMod
@Override
public SchemaBuilder schemaBuilder(Column column) {
int oidValue = column.nativeType();

switch (oidValue) {
case PgOid.BIT:
case PgOid.BIT_ARRAY:
Expand Down Expand Up @@ -277,6 +276,12 @@ else if (oidValue == typeRegistry.citextArrayOid()) {
else if (oidValue == typeRegistry.ltreeArrayOid()) {
return SchemaBuilder.array(Ltree.builder().optional().build());
}

final PostgresType resolvedType = typeRegistry.get(oidValue);
if (resolvedType.isEnumType()) {
return io.debezium.data.Enum.builder(Strings.join(",", resolvedType.getEnumValues()));
}

final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column);
if (jdbcSchemaBuilder == null) {
return includeUnknownDatatypes ? SchemaBuilder.bytes() : null;
Expand Down Expand Up @@ -309,7 +314,6 @@ private SchemaBuilder hstoreSchema() {
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
int oidValue = column.nativeType();

switch (oidValue) {
case PgOid.BIT:
case PgOid.VARBIT:
Expand Down Expand Up @@ -417,6 +421,7 @@ else if (oidValue == typeRegistry.geometryArrayOid() ||
oidValue == typeRegistry.hstoreArrayOid()) {
return createArrayConverter(column, fieldDefn);
}

final ValueConverter jdbcConverter = super.converter(column, fieldDefn);
if (jdbcConverter == null) {
return includeUnknownDatatypes ? data -> convertBinary(column, fieldDefn, data) : null;
Expand Down