spring boot 集成 rabbitmq 、 redis 、 mqtt(mosquitto)、activemq
【摘要】 spring boot 集成rabbitmq、 redis 、 和 mqtt(mosquitto)
一、添加依赖,编写 pom.xml 依赖
<!--添加 r...
spring boot 集成 rabbitmq、redis、mqtt(mosquitto) 和 activemq
一、添加依赖,编写 pom.xml 依赖
<!-- RabbitMQ dependencies: the RabbitMQ HTTP client and the Spring Boot AMQP starter. -->
<!-- NOTE(review): ${rabbitmq.http-client} must be defined in the POM <properties> section; confirm. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>http-client</artifactId>
<version>${rabbitmq.http-client}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Redis dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Cache dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- MQTT dependencies (Spring Integration) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- ActiveMQ dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.8</version>
</dependency>
二、配置自动配置类
1.编写 RabbitConfig 配置类
package com.devframe.common.config;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpManagementOperations;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitManagementTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the RabbitMQ beans used by the application:
 * the connection factory, an admin, a JSON-converting template, a management
 * (HTTP API) client and the durable {@code mqttQueue} queue.
 *
 * @author DuanZhaoXu
 * @since 2019-01-05
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the connection factory; its properties are bound from the
     * {@code spring.rabbitmq.*} configuration keys.
     *
     * @return a caching connection factory
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.rabbitmq")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
        return cachingFactory;
    }

    /**
     * Exposes an {@link AmqpAdmin} backed by {@link #connectionFactory()}.
     *
     * @return the admin used to declare/delete queues, exchanges and bindings
     */
    @Bean
    public AmqpAdmin AmqpAdmin() {
        ConnectionFactory factory = connectionFactory();
        return new RabbitAdmin(factory);
    }

    /**
     * Builds a {@link RabbitTemplate} that converts message payloads with Jackson JSON.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonConverter);
        return rabbitTemplate;
    }

    /**
     * Creates the client for the RabbitMQ management HTTP API.
     *
     * <p>NOTE(review): the management URL and credentials are hard-coded here;
     * consider moving them to external configuration.
     *
     * @return the management operations client
     */
    @Bean
    public AmqpManagementOperations amqpManagementOperations() {
        return new RabbitManagementTemplate("http://192.168.19.200:15672", "admin", "admin@123");
    }

    /**
     * Declares the {@code mqttQueue} queue: durable, non-exclusive, not auto-deleted.
     *
     * @return the queue definition
     */
    @Bean
    public Queue mqttQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue("mqttQueue", durable, exclusive, autoDelete);
    }
}
2.编写 RedisConfig配置类
package com.devframe.common.config;
import java.io.Serializable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * Spring configuration that provides a {@link RedisTemplate} with String keys
 * and JSON-serialized values.
 */
@Configuration
public class RedisConfig {

    /**
     * Builds the template on top of the supplied connection factory.
     *
     * @param redisConnectionFactory the Redis connection factory to use
     * @return a template whose keys are serialized as strings and whose values
     *         are serialized with {@link GenericJackson2JsonRedisSerializer}
     */
    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();

