...
2020-07-19 15:55:38.729 INFO 13484 --- [ main] org.koorye.test.TestRabbit : Started TestRabbit in 6.182 seconds (JVM running for 7.474)
[ INFO ] Get message: Hello world!
2020-07-19 15:55:39.166 INFO 13484 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
...
    @RabbitListener(queuesToDeclare = @Queue(value = "hello world"))
    public void consumer2(String msg) {
        System.out.println("[ INFO ] Consumer2 get message: " + msg);
    }
}
This time we send 10 messages in a loop:
@Test
public void sendMsg() {
    for (int i = 0; i < 10; ++i)
        rabbitTemplate.convertAndSend("hello world", "Hello world!");
}
Run it:
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
...
Process finished with exit code 0
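The ten messages end up split evenly between the two listeners (five each in the output above), which is consistent with RabbitMQ's default round-robin dispatch across the consumers of one queue. The console order is not strictly alternating, presumably because the two listeners print concurrently. As a rough illustration only, here is a toy in-memory sketch of that dispatch rule in plain Java; `RoundRobinSketch` and `dispatch` are hypothetical names and are not part of Spring AMQP or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin dispatch. Assumption: this only mimics the
// distribution rule; it does not talk to a broker.
class RoundRobinSketch {

    /** Hands message i to consumer (i mod consumerCount) and returns each consumer's inbox. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            inboxes.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("Hello world!");
        }
        List<List<String>> inboxes = dispatch(messages, 2);
        for (int c = 0; c < inboxes.size(); c++) {
            System.out.println("Consumer" + (c + 1) + " received "
                    + inboxes.get(c).size() + " messages");
        }
    }
}
```

With 10 messages and 2 consumers, each inbox ends up holding 5 messages, matching the counts in the log.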
Publish/Subscribe Model
Publisher
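Use the second overload of convertAndSend, which takes an exchange name, a routing key, and the message, and leave the routing key empty: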
@Test
public void sendMsg() {
    for (int i = 0; i < 10; ++i)
        rabbitTemplate.convertAndSend("logs", "", "Hello world!");
}
Consumer
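Use @QueueBinding to bind a queue to an exchange.
Use @Queue with no arguments to declare a temporary queue, which is deleted once the test finishes.
Use @Exchange to declare an exchange: value sets its name, and type sets its type to fanout.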
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value = "logs", type = "fanout")
        )})
public void consumer1(String msg) {
    System.out.println("[ INFO ] Consumer1 get message: " + msg);
}

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value = "logs", type = "fanout")
        )})
public void consumer2(String msg) {
    System.out.println("[ INFO ] Consumer2 get message: " + msg);
}
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer1 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
[ INFO ] Consumer2 get message: Hello world!
Process finished with exit code 0
The test succeeds.
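Both listeners print all ten messages because a fanout exchange copies every message to each queue bound to it and ignores the routing key. The sketch below models that rule with in-memory lists, purely as an illustration; `FanoutSketch`, `bind`, `publish`, and `messagesIn` are hypothetical names and do not correspond to any Spring AMQP or RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a fanout exchange. Assumption: queues are plain lists
// and nothing here connects to a real broker.
class FanoutSketch {
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange, creating it if it does not exist yet. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue; the routing key is ignored. */
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        logs.bind("queue-of-consumer1");
        logs.bind("queue-of-consumer2");
        for (int i = 0; i < 10; ++i) {
            logs.publish("", "Hello world!");
        }
        System.out.println(logs.messagesIn("queue-of-consumer1").size());
        System.out.println(logs.messagesIn("queue-of-consumer2").size());
    }
}
```

Each of the two queues ends up with its own 10 copies, which is why the log shows 20 lines in total.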
Routing (Direct) Model
Publisher
Building on the publish/subscribe model, simply specify a routing key when sending.
@Test
public void sendMsg() {
    rabbitTemplate.convertAndSend("logs", "info", "This is an info.");
    rabbitTemplate.convertAndSend("logs", "err", "This is an err.");
}
Consumer
Specify the routing keys in the key attribute of @QueueBinding, as strings wrapped in curly braces.
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value = "logs", type = "direct"),
                key = {"info"}
        )})
public void consumer1(String msg) {
    System.out.println("[ INFO ] Consumer1 get message: " + msg);
}

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value = "logs", type = "direct"),
                key = {"info", "err"}
        )})
public void consumer2(String msg) {
    System.out.println("[ INFO ] Consumer2 get message: " + msg);
}
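Run it. Remember to delete the previous exchange first, because an exchange named logs already exists with type fanout and cannot be redeclared with a different type: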
[ INFO ] Consumer2 get message: This is an info.
[ INFO ] Consumer1 get message: This is an info.
[ INFO ] Consumer2 get message: This is an err.
...
Process finished with exit code 0
The test succeeds.
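Consumer1 sees only the info message while Consumer2 sees both, because a direct exchange delivers a message to a queue only when the message's routing key exactly equals one of that queue's binding keys. A minimal in-memory sketch of that rule follows; `DirectSketch` and its methods are hypothetical names used only for illustration, not Spring AMQP or RabbitMQ APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of a direct exchange. Assumption: queues are plain lists
// and no real broker is involved.
class DirectSketch {
    private final Map<String, Set<String>> bindingKeys = new LinkedHashMap<>();
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange under one or more binding keys. */
    void bind(String queueName, String... keys) {
        bindingKeys.computeIfAbsent(queueName, q -> new HashSet<>())
                .addAll(Arrays.asList(keys));
        queues.computeIfAbsent(queueName, q -> new ArrayList<>());
    }

    /** Delivers the message to every queue that has a binding key equal to the routing key. */
    void publish(String routingKey, String message) {
        for (Map.Entry<String, Set<String>> entry : bindingKeys.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                queues.get(entry.getKey()).add(message);
            }
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        DirectSketch logs = new DirectSketch();
        logs.bind("queue-of-consumer1", "info");
        logs.bind("queue-of-consumer2", "info", "err");
        logs.publish("info", "This is an info.");
        logs.publish("err", "This is an err.");
        System.out.println("Consumer1: " + logs.messagesIn("queue-of-consumer1"));
        System.out.println("Consumer2: " + logs.messagesIn("queue-of-consumer2"));
    }
}
```

A message whose routing key matches no binding key is simply not delivered to any queue in this model.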
Topic (Wildcard) Model
Publisher
@Test
public void sendMsg() {
    rabbitTemplate.convertAndSend("logs", "info", "This is an info.");
    rabbitTemplate.convertAndSend("logs", "err", "This is an err.");
    rabbitTemplate.convertAndSend("logs", "info.err", "This is an info and err.");
}
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value = "logs", type = "topic"),
                key = {"#.info.#"}
        )})
public void consumer1(String msg) {
    System.out.println("[ INFO ] Consumer1 get message: " + msg);
}

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value = "logs", type = "topic"),
                key = {"#.err.#"}
        )})
public void consumer2(String msg) {
    System.out.println("[ INFO ] Consumer2 get message: " + msg);
}

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value = "logs", type = "topic"),
                key = {"info.#.err.#"}
        )})
public void consumer3(String msg) {
    System.out.println("[ INFO ] Consumer3 get message: " + msg);
}
Run it:
[ INFO ] Consumer3 get message: This is an info and err.
[ INFO ] Consumer1 get message: This is an info.
[ INFO ] Consumer2 get message: This is an err.
[ INFO ] Consumer1 get message: This is an info and err.
[ INFO ] Consumer2 get message: This is an info and err.
...
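The deliveries above follow from the topic wildcard rules: routing keys are split into words on ".", "*" stands for exactly one word, and "#" stands for zero or more words. To make that concrete, here is a small recursive matcher in plain Java. It is a sketch of the matching rule only, not the broker's actual implementation, and `TopicMatchSketch` and `matches` are hypothetical names.

```java
// Sketch of topic binding-key matching. Assumption: keys are non-empty
// and words are separated by '.'; no real broker is involved.
class TopicMatchSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible count.
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            // A literal word or '*' needs one more key word, but none is left.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = {"#.info.#", "#.err.#", "info.#.err.#"};
        String[] keys = {"info", "err", "info.err"};
        for (String pattern : patterns) {
            for (String key : keys) {
                System.out.println(pattern + " vs " + key + ": " + matches(pattern, key));
            }
        }
    }
}
```

Under these rules "#.info.#" matches "info" and "info.err", "#.err.#" matches "err" and "info.err", and "info.#.err.#" matches only "info.err", which agrees with the five log lines shown.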