Using Kafka Streams with Serdes that rely on schema references in the Headers(将Kafka Streams与Serde结合使用,这些Serde依赖于标头中的架构引用)
问题描述
我正在尝试使用Kafka Streams对CDC数据执行KTable-KTable外键联接。我将读取的数据是Avro格式的,但是它被序列化的方式与其他行业序列化程序/反序列化程序(例如。合流架构注册表),因为架构标识符存储在标头中。
当我设置KTables的Serdes时,我的Kafka Streams应用程序最初运行,但最终失败,因为它在内部调用了带有byte[] serialize(String topic, T data);
的序列化程序方法,而不是带有标头的方法(即。byte[] serialize(String topic, Headers headers, T data)
在包装序列化程序ValueAndTimestampSerializer中。我正在使用的Serdes无法处理此问题并引发异常。
第一个问题是,有没有人知道如何恳求Kafka Streams在内部调用带有正确方法签名的方法? 我正在探索解决这个问题的方法,包括编写新的Serde,用消息本身中的模式标识符重新序列化。这可能涉及将数据重新复制到新主题或使用拦截器。
但是,我知道ValueTransformer
可以访问ProcessorContext
中的标头,我想知道是否有更快的方法使用transformValues()
。其想法是首先将该值作为byte[]
读取,然后将该值反序列化为转换器中的Avro类(请参见下面的示例)。但是,当我这样做时,我会得到一个例外。
当我运行它时,我收到一个ClassCastException。我如何解决此问题或找到解决方法?我需要使用辅助状态存储吗?
推荐答案
我能够通过以下方法解决此问题:首先将输入主题作为KStream读取,然后将其转换为具有不同Serde的KTable。第二步,状态存储似乎遇到了未调用带有标头的序列化程序/反序列化程序方法签名的问题。
这篇关于将Kafka Streams与Serde结合使用,这些Serde依赖于标头中的架构引用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!