        RedisTemplate<String, Serializable> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        template.setKeySerializer(keySerializer);
        template.setValueSerializer(valueSerializer);
        return template;
    }
}
3.编写 MqttSenderConfig 配置类
https://docs.spring.io/spring-integration/docs/5.1.1.RELEASE/reference/html/mqtt.html#mqtt-inbound
package com.devframe.common.config;
import java.util.Arrays;
import java.util.List;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * Spring Integration configuration for MQTT: connection options, client factory,
 * an outbound handler fed by {@code mqttOutboundChannel} and an inbound adapter
 * that delivers subscribed messages to {@code mqttInputChannel}.
 */
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    // The client id is derived from the start-up time instead of being read from
    // configuration: @Value("${spring.mqtt.client.id}")
    private String clientId = String.valueOf(System.currentTimeMillis());

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /** Comma-separated list of topics to subscribe to. */
    @Value("#{'${spring.mqtt.topics}'.split(',')}")
    private List<String> topics;

    /** Comma-separated list of QoS levels for the subscribed topics. */
    @Value("#{'${spring.mqtt.qosValues}'.split(',')}")
    private List<Integer> qosValues;

    /**
     * Builds the Paho connection options from the injected credentials and broker URL.
     *
     * @return the connection options shared by the inbound and outbound clients
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[] {hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        // Connection timeout, in seconds.
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setMaxInflight(100000000);
        return mqttConnectOptions;
    }

    /**
     * Creates the client factory that applies {@link #getMqttConnectOptions()}.
     *
     * @return the Paho client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Outbound handler: publishes every message arriving on {@code mqttOutboundChannel}
     * asynchronously, using {@code defaultTopic} when the message carries no topic header.
     *
     * @return the outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }

    /**
     * Channel on which outgoing messages are sent to {@link #mqttOutbound()}.
     *
     * @return a direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Channel that receives messages from the inbound adapter.
     *
     * @return a direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter: subscribes to the configured topics and forwards received
     * messages to {@link #mqttInputChannel()}.
     *
     * <p>The configured {@code qosValues} are applied to the subscriptions. Per the
     * adapter's contract the QoS array must contain either a single value (applied to
     * every topic) or exactly one value per topic; otherwise bean creation fails.
     *
     * @return the message-driven channel adapter
     */
    @Bean
    public MessageProducer inbound() {
        String[] topicArray = topics.toArray(new String[0]);
        int[] qosArray = qosValues.stream().mapToInt(Integer::intValue).toArray();

        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(), topicArray);
        // Previously the QoS array was computed but never handed to the adapter,
        // so the configured QoS levels were ignored.
        adapter.setQos(qosArray);
        adapter.setCompletionTimeout(3000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handler for messages received on {@code mqttInputChannel}. It currently only
     * extracts the received topic and its last path segment; add the real processing
     * where indicated.
     *
     * @return the inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                // Last segment of the topic path (text after the final '/').
                String type = topic.substring(topic.lastIndexOf("/") + 1, topic.length());
                // Process the message here, e.g.:
                // System.out.println(topic+"|"+message.getPayload().toString());
            }
        };
    }
}
4.编写 MqttGateway 发送消息的接口
package com.devframe.common.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Messaging gateway for publishing to MQTT. Every call is sent to the
 * {@code mqttOutboundChannel} channel.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
 * Sends a payload without a topic header.
 *
 * @param data the message payload
 */
void sendToMqtt(String data);
/**
 * Sends a payload with an explicit topic header.
 *
 * @param payload the message payload
 * @param topic value of the {@code MqttHeaders.TOPIC} header
 */
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
 * Sends a payload with explicit topic and QoS headers. Note that the parameter
 * order differs from the two-argument overload: the topic comes first here.
 *
 * @param topic value of the {@code MqttHeaders.TOPIC} header
 * @param qos value of the {@code MqttHeaders.QOS} header
 * @param payload the message payload
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
在其他需要用 mqtt 发送消息的地方,直接 @Autowired 注入该接口即可进行消息的发送。
5、编写 activemq 的 配置类
package com.devframe.common.config;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
/**
 * Spring configuration for ActiveMQ: the default queue and topic destinations,
 * a pooled connection factory and two JMS listener container factories
 * (point-to-point and publish/subscribe).
 */
@Configuration
public class ActivemqConfig {

    // Names of the destinations that reported messages are forwarded to.
    // They could alternatively be read from configuration:
    //   @Value("${spring.queueName}") / @Value("${spring.topicName}")
    public static final String queueName = "mqttQueue";
    public static final String topicName = "mqttTopic";

