Commit 08ba649

[FLINK-20539][table-planner] Fix type mismatch when using ROW in computed column

(cherry picked from commit db934bf)
xuyangzhong committed May 17, 2024
1 parent 4848a50 commit 08ba649
Showing 3 changed files with 66 additions and 12 deletions.
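Context for the change: with a DDL like the one in the new test below, a computed column built with the ROW constructor made the planner derive a struct type for the ROW operator that did not match the nested RowType that FlinkTypeFactory derives for the column, and planning failed with a type mismatch. The following is a minimal reproduction sketch, not part of the commit, assuming Flink's Table API and the planner's test-scoped 'values' connector; the class name is illustrative.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical reproduction class for FLINK-20539, not from the commit.
public class RowComputedColumnRepro {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Same DDL as the regression test below: computed column `c` uses ROW.
        env.executeSql(
                "create table t1 ("
                        + " a int,"
                        + " b varchar,"
                        + " c as row(a, b)"
                        + ") with ("
                        + " 'connector' = 'values'"
                        + ")");
        // Before the fix, planning this query hit the type mismatch that
        // FLINK-20539 describes; with the fix it plans to
        // Calc(select=[a, b, ROW(a, b) AS c]) over the source scan.
        env.executeSql("SELECT * FROM t1").print();
    }
}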
org/apache/calcite/sql/fun/SqlRowOperator.java

@@ -19,6 +19,7 @@
 package org.apache.calcite.sql.fun;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperatorBinding;
@@ -27,14 +28,17 @@
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.util.Pair;
 
 import java.util.AbstractList;
-import java.util.Map;
 
 /**
- * Copied to keep null semantics of table api and sql in sync. At the same time SQL standard says
- * that the next about `ROW`:
+ * Copied to keep null semantics of table api and sql in sync.
+ *
+ * <p>There are the following differences:
+ *
+ * <p>1. The return value of {@code R IS NULL} and {@code R IS NOT NULL}.
+ *
+ * <p>At the same time, the SQL standard says the following about {@code ROW}:
  *
  * <ul>
  *   <li>The value of {@code R IS NULL} is:
@@ -66,12 +70,19 @@
  *   </ul>
  * </ul>
  *
- * Once Flink applies same logic for both table api and sql, this class should be removed.
+ * <p>Once Flink applies the same logic for both table api and sql, this first change should be
+ * removed.
+ *
+ * <p>2. It uses {@link StructKind#PEEK_FIELDS_NO_EXPAND} with a nested struct type (Flink's
+ * {@link org.apache.flink.table.types.logical.RowType}).
+ *
+ * <p>See more at {@link org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter} and
+ * {@link org.apache.flink.table.planner.calcite.FlinkTypeFactory}.
  *
  * <p>Changed lines
  *
  * <ol>
- *   <li>Line 92 ~ 112
+ *   <li>Line 106 ~ 137
  * </ol>
  */
 public class SqlRowOperator extends SqlSpecialOperator {
@@ -96,20 +107,31 @@ public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
             // The type of a ROW(e1,e2) expression is a record with the types
             // {e1type,e2type}. According to the standard, field names are
             // implementation-defined.
+            int fieldCount = opBinding.getOperandCount();
             return opBinding
                     .getTypeFactory()
                     .createStructType(
-                            new AbstractList<Map.Entry<String, RelDataType>>() {
+                            StructKind.PEEK_FIELDS_NO_EXPAND,
+                            new AbstractList<RelDataType>() {
+                                @Override
+                                public RelDataType get(int index) {
+                                    return opBinding.getOperandType(index);
+                                }
+
+                                @Override
+                                public int size() {
+                                    return fieldCount;
+                                }
+                            },
+                            new AbstractList<String>() {
                                 @Override
-                                public Map.Entry<String, RelDataType> get(int index) {
-                                    return Pair.of(
-                                            SqlUtil.deriveAliasFromOrdinal(index),
-                                            opBinding.getOperandType(index));
+                                public String get(int index) {
+                                    return SqlUtil.deriveAliasFromOrdinal(index);
                                 }
 
                                 @Override
                                 public int size() {
-                                    return opBinding.getOperandCount();
+                                    return fieldCount;
                                 }
                             });
             // ----- FLINK MODIFICATION END -----
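To see what changed in the inferred type, here is a sketch, with assumed setup and an illustrative class name rather than code from the commit, that calls the same Calcite factory method the patched operator now uses: createStructType with an explicit StructKind plus separate type and name lists, where SqlUtil.deriveAliasFromOrdinal supplies the implementation-defined field names (EXPR$0, EXPR$1, ...). Passing StructKind.PEEK_FIELDS_NO_EXPAND is what lines the operator's return type up with the nested RowType built by FlinkTypeFactory.

import java.util.Arrays;
import java.util.List;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.StructKind;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

// Illustrative sketch only: builds the struct type the patched operator now
// infers for ROW(a, b) over an INT column and a VARCHAR column.
public class RowTypeSketch {
    public static void main(String[] args) {
        RelDataTypeFactory typeFactory =
                new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

        // Operand types of the ROW call.
        List<RelDataType> fieldTypes = Arrays.asList(
                typeFactory.createSqlType(SqlTypeName.INTEGER),
                typeFactory.createSqlType(SqlTypeName.VARCHAR));

        // Implementation-defined field names: "EXPR$0", "EXPR$1".
        List<String> fieldNames = Arrays.asList(
                SqlUtil.deriveAliasFromOrdinal(0),
                SqlUtil.deriveAliasFromOrdinal(1));

        // PEEK_FIELDS_NO_EXPAND matches how FlinkTypeFactory models nested
        // RowTypes, so this struct type compares equal to the computed
        // column's declared type.
        RelDataType rowType = typeFactory.createStructType(
                StructKind.PEEK_FIELDS_NO_EXPAND, fieldTypes, fieldNames);

        System.out.println(rowType.getFullTypeString());
    }
}

As a side note on the patch itself, hoisting fieldCount out of the two AbstractList views lets both size() implementations share one value instead of re-querying the operand count.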
TableScanTest.xml

@@ -233,6 +233,24 @@
 <![CDATA[
 Calc(select=[timestamp, metadata_timestamp, other, UPPER(other) AS computed_other, CAST(metadata_timestamp AS VARCHAR(2147483647)) AS computed_timestamp])
 +- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[timestamp, other, metadata_timestamp])
 ]]>
 </Resource>
 </TestCase>
+<TestCase name="testDDLWithRowTypeComputedColumn">
+  <Resource name="sql">
+    <![CDATA[SELECT * FROM t1]]>
+  </Resource>
+  <Resource name="ast">
+    <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalProject(a=[$0], b=[$1], c=[ROW($0, $1)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
+]]>
+  </Resource>
+  <Resource name="optimized exec plan">
+    <![CDATA[
+Calc(select=[a, b, ROW(a, b) AS c])
++- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
+]]>
+  </Resource>
+</TestCase>
TableScanTest.scala

@@ -78,6 +78,20 @@ class TableScanTest extends TableTestBase {
     util.verifyExecPlan("SELECT * FROM t1")
   }
 
+  @Test
+  def testDDLWithRowTypeComputedColumn(): Unit = {
+    util.addTable(s"""
+                     |create table t1(
+                     |  a int,
+                     |  b varchar,
+                     |  c as row(a, b)
+                     |) with (
+                     |  'connector' = 'values'
+                     |)
+      """.stripMargin)
+    util.verifyExecPlan("SELECT * FROM t1")
+  }
+
   @Test
   def testDDLWithMetadataColumn(): Unit = {
     // tests reordering, skipping, and casting of metadata
