Using Kafka Streams with Serdes that rely on schema references in the Headers
Problem description
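I'm trying to use Kafka Streams to perform KTable-KTable foreign key joins on CDC data. The data I will be reading is in Avro format, but it is serialized in a way that is not compatible with other industry serializers/deserializers (e.g. Confluent Schema Registry), because the schema identifiers are stored in the headers.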
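When I set up my KTables' Serdes, my Kafka Streams app runs initially but ultimately fails, because in the wrapping serializer ValueAndTimestampSerializer it internally invokes the Serializer method byte[] serialize(String topic, T data) and not the method with headers, i.e. byte[] serialize(String topic, Headers headers, T data). The Serdes I'm working with cannot handle this and throw an exception.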
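My first question: does anyone know a way to get Kafka Streams to call the method with the right signature internally? I'm exploring approaches to work around this, including writing new Serdes that re-serialize the data with the schema identifiers in the message itself. This may involve recopying the data to a new topic or using interceptors.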
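A minimal sketch of the re-serialization idea, assuming a made-up layout in which a 4-byte big-endian schema identifier is placed in front of the Avro-encoded payload. The class name SchemaIdFraming and the layout are illustrative assumptions, not part of any existing library, and the layout is not the Confluent wire format.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical framing: [4-byte big-endian schema id][Avro payload]. */
final class SchemaIdFraming {

    private static final int ID_BYTES = Integer.BYTES;

    private SchemaIdFraming() {}

    /** Prepends the schema id so that no record header is needed to decode the value. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(ID_BYTES + avroPayload.length)
                .putInt(schemaId)   // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Reads the schema id back from a framed value. */
    static int schemaId(byte[] framed) {
        requireFramed(framed);
        return ByteBuffer.wrap(framed).getInt();
    }

    /** Returns the Avro payload without the schema id prefix. */
    static byte[] payload(byte[] framed) {
        requireFramed(framed);
        return Arrays.copyOfRange(framed, ID_BYTES, framed.length);
    }

    private static void requireFramed(byte[] framed) {
        if (framed == null || framed.length < ID_BYTES) {
            throw new IllegalArgumentException("value is too short to contain a schema id");
        }
    }
}
```

A Serde built on such a framing only needs serialize(String topic, T data) and deserialize(String topic, byte[] data), so it would not depend on which overload Kafka Streams calls internally.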
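However, I understand that ValueTransformer has access to the headers in the ProcessorContext, and I'm wondering whether there might be a faster way using transformValues(). The idea is to first read the value as a byte[] and then deserialize it to my Avro class in the transformer (see the example below). When I do this, however, I get an exception.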
StreamsBuilder builder = new StreamsBuilder();

final KTable<Long, MySpecificClass> myTable = builder.table(
        "my-topic",
        Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);

...

KTable<Long, JoinResult> joinResultTable = myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);

joinResultTable.toStream()...
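import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MyDeserializerTransformer implements
        ValueTransformerWithKey<Long, byte[], MySpecificClass> {

    MyAvroDeserializer deserializer;
    ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        deserializer = new MyAvroDeserializer();
        // Keep the context so that transform() can read the current record's headers.
        this.context = context;
    }

    @Override
    public MySpecificClass transform(Long key, byte[] value) {
        // Use the header-aware overload; the schema identifier lives in the headers.
        return deserializer.deserialize(context.topic(), context.headers(), value);
    }

    @Override
    public void close() {
    }
}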
When I run this, I get a ClassCastException. How can I fix this or find a workaround? Do I need to use a secondary state store?
"class": "org.apache.kafka.streams.errors.StreamsException",
"msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
"stack": [
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
"org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
"org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
"org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
"org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
"org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
"class": "java.lang.ClassCastException",
"msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
"stack": [
"org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
"org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
"org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
"org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
"org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
"org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
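Recommended answer
I was able to solve this issue by first reading the input topic as a KStream and then, as a second step, converting it to a KTable with a different Serde. It seems that state stores are where the problem lies: they do not invoke the serializer/deserializer method signatures that take headers.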
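The approach described above could be sketched roughly as follows. This is not the answerer's actual code. The names HeaderAwareTables, HeaderDecodingTransformer, and headerFreeSerde are hypothetical, and the sketch assumes a Serde is available that can encode the value without record headers (for example with the schema identifier inside the payload). transformValues() is kept because the question uses it; newer Kafka Streams releases deprecate it in favor of processValues().

```java
import java.util.function.Supplier;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

/** Hypothetical helper: decode with headers on a KStream, then build the KTable. */
final class HeaderAwareTables {

    private HeaderAwareTables() {}

    static <V> KTable<Long, V> table(
            StreamsBuilder builder,
            String topic,
            Supplier<Deserializer<V>> headerAwareDeserializer, // e.g. MyAvroDeserializer::new
            Serde<V> headerFreeSerde) {                        // assumed Serde that needs no headers

        ValueTransformerWithKeySupplier<Long, byte[], V> decoder =
                () -> new HeaderDecodingTransformer<>(headerAwareDeserializer.get());

        return builder
                // Step 1: read raw bytes as a stream, so no state store is involved yet.
                .stream(topic, Consumed.with(Serdes.Long(), Serdes.ByteArray()))
                // Step 2: decode each value while the record headers are still accessible.
                .transformValues(decoder)
                // Step 3: materialize the table with a Serde that does not rely on headers.
                .toTable(Materialized.with(Serdes.Long(), headerFreeSerde));
    }

    /** Decodes byte[] values using the headers of the record currently being processed. */
    static final class HeaderDecodingTransformer<V>
            implements ValueTransformerWithKey<Long, byte[], V> {

        private final Deserializer<V> deserializer;
        private ProcessorContext context;

        HeaderDecodingTransformer(Deserializer<V> deserializer) {
            this.deserializer = deserializer;
        }

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public V transform(Long key, byte[] value) {
            if (value == null) {
                return null; // pass tombstones through so that toTable() treats them as deletes
            }
            return deserializer.deserialize(context.topic(), context.headers(), value);
        }

        @Override
        public void close() {
            deserializer.close();
        }
    }
}
```

With this helper, the left table from the question would be built as HeaderAwareTables.table(builder, "my-topic", MyAvroDeserializer::new, headerFreeSerde) and then joined as before. The stack trace in the question shows ByteArraySerializer being applied to MySpecificClass inside the foreign-key join, so giving the table an explicit value Serde through Materialized is presumably what avoids the ClassCastException.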