    @Value("${spring.activemq.user}")
    private String usrName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    /**
     * Declares the queue destination named {@link #queueName}.
     *
     * @return the ActiveMQ queue
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    /**
     * Declares the topic destination named {@link #topicName}.
     *
     * @return the ActiveMQ topic
     */
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }

    /**
     * Pooled connection factory wrapping an {@link ActiveMQConnectionFactory} built
     * from the configured user, password and broker URL. A pooled factory is used
     * because, per the original author, sending many messages under high concurrency
     * through a plain connection factory leads to errors.
     *
     * <p>{@code destroyMethod = "stop"} is declared explicitly so the pool is stopped
     * when the application context closes; without it the pool would be left running.
     *
     * @return the pooled connection factory
     */
    @Bean(destroyMethod = "stop")
    public PooledConnectionFactory pooledConnectionFactory() {
        return new PooledConnectionFactory(new ActiveMQConnectionFactory(usrName, password, brokerUrl));
    }

    /**
     * Listener container factory for queue (point-to-point) consumers.
     *
     * @return the listener container factory
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue() {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(pooledConnectionFactory());
        return bean;
    }

    /**
     * Listener container factory for topic (publish/subscribe) consumers; reference
     * it by name on the consumer side.
     *
     * @return the listener container factory
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic() {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // Switch to publish/subscribe; the default is point-to-point.
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(pooledConnectionFactory());
        return bean;
    }
}
三、AmqpTemplate、RedisTemplate、MqttGateway、JmsMessagingTemplate 的使用
1.AmqpTemplate 的使用
package com.devframe.controller;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpManagementOperations;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.devframe.entity.OperatEntity;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
/**
 * REST controller demonstrating RabbitMQ usage through {@link AmqpTemplate},
 * {@link AmqpAdmin} and {@link AmqpManagementOperations}: sending to exchanges and
 * queues in the different exchange modes, and creating/deleting queues and
 * exchanges at runtime. Every endpoint answers with a plain status string.
 */
