spring boot 集成 rabbitmq 、 redis 、 mqtt(mosquitto)、activemq

举报
小米粒-biubiubiu 发表于 2020/12/03 00:31:06 2020/12/03
5.6k+ 0 0
【摘要】                      spring boot  集成rabbitmq、  redis 、 和  mqtt(mosquitto) 一、 添加依赖,编写 application.xml 依赖 <!--添加 r...

                     spring boot  集成rabbitmq、  redis 、 和  mqtt(mosquitto)

一、 添加依赖,编写 application.xml 依赖


      <!--添加 rabbitmq 的依赖-->
      <dependency>
     			<groupId>com.rabbitmq</groupId>
     			<artifactId>http-client</artifactId>
     			<version>${rabbitmq.http-client}</version>
     		</dependency>
     		<dependency>
     			<groupId>org.springframework.boot</groupId>
     			<artifactId>spring-boot-starter-amqp</artifactId>
     		</dependency>
     		<!-- 引入redis依赖 -->
     		<dependency>
     			<groupId>org.springframework.boot</groupId>
     			<artifactId>spring-boot-starter-data-redis</artifactId>
     		</dependency>
     		<!-- 缓存的依赖 -->
     		<dependency>
     			<groupId>org.springframework.boot</groupId>
     			<artifactId>spring-boot-starter-cache</artifactId>
     		</dependency>
      <!--mqtt依赖-->
     		<dependency>
     			<groupId>org.springframework.boot</groupId>
     			<artifactId>spring-boot-starter-integration</artifactId>
     		</dependency>
     		<dependency>
     			<groupId>org.springframework.integration</groupId>
     			<artifactId>spring-integration-stream</artifactId>
     		</dependency>
     		<dependency>
     			<groupId>org.springframework.integration</groupId>
     			<artifactId>spring-integration-mqtt</artifactId>
     		</dependency>
      <!--添加activemq的依赖-->
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-activemq</artifactId>
      </dependency>
      <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.15.8</version>
      </dependency>
  
 

二、配置自动配置类

1.编写 RabbitConfig 配置类


      package com.devframe.common.config;
      import org.springframework.amqp.core.AmqpAdmin;
      import org.springframework.amqp.core.AmqpManagementOperations;
      import org.springframework.amqp.core.Queue;
      import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
      import org.springframework.amqp.rabbit.connection.ConnectionFactory;
      import org.springframework.amqp.rabbit.core.RabbitAdmin;
      import org.springframework.amqp.rabbit.core.RabbitManagementTemplate;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
      import org.springframework.boot.context.properties.ConfigurationProperties;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      /**
       * @ClassName:
       * @Description:
       * @author DuanZhaoXu
       * @data 2019年1月5日上午10:52:32
       */
      @Configuration
      public class RabbitConfig {
     	@Bean
     	@ConfigurationProperties(prefix = "spring.rabbitmq")
     	public ConnectionFactory connectionFactory() {
     		return new CachingConnectionFactory();
      	}
     	@Bean
     	public AmqpAdmin AmqpAdmin() {
     		return new RabbitAdmin(connectionFactory());
      	}
     	@Bean
     	public RabbitTemplate rabbitTemplate() {
      		RabbitTemplate template = new RabbitTemplate(connectionFactory());
      		template.setMessageConverter(new Jackson2JsonMessageConverter());
     		return template;
      	}
     	@Bean
     	public AmqpManagementOperations amqpManagementOperations() {
      		AmqpManagementOperations amqpManagementOperations = new RabbitManagementTemplate(
      "http://192.168.19.200:15672", "admin", "admin@123");
     		return amqpManagementOperations;
      	}
     	@Bean
     	public Queue mqttQueue() {
     		return new Queue("mqttQueue", true, false, false);
      	}
      }
  
 

 2.编写 RedisConfig配置类


      package com.devframe.common.config;
      import java.io.Serializable;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.data.redis.connection.RedisConnectionFactory;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
      import org.springframework.data.redis.serializer.StringRedisSerializer;
      @Configuration
      public class RedisConfig {
     	@Bean
      public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
       RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
       redisTemplate.setKeySerializer(new StringRedisSerializer());
       redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
       redisTemplate.setConnectionFactory(redisConnectionFactory);
      return redisTemplate;
       }
      }
  
 

 3.编写 MqttSenderConfig 配置类 

