Receive and Send Java Objects with Spring AMQP

Problem description

I want to implement a Spring AMQP example that sends and receives Java objects using a listener. I tried this:

Send the Java object:

ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
                .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
AmqpTemplate template = new RabbitTemplate(connectionFactory);

TransactionsBean obj = new TransactionsBean();
obj.setId(Long.valueOf(111222333));

Receive and send back another Java object:

ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
                .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
AmqpTemplate template = new RabbitTemplate(connectionFactory);

TransactionsBean obj = (TransactionsBean) template.receiveAndConvert(QUEUE_PROCESSING_TRANSACTION);
System.out.println(" !!!!!!! Received id " + obj.getTransaction_id());

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(new Queue(QUEUE_PROCESSING_TRANSACTION, false));

container.setMessageListener(new MessageListener() {
  @Override
  public void onMessage(Message message) {
    // Receive here Java object and send back another object
  }
});

Can you show me how to extend the code without complex annotations, using just simple listeners?

Recommended answer

The simplest way is to use a @RabbitListener. This is even easier with Spring Boot, since it wires up the infrastructure beans (template, admin, etc.) for you.

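import java.io.Serializable;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@SpringBootApplication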
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    @RabbitListener(queues = So51009346Application.QUEUE_PROCESSING_TRANSACTION)
    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}
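
A note on why RequestObject and ReplyObject implement Serializable: when no other converter is configured, RabbitTemplate's default SimpleMessageConverter falls back to Java serialization for Serializable payloads. The sketch below shows roughly what that round trip looks like using only the JDK. The names SerializationRoundTrip and Payload are made up for illustration, and the real converter also sets message properties such as the content type. Recent Spring AMQP versions may additionally restrict which classes can be deserialized, so check the documentation for your version.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; not part of Spring AMQP.
final class SerializationRoundTrip {

    // Hypothetical payload with one field, standing in for RequestObject.
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;

        Payload(long id) {
            this.id = id;
        }
    }

    // Serialize a value into the bytes that would become the message body.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    // Rebuild the object from a message body on the receiving side.
    static Object fromBytes(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] body = toBytes(new Payload(111222333L));
        Payload copy = (Payload) fromBytes(body);
        System.out.println("Round-tripped id: " + copy.id); // prints "Round-tripped id: 111222333"
    }
}
```

Because both sides must be able to load the same class, the sender and the receiver need the payload class on their classpath with a compatible serialVersionUID.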

If you don't want to use that annotation for some reason, you can wire up a container using a MessageListenerAdapter:

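import java.io.Serializable;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@SpringBootApplication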
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
        container.setMessageListener(new MessageListenerAdapter(listener, "process"));
        return container;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}

You can, of course, wire up the container yourself, as in your question, using the adapter. However, it is generally better to let Spring manage it as a @Bean; otherwise you will miss some functionality (e.g. event publishing for failures and idle containers). The adapter takes a reference to your request/reply listener and the name of the method to call.
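
To make the "reference plus method name" idea concrete, here is a rough JDK-only sketch of how such an adapter can dispatch to a plain listener via reflection. This is not Spring's implementation: AdapterSketch and SimpleAdapter are hypothetical names, and the real MessageListenerAdapter also converts the incoming message and sends the returned object to the reply address.

```java
import java.lang.reflect.Method;

// Hypothetical sketch; message conversion and reply sending are omitted.
final class AdapterSketch {

    static final class RequestObject { }

    static final class ReplyObject { }

    // Plain listener with no messaging dependencies, as in the answer.
    static final class Listener {
        public ReplyObject process(RequestObject ro) {
            return new ReplyObject();
        }
    }

    // Holds the delegate and the name of the method to invoke on it.
    static final class SimpleAdapter {
        private final Object delegate;
        private final String methodName;

        SimpleAdapter(Object delegate, String methodName) {
            this.delegate = delegate;
            this.methodName = methodName;
        }

        // Find a public one-argument method with the configured name that
        // accepts the payload, invoke it, and return its result as the reply.
        Object handle(Object payload) throws ReflectiveOperationException {
            for (Method m : delegate.getClass().getMethods()) {
                if (m.getName().equals(methodName)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].isInstance(payload)) {
                    return m.invoke(delegate, payload);
                }
            }
            throw new NoSuchMethodException(methodName);
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleAdapter adapter = new SimpleAdapter(new Listener(), "process");
        Object reply = adapter.handle(new RequestObject());
        System.out.println(reply.getClass().getSimpleName()); // prints "ReplyObject"
    }
}
```

The benefit of this style is that Listener stays a plain class you can unit test without a broker; only the adapter and container know about messaging.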