@RestController
@Api(tags = "RabbitServerController rabbitmq测试controller")
public class RabbitServerController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private AmqpManagementOperations amqpManagementOperations;

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     * Sends {@code msg} to exchange {@code mq.asdfExChange} with routing key
     * {@code mq.asdf.send} (or "hello world" when {@code msg} is empty), then prints
     * the names of all queues known to the management API.
     *
     * @param model the MVC model (only checked for null)
     * @param msg   the message to send
     * @return {@code "success"}
     */
    @RequestMapping(value = "/sendMsg", method = RequestMethod.POST)
    public String sendAmqbMsg(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
        if (model != null && !"".equals(msg)) {
            amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", msg);
        } else {
            amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", "hello world");
        }
        amqpManagementOperations.getQueues().forEach(x -> {
            System.out.println(x.getName());
        });
        return "success";
    }

    /**
     * Sends a fixed text to exchange {@code mq.asdfExChange} with routing key
     * {@code mq.asdf2.send}.
     *
     * @param model the MVC model (only checked for null)
     * @param msg   only used to choose between the two fixed texts
     * @return {@code "success"}
     */
    @RequestMapping(value = "/sendMsg2", method = RequestMethod.POST)
    public String sendAmqbMsg2(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
        if (model != null && !"".equals(msg)) {
            amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙!!!");
        } else {
            amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙");
        }
        return "success";
    }

    /**
     * Sends a fixed text to exchange {@code mq.qwerExChange} with routing key
     * {@code mq.qwer.send}.
     *
     * @param model the MVC model (only checked for null)
     * @param msg   only used to choose between the two fixed texts
     * @return {@code "success"}
     */
    @RequestMapping(value = "/sendMsg3", method = RequestMethod.POST)
    public String sendAmqbMsg3(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
        if (model != null && !"".equals(msg)) {
            amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界!!!");
        } else {
            amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界");
        }
        return "success";
    }

    /**
     * Sends {@code msg} straight to the {@code mqttQueue} routing key, without naming
     * an exchange.
     *
     * @param model the MVC model (only checked for null)
     * @param msg   the message to send; nothing is sent when empty
     * @return {@code "success"}
     */
    @RequestMapping(value = "/helloWorld", method = RequestMethod.POST)
    @ApiOperation("1对1 or 1对多 无交换机模式")
    public String helloWorld(Model model, @RequestParam(value = "msg", defaultValue = "1对1,无交换机模式!") String msg) {
        if (model != null && !"".equals(msg)) {
            amqpTemplate.convertAndSend("mqttQueue", msg);
        }
        return "success";
    }

    /**
     * Publishes {@code msg} to {@code pubSubExchange} with an empty routing key
     * (broadcast / publish-subscribe mode).
     *
     * @param model the MVC model (only checked for null)
     * @param msg   the message to send; nothing is sent when empty
     * @return {@code "success"}
     */
    @RequestMapping(value = "/pubSub", method = RequestMethod.POST)
    @ApiOperation("广播发布/订阅模式")
    public String pubSub(Model model, @RequestParam(value = "msg", defaultValue = "广播发布/订阅模式") String msg) {
        if (model != null && !"".equals(msg)) {
            // In broadcast mode the routing key is irrelevant: every consumer gets the message.
            amqpTemplate.convertAndSend("pubSubExchange", "", msg);
        }
        return "success";
    }

    /**
     * Sends {@code msg} to {@code routingExchange} once for each of the routing keys
     * {@code info}, {@code warn} and {@code error}.
     *
     * @param model the MVC model (only checked for null)
     * @param msg   the message to send; nothing is sent when empty
     * @return {@code "success"}
     */
    @RequestMapping(value = "/routing", method = RequestMethod.POST)
    @ApiOperation("路由消息模式")
    public String routing(Model model, @RequestParam(value = "msg", defaultValue = "路由消息模式") String msg) {
        if (model != null && !"".equals(msg)) {
            String[] infoTyp = new String[] { "info", "warn", "error" };
            for (String routing : infoTyp) {
                amqpTemplate.convertAndSend("routingExchange", routing, msg);
            }
        }
        return "success";
    }

    /**
     * Sends {@code msg} to {@code topicExchange} once for each of a fixed set of
     * dotted routing keys (topic mode).
     *
     * @param model the MVC model (only checked for null)
     * @param msg   the message to send; nothing is sent when empty
     * @return {@code "success"}
     */
    @RequestMapping(value = "/topicMatch", method = RequestMethod.POST)
    @ApiOperation("主题模式")
    public String topicModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg) {
        if (model != null && !"".equals(msg)) {
            String[] infoTyp = new String[] { "P.123.asdasd", "P.456.JQBE", "P.789.WBD", "P.ASBDJBAS" };
            for (String routing : infoTyp) {
                amqpTemplate.convertAndSend("topicExchange", routing, msg);
            }
        }
        return "success";
    }

    /**
     * Sends {@code msg} to {@code headerExchange} three times, each time with a
     * single different message header (headers mode).
     *
     * @param model the MVC model (only checked for null)
     * @param msg   the message to send; nothing is sent when empty
     * @return {@code "success"}
     * @throws UnsupportedEncodingException if the {@code utf-8} charset is unavailable
     */
    @RequestMapping(value = "/headerModal", method = RequestMethod.POST)
    @ApiOperation("header模式")
    public String headerModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg)
            throws UnsupportedEncodingException {
        if (model != null && !"".equals(msg)) {
            Map<String, Object> map = new HashMap<String, Object>();
            map.put("abc", "nb");
            map.put("def", "pl");
            map.put("jabs", "aksd");
            for (Entry<String, Object> entry : map.entrySet()) {
                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setHeader(entry.getKey(), entry.getValue());
                Message message = new Message(msg.getBytes("utf-8"), messageProperties);
                amqpTemplate.convertAndSend("headerExchange", null, message);
            }
        }
        return "success";
    }

    /**
     * Declares a durable queue named {@code queueName} and sends an
     * {@link OperatEntity} built from {@code msg} to it.
     *
     * @param model     the MVC model (only checked for null)
     * @param msg       value stored in the entity; nothing happens when empty
     * @param queueName name of the queue to declare and send to
     * @return {@code "success"}
     * @throws UnsupportedEncodingException declared for signature compatibility
     */
    @RequestMapping(value = "/createTaskQueue", method = RequestMethod.POST)
    @ApiOperation("自动创建队列并发送消息到队列")
    public String createTaskQueue(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg,
            @RequestParam(name = "queueName", defaultValue = "ff008") String queueName)
            throws UnsupportedEncodingException {
        if (model != null && !"".equals(msg)) {
            amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
            OperatEntity operatEntity = new OperatEntity();
            operatEntity.setId("123456");
            operatEntity.setIndepeid("askdnad");
            operatEntity.setRecordid(msg);
            operatEntity.setTablename(msg);
            amqpTemplate.convertAndSend(queueName, operatEntity);
            // To read the message back synchronously, call
            // amqpTemplate.receiveAndConvert(queueName) and check whether the result
            // is an OperatEntity or a String.
        }
        return "success";
    }

    /**
     * Deletes every queue whose name starts with the given prefix.
     *
     * <p>A null or empty prefix is rejected: a null prefix used to fail with a
     * {@link NullPointerException}, and an empty prefix matches every queue name and
     * would therefore delete all queues.
     *
     * @param queueNamePre the queue-name prefix (the task name)
     * @return {@code "success"}, or a failure message when the prefix is null or empty
     */
    @RequestMapping(value = "/deleteQueue", method = RequestMethod.POST)
    @ApiOperation("删除以 任务名称为前缀的队列")
    public String deleteQueueWithPre(String queueNamePre) {
        if (queueNamePre == null || queueNamePre.isEmpty()) {
            return "failed: queueNamePre must not be empty";
        }
        List<Queue> queues = amqpManagementOperations.getQueues();
        for (Queue queue : queues) {
            if (queue.getName().startsWith(queueNamePre)) {
                amqpManagementOperations.deleteQueue(queue);
            }
        }
        return "success";
    }

    /**
     * Declares a durable direct exchange named {@code exchangeName} plus a durable
     * queue named {@code exchangeName-<8 random characters>}, binds them using the
     * random part as routing key, sends {@code msg} through the exchange and
     * immediately receives it back from the queue and prints it.
     *
     * @param model        the MVC model (only checked for null)
     * @param msg          the message to send; nothing happens when empty
     * @param exchangeName name of the exchange to declare
     * @return {@code "success"}
     * @throws UnsupportedEncodingException declared for signature compatibility
     */
    @RequestMapping(value = "/createExchangeBindTaskQueue", method = RequestMethod.POST)
    @ApiOperation("自动创建交换机并绑定队列")
    public String createExchangeBindTaskQueue(Model model,
            @RequestParam(value = "msg", defaultValue = "主题模式") String msg,
            @RequestParam(name = "exchangeName", defaultValue = "ff008") String exchangeName)
            throws UnsupportedEncodingException {
        if (model != null && !"".equals(msg)) {
            // The exchange is declared unconditionally instead of first looking it up
            // via amqpManagementOperations.getExchange(exchangeName).
            String randomNum = UUID.randomUUID().toString().substring(0, 8);
            Exchange exchange = ExchangeBuilder.directExchange(exchangeName).durable(true).build();
            Queue queue = QueueBuilder.durable(exchangeName + "-" + randomNum).build();
            amqpAdmin.declareExchange(exchange);
            amqpAdmin.declareQueue(queue);
            // Bind the queue to the exchange; the routing key is the 8-character random string.
            amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(randomNum).noargs());
            amqpTemplate.convertAndSend(exchangeName, randomNum, msg);
            String result = (String) amqpTemplate.receiveAndConvert(queue.getName());
            System.out.println(result);
        }
        return "success";
    }

    /**
     * Deletes the exchange with the given name. Before that, for every binding of the
     * exchange in virtual host {@code /}, the queue named
     * {@code exchangeName-<routingKey>} is deleted as well.
     *
     * @param exchangeName name of the exchange (the task name)
     * @return {@code "success"}
     */
    @RequestMapping(value = "/deleteExchange", method = RequestMethod.POST)
    @ApiOperation("删除以 任务名称的交换机")
    public String deleteExchange(String exchangeName) {
        List<Binding> bindings = amqpManagementOperations.getBindingsForExchange("/", exchangeName);
        for (Binding binding : bindings) {
            String routingkey = binding.getRoutingKey();
            amqpAdmin.deleteQueue(exchangeName + "-" + routingkey);
        }
        amqpAdmin.deleteExchange(exchangeName);
        return "success";
    }
    // RPC-style invocation is not implemented.
}
2. RedisTemplate 的使用
package com.devframe.common.util;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
 * Convenience wrapper around {@link StringRedisTemplate} for String values, lists,
 * key lookup and a simple timestamp-based lock.
 */
