Support Kafka Managed IO #31172
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
R: @chamikaramj
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Map<String, Object> remappedConfig = new HashMap<>();
for (Map.Entry<String, Object> entry : configMap.entrySet()) {
  String paramName = entry.getKey();
  if (mapping.containsKey(paramName)) {
Right now, any parameters that aren't recognized are silently dropped. I think we should at least log a warning here.
The mapping contains parameter names that should be updated. If a parameter is not included in the mapping, we assume its original name is correct.
But agreed some logging would be helpful. I'll add a log showing the final Row configuration used to build the underlying transform
P.S. Alternatively, we can make it mandatory to include all parameter names in the mapping, regardless of whether they need to be updated. (I'm open to this option, but I find it unnecessarily strict.)
Ah got it - I think leaving it is fine, thanks
But agreed some logging would be helpful. I'll add a log showing the final Row configuration used to build the underlying transform
Sounds good, thank you!
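For readers following the thread, the behavior agreed on above can be sketched roughly as follows. This is only an illustration, not the PR's actual code: the class name `RemapSketch`, the `remap` helper, and the parameter names in `MAPPING` are hypothetical. Names found in the mapping are renamed, every other name is kept unchanged, and the final configuration is printed so that nothing is altered silently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RemapSketch {
  // Hypothetical mapping from a user-facing name to the underlying schema name.
  // These parameter names are illustrative and are not taken from the PR.
  static final Map<String, String> MAPPING = Map.of("bootstrap_servers", "bootstrapServers");

  // Renames keys that appear in the mapping; keys without a mapping keep their original name.
  static Map<String, Object> remap(Map<String, Object> configMap, Map<String, String> mapping) {
    Map<String, Object> remappedConfig = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : configMap.entrySet()) {
      String paramName = entry.getKey();
      if (mapping.containsKey(paramName)) {
        paramName = mapping.get(paramName);
      }
      remappedConfig.put(paramName, entry.getValue());
    }
    return remappedConfig;
  }

  public static void main(String[] args) {
    Map<String, Object> config = new LinkedHashMap<>();
    config.put("bootstrap_servers", "localhost:9092");
    config.put("topic", "events");
    // Stand-in for the log line discussed above: show the final configuration in use.
    System.out.println("Final configuration: " + remap(config, MAPPING));
  }
}
```

In this sketch an unmapped parameter such as `topic` passes through untouched rather than being dropped, which matches the explanation given in the thread.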
// The config Row object will be used to build the underlying SchemaTransform.
// If a mapping for the SchemaTransform exists, we use it to update parameter names and align
// with the underlying config schema
Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
What happens if the transform identifier isn't found? It seems like Iceberg already falls into that case. I'm also worried about typos, though, if folks don't use the enum (maybe rare).
Similar to the reasoning above, a mapping exists only if the transform needs one.
In the Iceberg case, we are already producing a configuration schema with snake_case
convention, so no remapping of names is needed.
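One way to picture the "mapping exists only if the transform needs one" rule is a lookup that falls back to an empty mapping. The sketch below is an assumption-laden illustration, not the PR's implementation: the class `MappingLookupSketch`, the identifiers `example:kafka_read` and `example:iceberg_write`, and the parameter names are all hypothetical.

```java
import java.util.Collections;
import java.util.Map;

class MappingLookupSketch {
  // Hypothetical registry: only transforms that need renaming have an entry.
  // The identifier and parameter names are illustrative, not taken from the PR.
  static final Map<String, Map<String, String>> MAPPINGS =
      Map.of("example:kafka_read", Map.of("bootstrap_servers", "bootstrapServers"));

  // Returns the mapping for a transform, or an empty mapping when none is registered.
  static Map<String, String> mappingFor(String transformIdentifier) {
    return MAPPINGS.getOrDefault(transformIdentifier, Collections.emptyMap());
  }

  // Resolves a single parameter name; names without a mapping are returned unchanged.
  static String resolve(String transformIdentifier, String paramName) {
    return mappingFor(transformIdentifier).getOrDefault(paramName, paramName);
  }

  public static void main(String[] args) {
    System.out.println(resolve("example:kafka_read", "bootstrap_servers"));
    System.out.println(resolve("example:iceberg_write", "table"));
  }
}
```

Note that a lookup like this cannot tell a transform that needs no mapping apart from a mistyped identifier, so in this sketch the typo concern raised above would have to be caught by a separate validation of the identifier.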
@@ -18,16 +18,25 @@
package org.apache.beam.sdk.io.kafka;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
It would also be good to add a basic Kafka IT (though this is about testing Managed as much as it is Kafka). I'm fine either deferring that to a future PR, where we can address this together with Iceberg, or including it here; up to you.
LGTM, thanks!
Onboarding KafkaIO to be available via Managed API