How to change timestamp of records?(如何更改记录的时间戳?)
问题描述
我正在使用FluentD(v.12最后一个稳定版本)向Kafka发送消息。但FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1。 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达Kafka时的时间点。
是否有特定于Kafka Streams的解决方案?
我真正感兴趣的时间戳是由fluentd在消息中发送的:
";timestamp";:";1507885936";,&Quot;主机:&Quot;V.X.Y.Z.
以卡夫卡表示的记录:
偏移量=0,时间戳=-1,键=空,值={";timestamp";:";1507885936";,;主机;:V.X.Y.Z.&Quot;}
我希望有这样一张卡夫卡唱片:
OFFSET=0,TIMESTAMP=1507885936,KEY=NULL,VALUE={";timestamp";:";1507885936";,;HOST&QOT;:&QOT;V.X.Y.Z.&QOT;}
我的解决方法如下所示:
编写消费者提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
编写一个生产者,生成一个时间戳设置为(ProducerRecord(字符串主题,整数分区,长时间戳,K键,V值)的新记录)
我更喜欢KafkaStreams解决方案(如果有)。
推荐答案
您可以编写非常简单的Kafka Streams应用程序,如下所示:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
并使用从记录中提取时间戳并返回时间戳的自定义TimestampExtractor
配置应用程序。
Kafka Streams在将记录写回Kafka时将使用返回的时间戳。
注意:如果您有乱序数据--即时间戳没有严格排序--结果也将包含乱序时间戳。Kafka Streams使用返回的时间戳回写Kafka(即,无论提取程序返回什么,都用作记录元数据时间戳)。请注意,在写入时,当前处理的输入记录中的时间戳用于所有生成的输出记录--这适用于版本1.0,但在将来的版本中可能会更改。)。
更新:
一般来说,您可以通过处理器API修改时间戳。调用context.forward()
可以通过To.all().withTimestamp(...)
将输出记录时间戳设置为forward()
的参数。
这篇关于如何更改记录的时间戳?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何更改记录的时间戳?
- java.lang.IllegalStateException:Bean 名称“类别"的 BindingResult 和普通目标对象都不能用作请求属性 2022-01-01
- 未找到/usr/local/lib 中的库 2022-01-01
- 如何指定 CORS 的响应标头? 2022-01-01
- 如何使 JFrame 背景和 JPanel 透明且仅显示图像 2022-01-01
- 获取数字的最后一位 2022-01-01
- 转换 ldap 日期 2022-01-01
- Eclipse 的最佳 XML 编辑器 2022-01-01
- GC_FOR_ALLOC 是否更“严重"?在调查内存使用情况时? 2022-01-01
- 在 Java 中,如何将 String 转换为 char 或将 char 转换 2022-01-01
- 将 Java Swing 桌面应用程序国际化的最佳实践是什么? 2022-01-01