@Component
public class RedisTemplateUtils {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public static final int LOCK_TIMEOUT = 4;

    // --------------- operations on String values

    /**
     * Stores a String value under the given key.
     *
     * @param key   the key
     * @param value the value
     */
    public void set(String key, String value) {
        stringRedisTemplate.opsForValue().set(key, value);
    }

    /**
     * Reads the String value stored under the given key.
     *
     * @param key the key
     * @return the stored value, as returned by the template
     */
    public String get(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    /**
     * Deletes the given key.
     *
     * @param key the key
     */
    public void del(String key) {
        stringRedisTemplate.delete(key);
    }

    // --------------- operations on List values

    /**
     * Pushes an element onto the head of the list stored at {@code key}.
     *
     * @param key   the list key
     * @param value the element to push
     */
    public void lpush(String key, String value) {
        stringRedisTemplate.opsForList().leftPush(key, value);
    }

    /**
     * Returns the list element at the given index.
     *
     * @param key   the list key
     * @param index the element index
     * @return the element, as returned by the template
     */
    public String lindex(String key, long index) {
        return stringRedisTemplate.opsForList().index(key, index);
    }

    /**
     * Returns the length of the list stored at {@code key}.
     *
     * @param key the list key
     * @return the list length, or {@code 0} when the template reports no size
     *         (it returns {@code null} instead of a number in that case)
     */
    public long llen(String key) {
        Long size = stringRedisTemplate.opsForList().size(key);
        return size == null ? 0L : size;
    }

    /**
     * Returns the list elements between {@code start} and {@code end}.
     *
     * @param key   the list key
     * @param start the start index
     * @param end   the end index
     * @return the elements in that range
     */
    public List<String> lrange(String key, long start, long end) {
        return stringRedisTemplate.opsForList().range(key, start, end);
    }

    /**
     * Finds all keys matching a pattern, for example {@code login*}.
     *
     * @param key the key pattern
     * @return the matching keys
     */
    public Set<String> keys(String key) {
        return stringRedisTemplate.keys(key);
    }

    /**
     * Tries to acquire a lock.
     *
     * <p>The lock is taken when the key can be set because it is absent (SETNX).
     * Otherwise, if the stored value is a timestamp earlier than the current time,
     * the lock is considered expired and is taken over with GETSET: when several
     * callers race for an expired lock, only the one whose GETSET returns the
     * previously read value wins.
     *
     * @param key   the lock key (e.g. a unique product id)
     * @param value the expiry timestamp in milliseconds (current time + timeout)
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws NumberFormatException if the value currently stored under {@code key}
     *                               is not a number
     */
    public boolean lock(String key, String value) {
        // setIfAbsent returns a nullable Boolean; compare explicitly instead of
        // auto-unboxing to avoid a NullPointerException.
        if (Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, value))) {
            // The key did not exist, so the lock is ours.
            return true;
        }
        // Check for an expired lock, so that a holder that failed before unlocking
        // cannot block everyone forever.
        String currentValue = stringRedisTemplate.opsForValue().get(key);
        if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {
            // Atomically replace the value and get the previous one.
            String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value);
            // Only the caller that still sees the value it read before the swap wins.
            if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Releases a lock, but only if the stored value still equals {@code value}.
     * Any exception is logged and not rethrown.
     *
     * <p>NOTE(review): the read and the delete are two separate commands, so this
     * check-then-delete is not atomic.
     *
     * @param key   the lock key
     * @param value the value that was used to acquire the lock
     */
    public void unlock(String key, String value) {
        try {
            String currentValue = stringRedisTemplate.opsForValue().get(key);
            if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
                stringRedisTemplate.delete(key);
            }
        } catch (Exception e) {
            Log.error(e);
        }
    }
}
3.MqttGateway 的使用
// Inject the MQTT messaging gateway.
@Autowired
private MqttGateway mqttGateway;
/**
 * Publishes sendData to the given MQTT topic.
 *
 * NOTE(review): both gateway overloads are invoked, so the payload is handed to
 * the gateway twice (once with only the topic header, once with topic and QoS 0);
 * keep only one call if a single publish is intended.
 *
 * @param sendData the payload to publish
 * @param topic the destination topic
 * @return "发送成功" when both calls return normally, "发送失败" if an exception is thrown
 */
@RequestMapping("/sendMsg")
public String sendMsg(String sendData,String topic){
try {
// Overload (payload, topic).
mqttGateway.sendToMqtt(sendData,topic);
// Overload (topic, qos, payload) with QoS 0.
mqttGateway.sendToMqtt(topic, 0, sendData);
return "发送成功";
}catch (Exception e) {
// TODO: handle exception
Log.error(e);
return "发送失败";
}
}
4、jmsMessagingTemplate 的使用
// Inject the JMS messaging template ("@@Autowired" was a typo and does not compile).
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
// Inside a method: publish payload to the topic named by ActivemqConfig.topicName.
jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(ActivemqConfig.topicName), payload);
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/86233738
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)