SpringBoot整合ActiveMQ消息队列

举报
小毕超 发表于 2022/06/24 22:18:04 2022/06/24
【摘要】 SpringBoot 整合 ActiveMQ:pom.xml、application.properties、ActiveMQConfig.java(发布订阅)、消息提供者 ProducerService...

ActiveMQ

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。
下面是SpringBoot 集成ActiveMQ实现点对点以及发布订阅。

pom.xml

 <!-- ActiveMQ: Spring Boot starter that brings in JMS messaging over ActiveMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!-- Disabled alternative pooling library (activemq-pool), kept for reference: -->
        <!--        <dependency>-->
        <!--            <groupId>org.apache.activemq</groupId>-->
        <!--            <artifactId>activemq-pool</artifactId>-->
        <!--        </dependency>-->

        <!-- JMS connection pooling library. -->
        <!-- NOTE(review): presumably required by spring.activemq.pool.enabled=true on Spring Boot 2.1+; -->
        <!-- confirm against the Boot version in use, and whether the pinned version can be left to Boot's dependency management. -->
        <dependency>
            <groupId>org.messaginghub</groupId>
            <artifactId>pooled-jms</artifactId>
            <version>1.0.3</version>
        </dependency>

application.properties

# ActiveMQ settings
# Custom keys: names of the queue and the topic that ActiveMQConfig injects via @Value
queueName=common.queue
topicName=video.topic
# Address of a single broker
spring.activemq.broker-url=tcp://localhost:61616
# Cluster configuration: alternative failover URL listing two brokers (disabled)
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
# Broker credentials.
# NOTE(review): admin/admin looks like a default account - confirm it is only used for local development
spring.activemq.user=admin
spring.activemq.password=admin
# Connection pooling switch and the upper bound on pooled connections
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

ActiveMQConfig.java(发布订阅)

/**
 * JMS configuration for the application.
 *
 * <p>Declares the shared {@link Queue} and {@link Topic} beans, whose names come from the
 * {@code queueName} and {@code topicName} properties, plus a listener container factory
 * switched to publish/subscribe mode for topic listeners.
 *
 * <p>The former class-wide {@code @SuppressWarnings({"all"})} was removed: it silenced every
 * compiler warning for the whole class, including ones that would point at real problems.
 */
@Configuration
public class ActiveMQConfig {
    /** Name of the point-to-point queue, read from the {@code queueName} property. */
    @Value("${queueName}")
    private String queueName;

    /** Name of the publish/subscribe topic, read from the {@code topicName} property. */
    @Value("${topicName}")
    private String topicName;

    /**
     * Exposes the configured queue as a bean.
     *
     * @return an ActiveMQ queue named after the {@code queueName} property
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    /**
     * Exposes the configured topic as a bean.
     *
     * @return an ActiveMQ topic named after the {@code topicName} property
     */
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }

    /**
     * Builds the listener container factory referenced by topic listeners through
     * {@code containerFactory = "jmsListenerContainerTopic"}.
     *
     * @param activeMQConnectionFactory connection factory the listener containers will use
     * @return a container factory with the publish/subscribe domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
        // Pub/sub mode makes the containers resolve destination names as topics, not queues.
        topicFactory.setPubSubDomain(true);
        topicFactory.setConnectionFactory(activeMQConnectionFactory);
        return topicFactory;
    }
}

消息提供者ProducerService.java

/**
 * Contract for sending text messages to JMS destinations.
 *
 * <p>Every operation reports its outcome as a boolean; the implementation shown alongside this
 * interface returns {@code true} when the send completes and {@code false} when it throws.
 */
public interface ProducerService {

	/**
	 * Sends a text message to an explicitly chosen destination.
	 *
	 * @param destination where the message should go
	 * @param message     text payload
	 * @return whether the send succeeded
	 */
	boolean sendMessage(Destination destination, String message);

	/**
	 * Sends a text message without naming a destination; the implementation picks its own queue.
	 *
	 * @param message text payload
	 * @return whether the send succeeded
	 */
	boolean sendMessage(String message);

	/**
	 * Publishes a text message for subscribers.
	 *
	 * @param msg text payload
	 * @return whether the publish succeeded
	 */
	boolean publish(String msg);
}

ProducerServiceImpl.java

/**
 * {@link ProducerService} implementation backed by {@link JmsMessagingTemplate}.
 *
 * <p>All three operations share one send path so that failures are handled the same way:
 * the exception is logged at ERROR level together with its stack trace and the method
 * returns {@code false}. Nothing is written to standard error any more.
 */
