How to use @Transactional with @KafkaListener?(如何将@Transaction与@KafkaListener一起使用?)
问题描述
有没有可能将声明性TX管理(通过@Transaction)与@KafkaListener注释方法一起使用? 例如,我想使用它来为每个监听器定义单独的发送超时。 我的设置如下:
TransactionManager:
@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
return new ChainedKafkaTransactionManager<>(
kafkaTransactionManager,
hibernateTransactionManager);
}
KafkaListener:
@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
问题是-KafkaMessageListenerContainer在调用此类方法之前创建自己的事务-它使用自己的TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
return this.transactionManager != null
? new TransactionTemplate(this.transactionManager)
: null;
}
未使用TransactionInterceptor。那么具体的@KafkaListener方法如何设置具体的TX超时时间呢?
推荐答案
可以这样做,但有点复杂,因为您必须将消耗的偏移量发送到Kafka交易。
不使用ChainedKafkaTransactionManager
,您可以为容器使用KafkaTransactionManager
,为HibernateTransactionManager
使用@Transactional
。
这将产生类似的结果,因为Hibernate Tx将在Kafka事务之前提交(如果Hibernate提交失败,则Kafka Tx将回滚)。
编辑
若要将不同的链式TM配置到每个侦听器容器中,可以执行以下操作。
@组件 类ContainerFactoryCustomizer{ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
ChainedKafkaTransactionManager<?, ?> chainedOne,
ChainedKafkaTransactionManager<?, ?> chainedTwo) {
factory.setContainerCustomizer(
container -> {
String groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("foo")) {
container.getContainerProperties().setTransactionManager(chainedOne);
}
else {
container.getContainerProperties().setTransactionManager(chainedTwo);
}
});
}
}
Where each chained TM has a Hibernate TM with a different default timeout.
The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.
这篇关于如何将@Transaction与@KafkaListener一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何将@Transaction与@KafkaListener一起使用?
- 转换 ldap 日期 2022-01-01
- 如何指定 CORS 的响应标头? 2022-01-01
- java.lang.IllegalStateException:Bean 名称“类别"的 BindingResult 和普通目标对象都不能用作请求属性 2022-01-01
- 在 Java 中,如何将 String 转换为 char 或将 char 转换 2022-01-01
- Eclipse 的最佳 XML 编辑器 2022-01-01
- GC_FOR_ALLOC 是否更“严重"?在调查内存使用情况时? 2022-01-01
- 获取数字的最后一位 2022-01-01
- 如何使 JFrame 背景和 JPanel 透明且仅显示图像 2022-01-01
- 将 Java Swing 桌面应用程序国际化的最佳实践是什么? 2022-01-01
- 未找到/usr/local/lib 中的库 2022-01-01