https://docs.spring.io/spring-integration/docs/5.1.1.RELEASE/reference/html/mqtt.html#mqtt-inbound


      package com.devframe.common.config;
      import java.util.Arrays;
      import java.util.List;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.integration.annotation.IntegrationComponentScan;
      import org.springframework.integration.annotation.ServiceActivator;
      import org.springframework.integration.channel.DirectChannel;
      import org.springframework.integration.core.MessageProducer;
      import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
      import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
      import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
      import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
      import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
      import org.springframework.messaging.Message;
      import org.springframework.messaging.MessageChannel;
      import org.springframework.messaging.MessageHandler;
      import org.springframework.messaging.MessagingException;
      @Configuration
      @IntegrationComponentScan
      public class MqttSenderConfig {
      @Value("${spring.mqtt.username}")
      private String username;
      @Value("${spring.mqtt.password}")
      private String password;
      @Value("${spring.mqtt.url}")
      private String hostUrl;
      //@Value("${spring.mqtt.client.id}")
      private String clientId = String.valueOf(System.currentTimeMillis());
      @Value("${spring.mqtt.default.topic}")
      private String defaultTopic;
      @Value("#{'${spring.mqtt.topics}'.split(',')}")
      private List<String> topics ;
      @Value("#{'${spring.mqtt.qosValues}'.split(',')}")
      private List<Integer> qosValues;
      @Bean
      public MqttConnectOptions getMqttConnectOptions(){
       MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
       mqttConnectOptions.setUserName(username);
       mqttConnectOptions.setCleanSession(true);
       mqttConnectOptions.setPassword(password.toCharArray());
       mqttConnectOptions.setServerURIs(new String[]{hostUrl});
       mqttConnectOptions.setKeepAliveInterval(2);
      // 设置超时时间 单位为秒
       mqttConnectOptions.setConnectionTimeout(10);
       mqttConnectOptions.setMaxInflight(100000000);
      return mqttConnectOptions;
       }
      @Bean
      public MqttPahoClientFactory mqttClientFactory() {
       DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
       factory.setConnectionOptions(getMqttConnectOptions());
      return factory;
       }
      @Bean
      @ServiceActivator(inputChannel = "mqttOutboundChannel")
      public MessageHandler mqttOutbound() {
       MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());
       messageHandler.setAsync(true);
       messageHandler.setDefaultTopic(defaultTopic);
       messageHandler.setDefaultRetained(false);
      return messageHandler;
       }
      @Bean
      public MessageChannel mqttOutboundChannel() {
      return new DirectChannel();
       }
      //接收通道
      @Bean
      public MessageChannel mqttInputChannel() {
      return new DirectChannel();
       }
      //配置client,监听的topic 
      @Bean
      public MessageProducer inbound() {
       String[] strings = new String[topics.size()];
       Integer[] ints = new Integer[qosValues.size()];
       topics.toArray(strings);
       qosValues.toArray(ints);
       int[] its= Arrays.stream(ints).mapToInt(Integer::valueOf).toArray();
       MqttPahoMessageDrivenChannelAdapter adapter =
       new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),strings);
       adapter.setCompletionTimeout(3000);
       adapter.setConverter(new DefaultPahoMessageConverter());
       adapter.setOutputChannel(mqttInputChannel());
      return adapter;
       }
      //通过通道获取数据
      @Bean
      @ServiceActivator(inputChannel = "mqttInputChannel")
      public MessageHandler handler() {
      return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
       String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
       String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
      // System.out.println(topic+"|"+message.getPayload().toString());
       }
       };
       }
      }
  
 

4.编写  MqttGateway 发送消息的接口


      package com.devframe.common.config;
      import org.springframework.integration.annotation.MessagingGateway;
      import org.springframework.integration.mqtt.support.MqttHeaders;
      import org.springframework.messaging.handler.annotation.Header;
      import org.springframework.stereotype.Component;
      @Component
      @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
      public interface MqttGateway {
     	void sendToMqtt(String data);
     	void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
     	void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
      }
  
 