@Service
public class ProducerServiceImpl implements ProducerService {
	private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);

	/** Default destination for {@link #sendMessage(String)}. */
	@Autowired
	private Queue queue;

	/** Destination for {@link #publish(String)}. */
	@Autowired
	private Topic topic;

	@Autowired
	JmsMessagingTemplate jmsMessagingTemplate;

	/**
	 * Sends a text message to the given destination.
	 *
	 * @param destination target queue or topic
	 * @param message     text payload
	 * @return {@code true} if the send completed, {@code false} if it threw
	 */
	@Override
	public boolean sendMessage(Destination destination, String message) {
		return trySend(destination, message);
	}

	/**
	 * Sends a text message to the injected default queue.
	 *
	 * @param message text payload
	 * @return {@code true} if the send completed, {@code false} if it threw
	 */
	@Override
	public boolean sendMessage(String message) {
		return trySend(queue, message);
	}

	/**
	 * Publishes a text message to the injected topic.
	 *
	 * @param msg text payload
	 * @return {@code true} if the send completed, {@code false} if it threw
	 */
	@Override
	public boolean publish(String msg) {
		return trySend(topic, msg);
	}

	/** Converts and sends the payload, turning any failure into a logged {@code false}. */
	private boolean trySend(Destination destination, String payload) {
		try {
			jmsMessagingTemplate.convertAndSend(destination, payload);
			return true;
		} catch (Exception e) {
			// Broad catch is deliberate: callers rely on a boolean result instead of exceptions.
			// Passing the exception as the last argument makes the logger record the stack trace.
			logger.error("Failed to send JMS message to destination {}", destination, e);
			return false;
		}
	}
}

消息接收者

点对点

/**
 * Listener bound to the "common.queue" destination; prints every received text to standard output.
 * NOTE(review): the "order.queue" listener has the same name and signature, so the two
 * presumably belong to separate consumer classes - confirm.
 *
 * @param text message body handed to the listener
 */
@JmsListener(destination = "common.queue")
	public void receiveQueue(String text) {
		final String line = "CommonConsumer收到的报文为:" + text;
		System.out.println(line);
	}
/**
 * Listener bound to the "order.queue" destination; prints every received text to standard output.
 * NOTE(review): the "common.queue" listener has the same name and signature, so the two
 * presumably belong to separate consumer classes - confirm.
 *
 * @param text message body handed to the listener
 */
@JmsListener(destination = "order.queue")
	public void receiveQueue(String text) {
		final String line = "OrderConsumer收到的报文为:" + text;
		System.out.println(line);
	}

订阅

	/**
	 * First subscriber on "video.topic", registered with listener id "1" through the
	 * "jmsListenerContainerTopic" container factory; prints every received text to standard output.
	 *
	 * @param text message body handed to the listener
	 */
	@JmsListener(id = "1", destination = "video.topic", containerFactory = "jmsListenerContainerTopic")
	public void receive1(String text) {
		final String line = "video.topic 消费者:receive1=" + text;
		System.out.println(line);
	}
	
	
	/**
	 * Subscriber on "video.topic" using the "jmsListenerContainerTopic" container factory;
	 * prints every received text to standard output.
	 * NOTE(review): the listener id is "3" although the method is receive2 - looks swapped with
	 * receive3 (id "2"); left unchanged, confirm whether anything looks listeners up by id.
	 *
	 * @param text message body handed to the listener
	 */
	@JmsListener(id = "3", destination = "video.topic", containerFactory = "jmsListenerContainerTopic")
	public void receive2(String text) {
		final String line = "video.topic 消费者:receive2=" + text;
		System.out.println(line);
	}
	
	
	/**
	 * Subscriber on "video.topic" using the "jmsListenerContainerTopic" container factory;
	 * prints every received text to standard output.
	 * NOTE(review): the listener id is "2" although the method is receive3 - looks swapped with
	 * receive2 (id "3"); left unchanged, confirm whether anything looks listeners up by id.
	 *
	 * @param text message body handed to the listener
	 */
	@JmsListener(id = "2", destination = "video.topic", containerFactory = "jmsListenerContainerTopic")
	public void receive3(String text) {
		final String line = "video.topic 消费者:receive3=" + text;
		System.out.println(line);
	}

启动类

@EnableJms

测试ActiveController

/**
 * REST endpoints for manually exercising {@link ProducerService}.
 *
 * <p>Each handler triggers one send and answers with the literal {@code "success"} or
 * {@code "fail"}, depending on the boolean the service reports.
 */
@RestController
// NOTE(review): "/avtivemq" reads like a typo of "activemq"; kept as-is since it is the published URL.
@RequestMapping("/avtivemq")
public class ActiveController {
	@Autowired
	private ProducerService producerService;

	/**
	 * Sends a fixed text to an ad-hoc queue created for this request.
	 * NOTE(review): the queue name "commue.abc" has no listener in the surrounding examples - confirm it is intended.
	 *
	 * @return "success" or "fail"
	 */
	@GetMapping("/AddMq")
	public String AddMq() {
		Destination target = new ActiveMQQueue("commue.abc");
		return toStatus(producerService.sendMessage(target, "你哈啊"));
	}

	/**
	 * Sends a fixed text through the service's single-argument send.
	 *
	 * @return "success" or "fail"
	 */
	@GetMapping("/AddMqComm")
	public String AddMqComm() {
		return toStatus(producerService.sendMessage("你哈啊Comm"));
	}

	/**
	 * Publishes a fixed text through the service's publish operation.
	 *
	 * @return "success" or "fail"
	 */
	@GetMapping("/AddMqAll")
	public String AddMqAll() {
		return toStatus(producerService.publish("你哈啊Comm"));
	}

	/** Maps the service's boolean outcome onto the response body. */
	private static String toStatus(boolean sent) {
		return sent ? "success" : "fail";
	}

}

文章来源: blog.csdn.net,作者:小毕超,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_43692950/article/details/107442701

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。