I am currently evaluating ksqlDB and Confluent Cloud as a possible solution for a new project.
I am using this setup:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d80264a9b068 docker.io/confluentinc/cp-kafka:7.5.2 /etc/confluent/do... 2 days ago Up 2 days 0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp broker
c7a682693a9a docker.io/confluentinc/cp-schema-registry:7.5.2 /etc/confluent/do... 2 days ago Up 30 hours 0.0.0.0:8081->8081/tcp schema-registry
6745244f5009 docker.io/cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0 /etc/confluent/do... 2 days ago Up 2 days 0.0.0.0:8083->8083/tcp connect
86f2b921d238 docker.io/confluentinc/cp-kafka-rest:7.5.2 /etc/confluent/do... 2 days ago Up 2 days 0.0.0.0:8082->8082/tcp rest-proxy
da8baf91b142 docker.io/confluentinc/cp-ksqldb-server:7.5.2 /etc/confluent/do... 2 days ago Up 2 days 0.0.0.0:8088->8088/tcp ksqldb-server
fc61a849dd37 docker.io/confluentinc/cp-enterprise-control-center:7.5.2 /etc/confluent/do... 2 days ago Up 2 days 0.0.0.0:9021->9021/tcp control-center
d252df63d02e docker.io/confluentinc/cp-ksqldb-cli:7.5.2 2 days ago Up 2 days ksqldb-cli
I have created this table:
CREATE SOURCE TABLE locations (
  id VARCHAR PRIMARY KEY,
  meta ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
  KAFKA_TOPIC = 'locations',
  VALUE_FORMAT = 'JSON_SR',
  VALUE_SCHEMA_ID = 1
);
That uses this schema:
{
"schemaType":"JSON",
"schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Location value\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\",\"description\":\"The id of the location.\"},\"latitude\":{\"type\":\"number\",\"minimum\":-90,\"maximum\":90,\"description\":\"The location's latitude.\"},\"longitude\":{\"type\":\"number\",\"minimum\":-180,\"maximum\":180,\"description\":\"The location's longitude.\"}}}"
}
Then I produce data with headers, and I am able to see the rows in the table. My problems start when I want to access the META column: selecting META returns the values from the longitude column, and trying to access the data inside META does not work either. How can I use the META column? This looks similar to this issue: #8895

When I recreate my table to get direct access to the header value, I see my header value in the new column. The value MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qx is the string 36d2784c-8233-11ee-b2ce-05ab766c73d1 in base64. But when I try to decode it I get a null column and the values shift into the other columns. Selecting just that column does not work either.

When I recreate the table as a non-SOURCE table I can select the CORRELATIONID column, but decoding that column still does not work.
Using a STREAM instead of a TABLE doesn't change this behavior.
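As a sanity check outside ksqlDB, the header value shown for the record is exactly the base64 encoding of the UUID string, which a small Python sketch confirms:

```python
import base64

# The value ksqlDB displays for the header column is base64 text; decoding
# it yields the original UUID string that was put into the record header.
decoded = base64.b64decode('MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qx')
print(decoded.decode('utf-8'))  # 36d2784c-8233-11ee-b2ce-05ab766c73d1
```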
I also tried to create a simple table:
ksql> CREATE table test (key varchar primary key, value BYTES)
WITH (kafka_topic='test', value_format='json', partitions=1);
Message
---------------
Table created
---------------
ksql> INSERT INTO test (KEY, VALUE) VALUES ('A', TO_BYTES('36d2784c823311eeb2ce05ab766c73d1', 'base64'));
ksql> SELECT KEY, VALUE, FROM_BYTES(VALUE, 'base64') AS FOO FROM test EMIT CHANGES;
+--------------------------------+--------------------------------+--------------------------------+
|KEY |VALUE |FOO |
+--------------------------------+--------------------------------+--------------------------------+
|A |36d2784c823311eeb2ce05ab766c73d1|36d2784c823311eeb2ce05ab766c73d1|
^CQuery terminated
Is the given value for the VALUE column not encoded as base64? Or is it decoded implicitly for the output?
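My working assumption (a Python sketch, not the actual ksqlDB internals): the CLI renders BYTES values as base64 text, and if TO_BYTES(s, 'base64') base64-decodes the string while FROM_BYTES(raw, 'base64') base64-encodes it again, the round trip reproduces the original string, which makes the output look "not decoded":

```python
import base64

s = '36d2784c823311eeb2ce05ab766c73d1'  # hex characters only, all inside the base64 alphabet

# TO_BYTES(s, 'base64') presumably base64-decodes the string into raw bytes...
raw = base64.b64decode(s)

# ...and FROM_BYTES(raw, 'base64') base64-encodes them again, so the result
# is byte-for-byte the original 32-character string:
assert base64.b64encode(raw).decode() == s
print(len(raw))  # 24 raw bytes behind the 32 base64 characters
```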
When I try to insert this string I get an error:
ksql> INSERT INTO test (KEY, VALUE) VALUES ('B', TO_BYTES('36d2784c-8233-11ee-b2ce-05ab766c73d1', 'base64'));
Failed to insert values into 'TEST'.
The ksqldb-server has logged this exception:
[2023-11-17 13:09:10,692] INFO Processed unsuccessfully: KsqlRequest{configOverrides={auto.offset.reset=earliest}, requestProperties={}, commandSequenceNumber=Optional[40]} (f102667e-3ffa-3bf1-ba78-f7daaef003ed): INSERT INTO stream1 (column1, column2) VALUES ('[string]' ,'[string]'); (io.confluent.ksql.logging.query.QueryLogger)
io.confluent.ksql.util.KsqlStatementException: Failed to insert values into 'TEST'.
at io.confluent.ksql.rest.server.execution.InsertValuesExecutor.buildRecord(InsertValuesExecutor.java:293)
at io.confluent.ksql.rest.server.execution.InsertValuesExecutor.execute(InsertValuesExecutor.java:166)
at io.confluent.ksql.rest.server.validation.CustomValidators.validate(CustomValidators.java:145)
at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:166)
at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:129)
at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:310)
at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:183)
at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$24(KsqlServerEndpoints.java:348)
at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$23(KsqlServerEndpoints.java:334)
at io.vertx.core.impl.ContextBase.lambda$null$0(ContextBase.java:137)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
at io.vertx.core.impl.ContextBase.lambda$executeBlocking$1(ContextBase.java:135)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.confluent.ksql.util.KsqlException:
at io.confluent.ksql.engine.generic.GenericExpressionResolver$1.error(GenericExpressionResolver.java:62)
at io.confluent.ksql.execution.codegen.CompiledExpression.evaluate(CompiledExpression.java:96)
at io.confluent.ksql.engine.generic.GenericExpressionResolver$Visitor.visitExpression(GenericExpressionResolver.java:114)
at io.confluent.ksql.engine.generic.GenericExpressionResolver$Visitor.visitExpression(GenericExpressionResolver.java:99)
at io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor.visitFunctionCall(VisitParentExpressionVisitor.java:101)
at io.confluent.ksql.execution.expression.tree.FunctionCall.accept(FunctionCall.java:65)
at io.confluent.ksql.execution.expression.tree.ExpressionVisitor.process(ExpressionVisitor.java:21)
at io.confluent.ksql.engine.generic.GenericExpressionResolver.resolve(GenericExpressionResolver.java:96)
at io.confluent.ksql.engine.generic.GenericRecordFactory.resolveValues(GenericRecordFactory.java:161)
at io.confluent.ksql.engine.generic.GenericRecordFactory.build(GenericRecordFactory.java:90)
at io.confluent.ksql.rest.server.execution.InsertValuesExecutor.buildRecord(InsertValuesExecutor.java:271)
... 15 more
It looks like base64 byte encoding is broken.
Any non-alphanumeric string causes an error when using base64.
I can do this:
ksql> INSERT INTO test (KEY, VALUE) VALUES ('C', TO_BYTES('1234567890', 'base64'));
ksql> INSERT INTO test (KEY, VALUE) VALUES ('D', TO_BYTES('1234567890', 'utf8'));
ksql> SELECT KEY, VALUE, FROM_BYTES(VALUE, 'base64') AS FOO FROM test EMIT CHANGES;
+--------------------------------+--------------------------------+--------------------------------+
|KEY |VALUE |FOO |
+--------------------------------+--------------------------------+--------------------------------+
|C |123456789w== |123456789w== |
|D |MTIzNDU2Nzg5MA== |MTIzNDU2Nzg5MA== |
^CQuery terminated
ksql> SELECT KEY, VALUE, FROM_BYTES(VALUE, 'utf8') AS FOO FROM test EMIT CHANGES;
+--------------------------------+--------------------------------+--------------------------------+
|KEY |VALUE |FOO |
+--------------------------------+--------------------------------+--------------------------------+
|C |123456789w== |�m���� |
|D |MTIzNDU2Nzg5MA== |1234567890 |
^CQuery terminated
Notice how the value in row C, column FOO is not decoded when using base64, but it works in row D when decoding the utf8 value.
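My reading of row C (an assumption about the JVM's lenient decoder, sketched in Python): Java's Base64 decoder tolerates missing padding, so the 10-character input '1234567890' decodes to 7 raw bytes, and re-encoding those bytes yields exactly the '123456789w==' shown, both in the VALUE column (BYTES rendered as base64) and in FOO (FROM_BYTES re-encoding them):

```python
import base64

# Python insists on padding, so add '==' to mimic a lenient decode of the
# 10-character input '1234567890' (10 base64 chars -> 7 raw bytes):
raw = base64.b64decode('1234567890' + '==')
assert len(raw) == 7

# Re-encoding the 7 bytes reproduces the value shown for row C; the last
# 4 bits of the trailing '0' were dropped during the lenient decode:
assert base64.b64encode(raw).decode() == '123456789w=='
```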
UPDATE:
I have tried this with my initial SOURCE table, which only gives me the correct result on a push query but not on a pull query.