在  其他的 需要 用到 mqtt 发送消息的时候直接 @Autowired 该 接口 既可进行消息的发送 

5、编写 activemq 的 配置类


      package com.devframe.common.config;
      import javax.jms.Queue;
      import javax.jms.Topic;
      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.command.ActiveMQQueue;
      import org.apache.activemq.command.ActiveMQTopic;
      import org.apache.activemq.pool.PooledConnectionFactory;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
      import org.springframework.jms.config.JmsListenerContainerFactory;
      @Configuration
      public class ActivemqConfig {
      // #配置 农机上报的消息转发到的topic名称
      // queueName: mqttQueue
      // topicName: mqttTopic
      //@Value("${spring.queueName}")
      public static final String queueName ="mqttQueue";
      //@Value("${spring.topicName}")
      public static final String topicName ="mqttTopic";
      @Value("${spring.activemq.user}")
      private String usrName;
      @Value("${spring.activemq.password}")
      private  String password;
      @Value("${spring.activemq.broker-url}")
      private  String brokerUrl;
      @Bean
      public Queue queue(){
      return new ActiveMQQueue(queueName);
       }
      @Bean
      public Topic topic(){
      return new ActiveMQTopic(topicName);
       }
      //配置activemq连接工厂
      // @Bean
      // public ActiveMQConnectionFactory activeMQConnectionFactory() {
      // return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
      // }
      //配置连接池工厂(在高并发的情况下需要使用池连接工厂,不然当向activemq发送过多的消息时候会报错)
      @Bean
      public PooledConnectionFactory pooledConnectionFactory() {
      return new PooledConnectionFactory(new ActiveMQConnectionFactory(usrName, password, brokerUrl));
       }
      //配置JmsListenerContainerFactory
      @Bean
      public JmsListenerContainerFactory<?> jmsListenerContainerQueue( ){
       DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
       bean.setConnectionFactory(pooledConnectionFactory());
      return bean;
       }
      //配置发布订阅模式的JmsListenerContainerFactory,用于在消费者方指定
      @Bean
      public JmsListenerContainerFactory<?> jmsListenerContainerTopic(){
       DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
      //设置为发布订阅方式, 默认情况下使用的生产消费者方式
       bean.setPubSubDomain(true);
       bean.setConnectionFactory(pooledConnectionFactory());
      return bean;
       }
      }
  
 

 三 、AmqpTemplate 、RedisTemplate、MqttGateway ,

 1.AmqpTemplate 的使用


      package com.devframe.controller;
      import java.io.UnsupportedEncodingException;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Map.Entry;
      import java.util.UUID;
      import org.springframework.amqp.core.AmqpAdmin;
      import org.springframework.amqp.core.AmqpManagementOperations;
      import org.springframework.amqp.core.AmqpTemplate;
      import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.Exchange;
      import org.springframework.amqp.core.ExchangeBuilder;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.core.MessageProperties;
      import org.springframework.amqp.core.Queue;
      import org.springframework.amqp.core.QueueBuilder;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.ui.Model;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RequestMethod;
      import org.springframework.web.bind.annotation.RequestParam;
      import org.springframework.web.bind.annotation.RestController;
      import com.devframe.entity.OperatEntity;
      import io.swagger.annotations.Api;
      import io.swagger.annotations.ApiOperation;
      @RestController
      @Api(tags = "RabbitServerController rabbitmq测试controller")
      public class RabbitServerController {
     	@Autowired
     	private AmqpTemplate amqpTemplate;
     	@Autowired
     	private AmqpManagementOperations amqpManagementOperations;
     	@Autowired
     	private AmqpAdmin amqpAdmin;
     	@RequestMapping(value = "/sendMsg", method = RequestMethod.POST)
     	public String sendAmqbMsg(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
     		if (model != null && !"".equals(msg)) {
      			amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", msg);
      		} else {
      			amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", "hello world");
      		}
      		amqpManagementOperations.getQueues().forEach(x -> {
      			System.out.println(x.getName());
      		});
      		;
     		return "success";
      	}
     	@RequestMapping(value = "/sendMsg2", method = RequestMethod.POST)
     	public String sendAmqbMsg2(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
     		if (model != null && !"".equals(msg)) {
      			amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙!!!");
      		} else {
      			amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙");
      		}
     		return "success";
      	}
     	@RequestMapping(value = "/sendMsg3", method = RequestMethod.POST)
     	public String sendAmqbMsg3(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
     		if (model != null && !"".equals(msg)) {
      			amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界!!!");
      		} else {
      			amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界");
      		}
     		return "success";
      	}
     	@RequestMapping(value = "/helloWorld", method = RequestMethod.POST)
     	@ApiOperation("1对1 or 1对多 无交换机模式")
     	public String helloWorld(Model model, @RequestParam(value = "msg", defaultValue = "1对1,无交换机模式!") String msg) {
     		if (model != null && !"".equals(msg)) {
      			amqpTemplate.convertAndSend("mqttQueue", msg);
      		}
     		return "success";
      	}
     	@RequestMapping(value = "/pubSub", method = RequestMethod.POST)
     	@ApiOperation("广播发布/订阅模式")
     	public String pubSub(Model model, @RequestParam(value = "msg", defaultValue = "广播发布/订阅模式") String msg) {
     		if (model != null && !"".equals(msg)) {
     			// 广播模式对于路由无效,所有的消费者都可以获取都消息
      			amqpTemplate.convertAndSend("pubSubExchange", "", msg);
      		}
     		return "success";
      	}
     	@RequestMapping(value = "/routing", method = RequestMethod.POST)
     	@ApiOperation("路由消息模式")
     	public String routing(Model model, @RequestParam(value = "msg", defaultValue = "路由消息模式") String msg) {
     		if (model != null && !"".equals(msg)) {
      			String[] infoTyp = new String[] { "info", "warn", "error" };
     			for (String routing : infoTyp) {
       amqpTemplate.convertAndSend("routingExchange", routing, msg);
      			}
      		}
     		return "success";
      	}
     	@RequestMapping(value = "/topicMatch", method = RequestMethod.POST)
     	@ApiOperation("主题模式")
     	public String topicModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg) {
     		if (model != null && !"".equals(msg)) {
      			String[] infoTyp = new String[] { "P.123.asdasd", "P.456.JQBE", "P.789.WBD", "P.ASBDJBAS" };
     			for (String routing : infoTyp) {
       amqpTemplate.convertAndSend("topicExchange", routing, msg);
      			}
      		}
     		return "success";
      	}
     	@RequestMapping(value = "/headerModal", method = RequestMethod.POST)
     	@ApiOperation("header模式")
     	public String headerModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg)
      			throws UnsupportedEncodingException {
     		if (model != null && !"".equals(msg)) {
      			Map<String, Object> map = new HashMap<String, Object>();
      			map.put("abc", "nb");
      			map.put("def", "pl");
      			map.put("jabs", "aksd");
     			for (Entry<String, Object> entry : map.entrySet()) {
       MessageProperties messageProperties = new MessageProperties();
       messageProperties.setHeader(entry.getKey(), entry.getValue());
       Message message = new Message(msg.getBytes("utf-8"), messageProperties);
       amqpTemplate.convertAndSend("headerExchange", null, message);
      			}
      		}
     		return "success";
      	}
     	@RequestMapping(value = "/createTaskQueue", method = RequestMethod.POST)
     	@ApiOperation("自动创建队列并发送消息到队列")
     	public String createTaskQueue(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg,
     			@RequestParam(name = "queueName", defaultValue = "ff008") String queueName)
      			throws UnsupportedEncodingException {
     		if (model != null && !"".equals(msg)) {
      			amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
      			OperatEntity operatEntity = new OperatEntity();
      			operatEntity.setId("123456");
      			operatEntity.setIndepeid("askdnad");
      			operatEntity.setRecordid(msg);
      			operatEntity.setTablename(msg);
      			amqpTemplate.convertAndSend(queueName, operatEntity);
     			// Object object = amqpTemplate.receiveAndConvert(queueName);
     			// if(object instanceof OperatEntity){
     			// OperatEntity operatEntity2 = (OperatEntity)object;
     			// System.out.println("从队列"+queueName+"收到了一个【"+operatEntity2.toString()+"】");
     			// System.out.println(operatEntity.getId());
     			// System.out.println(operatEntity.getIndepeid());
     			// System.out.println(operatEntity.getRecordid());
     			// System.out.println(operatEntity.getTablename());
     			// }else{
     			// String result = (String)object;
     			// System.out.println("从队列"+queueName+"收到了一个【"+result+"】");
     			// }
      		}
     		return "success";
      	}
     	/**
       * 删除以 任务名称为前缀的队列
       *
       * @param queueNamePre
       * @return String
       */
     	@RequestMapping(value = "/deleteQueue", method = RequestMethod.POST)
     	@ApiOperation("删除以 任务名称为前缀的队列")
     	public String deleteQueueWithPre(String queueNamePre) {
      		List<Queue> queues = amqpManagementOperations.getQueues();
     		for (Queue queue : queues) {
     			if (queue.getName().startsWith(queueNamePre)) {
       amqpManagementOperations.deleteQueue(queue);
      			}
      		}
     		return "success";
      	}
     	@RequestMapping(value = "/createExchangeBindTaskQueue", method = RequestMethod.POST)
     	@ApiOperation("自动创建交换机并绑定队列")
     	public String createExchangeBindTaskQueue(Model model,
     			@RequestParam(value = "msg", defaultValue = "主题模式") String msg,
     			@RequestParam(name = "exchangeName", defaultValue = "ff008") String exchangeName)
      			throws UnsupportedEncodingException {
     		if (model != null && !"".equals(msg)) {
     			// 查询交换机是否存在
     			// Exchange exchange =
     			// amqpManagementOperations.getExchange(exchangeName);
     			// if(exchange==null){ //如果不存在 ,则声明该交换机
      			String randomNum = UUID.randomUUID().toString().substring(0, 8);
      			Exchange exchange = ExchangeBuilder.directExchange(exchangeName).durable(true).build();
      			Queue queue = QueueBuilder.durable(exchangeName + "-" + randomNum).build();
      			amqpAdmin.declareExchange(exchange);
      			amqpAdmin.declareQueue(queue);
     			// }
     			// 否则直接 将 该队列绑定到 交换机上面,routingkey 为 生成的8位随机数
      			amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(randomNum).noargs());
      			amqpTemplate.convertAndSend(exchangeName, randomNum, msg);
      			String result = (String) amqpTemplate.receiveAndConvert(queue.getName());
      			System.out.println(result);
      		}
     		return "success";
      	}
     	/**
       * 删除以 任务名称的交换机
       *
       * @param queueNamePre
       * @return String
       */
     	@RequestMapping(value = "/deleteExchange", method = RequestMethod.POST)
     	@ApiOperation("删除以 任务名称的交换机")
     	public String deleteExchange(String exchangeName) {
     		// Map<String, Object> map =
     		// amqpManagementOperations.getExchange(exchangeName).getArguments();
      		List<Binding> bindings = amqpManagementOperations.getBindingsForExchange("/", exchangeName);
     		for (Binding binding : bindings) {
      			String routingkey = binding.getRoutingKey();
      			amqpAdmin.deleteQueue(exchangeName + "-" + routingkey);
      		}
      		amqpAdmin.deleteExchange(exchangeName);
     		return "success";
      	}
     	// rpc调用未实现
      }
  
 

