Skip to content

Commit

Permalink
[improve][io] JDBC Sink: add flag to exclude non declared fields (#18008
Browse files Browse the repository at this point in the history
)

* [improve][io] JDBC Sink: add flag to exclude non declared fields

* rename and doc

* fix doc

* Update pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java

Co-authored-by: tison <wander4096@gmail.com>

Co-authored-by: tison <wander4096@gmail.com>
  • Loading branch information
nicoloboschi and tisonkun committed Oct 18, 2022
1 parent eed8c74 commit d901138
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ private void initStatement() throws Exception {
List<String> keyList = getListFromConfig(jdbcSinkConfig.getKey());
List<String> nonKeyList = getListFromConfig(jdbcSinkConfig.getNonKey());

tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
tableDefinition = JdbcUtils.getTableDefinition(connection, tableId,
keyList, nonKeyList, jdbcSinkConfig.isExcludeNonDeclaredFields());
insertStatement = connection.prepareStatement(generateInsertQueryStatement());

if (jdbcSinkConfig.getInsertMode() == JdbcSinkConfig.InsertMode.UPSERT) {
if (nonKeyList.isEmpty() || keyList.isEmpty()) {
throw new IllegalStateException("UPSERT mode is not configured if 'key' and 'nonKey' "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ public class JdbcSinkConfig implements Serializable {
help = "Fields used in where condition of update and delete Events. A comma-separated list."
)
private String key;

@FieldDoc(
required = false,
defaultValue = "false",
help = "All the table fields are discovered automatically. 'excludeNonDeclaredFields' indicates if the "
+ "table fields not explicitly listed in `nonKey` and `key` must be included in the query. "
+ "By default all the table fields are included. To leverage of table fields defaults "
+ "during insertion, it is suggested to set this value to `true`."
)
private boolean excludeNonDeclaredFields = false;

@FieldDoc(
required = false,
defaultValue = "500",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -113,10 +114,16 @@ public static TableId getTableId(Connection connection, String tableName) throws
* Get the {@link TableDefinition} for the given table.
*/
public static TableDefinition getTableDefinition(
Connection connection, TableId tableId, List<String> keyList, List<String> nonKeyList) throws Exception {
Connection connection, TableId tableId,
List<String> keyList,
List<String> nonKeyList,
boolean excludeNonDeclaredFields) throws Exception {
TableDefinition table = TableDefinition.of(
tableId, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());

keyList = keyList == null ? Collections.emptyList(): keyList;
nonKeyList = nonKeyList == null ? Collections.emptyList(): nonKeyList;

try (ResultSet rs = connection.getMetaData().getColumns(
tableId.getCatalogName(),
tableId.getSchemaName(),
Expand All @@ -130,26 +137,21 @@ public static TableDefinition getTableDefinition(
final String typeName = rs.getString(6);
final int position = rs.getInt(17);

ColumnId columnId = ColumnId.of(tableId, columnName, sqlDataType, typeName, position);
table.columns.add(columnId);
if (keyList != null) {
keyList.forEach((key) -> {
if (key.equals(columnName)) {
table.keyColumns.add(columnId);
}
});
}
if (nonKeyList != null) {
nonKeyList.forEach((key) -> {
if (key.equals(columnName)) {
table.nonKeyColumns.add(columnId);
}
});
}

if (log.isDebugEnabled()) {
log.debug("Get column. name: {}, data type: {}, position: {}", columnName, typeName, position);
}

ColumnId columnId = ColumnId.of(tableId, columnName, sqlDataType, typeName, position);

if (keyList.contains(columnName)) {
table.keyColumns.add(columnId);
table.columns.add(columnId);
} else if (nonKeyList.contains(columnName)) {
table.nonKeyColumns.add(columnId);
table.columns.add(columnId);
} else if (!excludeNonDeclaredFields) {
table.columns.add(columnId);
}
}
return table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
Expand All @@ -50,22 +51,29 @@ public void tearDown() throws IOException, SQLException {
sqliteUtils.tearDown();
}

@Test
public void TestGetTableId() throws Exception {
@DataProvider(name = "excludeNonDeclaredFields")
public Object[] excludeNonDeclaredFields() {
return new Object[]{
false, true
};
}

@Test(dataProvider = "excludeNonDeclaredFields")
public void testGetTableId(boolean excludeNonDeclaredFields) throws Exception {
String tableName = "TestGetTableId";

sqliteUtils.createTable(
"CREATE TABLE " + tableName + "(" +
" firstName TEXT," +
" lastName TEXT," +
" age INTEGER," +
" bool NUMERIC," +
" byte INTEGER," +
" bool NUMERIC NULL," +
" byte INTEGER NULL," +
" short INTEGER NULL," +
" long INTEGER," +
" float NUMERIC," +
" double NUMERIC," +
" bytes BLOB, " +
" float NUMERIC NULL," +
" double NUMERIC NULL," +
" bytes BLOB NULL, " +
"PRIMARY KEY (firstName, lastName));"
);

Expand All @@ -84,39 +92,87 @@ public void TestGetTableId() throws Exception {
List<String> nonKeyList = Lists.newArrayList();
nonKeyList.add("age");
nonKeyList.add("long");
TableDefinition table = JdbcUtils.getTableDefinition(connection, id, keyList, nonKeyList);
Assert.assertEquals(table.getColumns().get(0).getName(), "firstName");
Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT");
Assert.assertEquals(table.getColumns().get(2).getName(), "age");
Assert.assertEquals(table.getColumns().get(2).getTypeName(), "INTEGER");
Assert.assertEquals(table.getColumns().get(7).getName(), "float");
Assert.assertEquals(table.getColumns().get(7).getTypeName(), "NUMERIC");
Assert.assertEquals(table.getKeyColumns().get(0).getName(), "firstName");
Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(), "TEXT");
Assert.assertEquals(table.getKeyColumns().get(1).getName(), "lastName");
Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(), "TEXT");
Assert.assertEquals(table.getNonKeyColumns().get(0).getName(), "age");
Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(), "INTEGER");
Assert.assertEquals(table.getNonKeyColumns().get(1).getName(), "long");
Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(), "INTEGER");
// Test get getTableDefinition
log.info("verify buildInsertSql");
String expctedInsertStatement = "INSERT INTO " + tableName +
"(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
" VALUES(?,?,?,?,?,?,?,?,?,?)";
String insertStatement = JdbcUtils.buildInsertSql(table);
Assert.assertEquals(insertStatement, expctedInsertStatement);
log.info("verify buildUpdateSql");
String expectedUpdateStatement = "UPDATE " + tableName +
" SET age=? ,long=? WHERE firstName=? AND lastName=?";
String updateStatement = JdbcUtils.buildUpdateSql(table);
Assert.assertEquals(updateStatement, expectedUpdateStatement);
log.info("verify buildDeleteSql");
String expectedDeleteStatement = "DELETE FROM " + tableName +
" WHERE firstName=? AND lastName=?";
String deleteStatement = JdbcUtils.buildDeleteSql(table);
Assert.assertEquals(deleteStatement, expectedDeleteStatement);
TableDefinition table = JdbcUtils.getTableDefinition(connection, id, keyList, nonKeyList,
excludeNonDeclaredFields);
if (!excludeNonDeclaredFields) {
Assert.assertEquals(table.getColumns().size(), 10);
Assert.assertEquals(table.getColumns().get(0).getName(), "firstName");
Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT");
Assert.assertEquals(table.getColumns().get(2).getName(), "age");
Assert.assertEquals(table.getColumns().get(2).getTypeName(), "INTEGER");
Assert.assertEquals(table.getColumns().get(7).getName(), "float");
Assert.assertEquals(table.getColumns().get(7).getTypeName(), "NUMERIC");

Assert.assertEquals(table.getKeyColumns().size(), 2);
Assert.assertEquals(table.getKeyColumns().get(0).getName(), "firstName");
Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(), "TEXT");
Assert.assertEquals(table.getKeyColumns().get(1).getName(), "lastName");
Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(), "TEXT");

Assert.assertEquals(table.getNonKeyColumns().size(), 2);
Assert.assertEquals(table.getNonKeyColumns().get(0).getName(), "age");
Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(), "INTEGER");
Assert.assertEquals(table.getNonKeyColumns().get(1).getName(), "long");
Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(), "INTEGER");
// Test get getTableDefinition
log.info("verify buildInsertSql");
String expctedInsertStatement = "INSERT INTO " + tableName +
"(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
" VALUES(?,?,?,?,?,?,?,?,?,?)";
String insertStatement = JdbcUtils.buildInsertSql(table);
Assert.assertEquals(insertStatement, expctedInsertStatement);
log.info("verify buildUpdateSql");
String expectedUpdateStatement = "UPDATE " + tableName +
" SET age=? ,long=? WHERE firstName=? AND lastName=?";
String updateStatement = JdbcUtils.buildUpdateSql(table);
Assert.assertEquals(updateStatement, expectedUpdateStatement);
log.info("verify buildDeleteSql");
String expectedDeleteStatement = "DELETE FROM " + tableName +
" WHERE firstName=? AND lastName=?";
String deleteStatement = JdbcUtils.buildDeleteSql(table);
Assert.assertEquals(deleteStatement, expectedDeleteStatement);
} else {
Assert.assertEquals(table.getColumns().size(), 4);
Assert.assertEquals(table.getColumns().get(0).getName(), "firstName");
Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT");
Assert.assertEquals(table.getColumns().get(1).getName(), "lastName");
Assert.assertEquals(table.getColumns().get(1).getTypeName(), "TEXT");
Assert.assertEquals(table.getColumns().get(2).getName(), "age");
Assert.assertEquals(table.getColumns().get(2).getTypeName(), "INTEGER");
Assert.assertEquals(table.getColumns().get(3).getName(), "long");
Assert.assertEquals(table.getColumns().get(3).getTypeName(), "INTEGER");


Assert.assertEquals(table.getKeyColumns().size(), 2);
Assert.assertEquals(table.getKeyColumns().get(0).getName(), "firstName");
Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(), "TEXT");
Assert.assertEquals(table.getKeyColumns().get(1).getName(), "lastName");
Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(), "TEXT");

Assert.assertEquals(table.getNonKeyColumns().size(), 2);
Assert.assertEquals(table.getNonKeyColumns().get(0).getName(), "age");
Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(), "INTEGER");
Assert.assertEquals(table.getNonKeyColumns().get(1).getName(), "long");
Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(), "INTEGER");
// Test get getTableDefinition
log.info("verify buildInsertSql");
String expctedInsertStatement = "INSERT INTO " + tableName +
"(firstName,lastName,age,long)" +
" VALUES(?,?,?,?)";
String insertStatement = JdbcUtils.buildInsertSql(table);
Assert.assertEquals(insertStatement, expctedInsertStatement);
log.info("verify buildUpdateSql");
String expectedUpdateStatement = "UPDATE " + tableName +
" SET age=? ,long=? WHERE firstName=? AND lastName=?";
String updateStatement = JdbcUtils.buildUpdateSql(table);
Assert.assertEquals(updateStatement, expectedUpdateStatement);
log.info("verify buildDeleteSql");
String expectedDeleteStatement = "DELETE FROM " + tableName +
" WHERE firstName=? AND lastName=?";
String deleteStatement = JdbcUtils.buildDeleteSql(table);
Assert.assertEquals(deleteStatement, expectedDeleteStatement);

}
}
}

}

0 comments on commit d901138

Please sign in to comment.