2. RedisTemplate 的使用


      package com.devframe.common.util;
      import java.util.List;
      import java.util.Set;
      import org.apache.commons.lang3.StringUtils;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.data.redis.core.StringRedisTemplate;
      import org.springframework.stereotype.Component;
      @Component
      public class RedisTemplateUtils {
      	@Autowired
     	private StringRedisTemplate stringRedisTemplate;
      // @Autowired
      // private RedisTemplate<String, Serializable> redisTemplate;
     	public static final int LOCK_TIMEOUT = 4;
     	// ---------------value 为String 的 操作
     	/**
       * set 字符串的 key,value
       *
       * @param key
       * @param value
       */
     	public void set(String key, String value) {
      		stringRedisTemplate.opsForValue().set(key, value);
      	}
     	/**
       * 根据key获取字符串的 value
       *
       * @param key
       */
     	public String get(String key) {
     		return stringRedisTemplate.opsForValue().get(key);
      	}
     	/**
       * 根据 key 删除
       *
       * @param key
       */
     	public void del(String key) {
      		stringRedisTemplate.opsForValue().getOperations().delete(key);
      	}
     	// ---------------value 为 List 的操作
     	/**
       * push 元素到 list中
       */
     	public  void  lpush(String key,String value) {
      		stringRedisTemplate.opsForList().leftPush(key, value);
      	}
     	/**
       * 获取 list中 某个下标的元素
       * @param key
       * @param index
       */
     	public String  lindex(String key,long index) {
      return stringRedisTemplate.opsForList().index(key, index);
      	}
     	/**
       * 根据key 获取 list的长度
       * @param key
       * @return
       */
     	public long  llen(String key) {
     		return stringRedisTemplate.opsForList().size(key);
      	}
     	/**
       * 根据 key,start,end 获取某一个区间的 list数据集
       * @param key
       * @param start
       * @param end
       * @return
       */
     	public  List<String>  lrange(String key, long start, long end){
     		return stringRedisTemplate.opsForList().range(key, start, end);
      	}
     	/**
       * 根据 key值 pattern查询所有匹配的值,比如login*
       * @param key
       * @return
       */
     	public  Set<String> keys(String key){
     		return  stringRedisTemplate.keys(key);
      	}
     	/**
       * 加锁
       *
       * @param key productId - 商品的唯一标志
       * @param value 当前时间+超时时间 也就是时间戳
       * @return
       */
     	public boolean lock(String key,String value) {
     		if (stringRedisTemplate.opsForValue().setIfAbsent(key, value)) {// 对应setnx命令
     			// 可以成功设置,也就是key不存在
     			return true;
      		}
     		// 判断锁超时 - 防止原来的操作异常,没有运行解锁操作 防止死锁
     		String currentValue = stringRedisTemplate.opsForValue().get(key);
     		// 如果锁过期
     		if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {// currentValue不为空且小于当前时间
     			// 获取上一个锁的时间value
     			String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value);// 对应getset,如果key存在
     			// 假设两个线程同时进来这里,因为key被占用了,而且锁过期了。获取的值currentValue=A(get取的旧的值肯定是一样的),两个线程的value都是B,key都是K.锁时间已经过期了。
     			// 而这里面的getAndSet一次只会一个执行,也就是一个执行之后,上一个的value已经变成了B。只有一个线程获取的上一个值会是A,另一个线程拿到的值是B。
     			if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
      // oldValue不为空且oldValue等于currentValue,也就是校验是不是上个对应的商品时间戳,也是防止并发
      return true;
      			}
      		}
     		return false;
      	}
     	/**
       * 解锁
       *
       * @param key
       * @param value
       */
     	public void unlock(String key,String value) {
     		try {
     			String currentValue = stringRedisTemplate.opsForValue().get(key);
     			if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
       stringRedisTemplate.opsForValue().getOperations().delete(key);// 删除key
      			}
      		} catch (Exception e) {
      			Log.error(e);
      		}
      	}
      }
  
 

 3.MqttGateway 的使用


      @Autowired
     	  private MqttGateway mqttGateway;
     	  @RequestMapping("/sendMsg")
      public String sendMsg(String  sendData,String topic){
      try {
       mqttGateway.sendToMqtt(sendData,topic);
       mqttGateway.sendToMqtt(topic, 0, sendData);
      return "发送成功";
       }catch (Exception e) {
      // TODO: handle exception
       Log.error(e);
      return "发送失败";
      			}
      	  }
  
 

4、jmsMessagingTemplate 的使用


      @@Autowired
      private JmsMessagingTemplate  jmsMessagingTemplate;
      jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(ActivemqConfig.topicName), payload);
  
 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/86233738

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。