Spring Cloud Stream 消息发送

举报
周杰伦本人 发表于 2022/05/18 11:02:34 2022/05/18
【摘要】 Spring Cloud Stream 消息发送 业务发送消息source.output().send(message);来发送消息public interface Source { /** * Name of the output channel. */ String OUTPUT = "output"; /** * @return output chann...

Spring Cloud Stream 消息发送

业务发送消息

source.output().send(message);来发送消息

public interface Source {

   /**
    * Name of the output channel.
    */
   String OUTPUT = "output";

   /**
    * @return output channel
    */
   @Output(Source.OUTPUT)
   MessageChannel output();

}
@FunctionalInterface
public interface MessageChannel {

   long INDEFINITE_TIMEOUT = -1;


   default boolean send(Message<?> message) {
      return send(message, INDEFINITE_TIMEOUT);
   }

  
   boolean send(Message<?> message, long timeout);

}

AbstractMessageChannel是消息通道的基本实现,提供发送消息和接收消息的公共方法。

AbstractSubscribableChannel类的doSend()方法

消息发送到AbstractSubscribableChannel类的doSend()方法如下:

public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
		implements SubscribableChannel, SubscribableChannelManagement {

	
	@Override
	protected boolean doSend(Message<?> message, long timeout) {
		try {
			return getRequiredDispatcher().dispatch(message);
		}
		catch (MessageDispatchingException e) {
			String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
			throw new MessageDeliveryException(message, description, e);
		}
	}

	private MessageDispatcher getRequiredDispatcher() {
		MessageDispatcher dispatcher = getDispatcher();
		Assert.state(dispatcher != null, "'dispatcher' must not be null");
		return dispatcher;
	}

	protected abstract MessageDispatcher getDispatcher();

}

调用getDispatcher方法从DirectChannel中得到消息分发类MessageDispatcher的实现类UnicastingDispatcher,调用dispatch方法把消息分发给各个MessageHandler

UnicastingDispatcher的doDispatch()方法

UnicastingDispatcher的doDispatch方法:

private boolean doDispatch(Message<?> message) {
   if (tryOptimizedDispatch(message)) {
      return true;
   }
   boolean success = false;
   Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
   if (!handlerIterator.hasNext()) {
      throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
   }
   List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
   while (!success && handlerIterator.hasNext()) {
      MessageHandler handler = handlerIterator.next();
      try {
         handler.handleMessage(message);
         success = true; // we have a winner.
      }
      catch (Exception e) {
         RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
               () -> "Dispatcher failed to deliver Message", e);
         exceptions.add(runtimeException);
         this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
      }
   }
   return success;
}

遍历所有的MessageHandler,调用handleMessage()处理消息,那么MessageHandler是从哪来的呢?

AbstractMessageChannelBinder在初始化Binding时,会创建并初始化SendingHandler,调用subscribe()方法添加到handlers列表。

AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring容器加载所有Bean并完成初始化之后完成。

RocketMQMessageChannelBinder集成消息发送

AbstractMessageChannelBinder类提供创建MessageHandler规范,createProducerMessageHandler()方法在初始化Binder的时候会加载。

RocketMQMessageChannelBinder继承AbstractMessageChannelBinder,完成RocketMQMessageHandler的创建和初始化,RocketMQMessageHandler的消息处理器MessageHandler的具体实现,RocketMQMessageHandler在RocketMQBinder中的作用就是转化消息格式并发送消息。

RocketMQMessageChannelBinder的createProducerMessageHandler方法:

这个方法就是创建MessageHandler的

@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
      ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
      MessageChannel channel, MessageChannel errorChannel) throws Exception {
   if (producerProperties.getExtension().getEnabled()) {

      // if producerGroup is empty, using destination
      String extendedProducerGroup = producerProperties.getExtension().getGroup();
      String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
            ? destination.getName()
            : extendedProducerGroup;

      RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
            .mergeProperties(rocketBinderConfigurationProperties,
                  rocketMQProperties);

      RocketMQTemplate rocketMQTemplate;
      if (producerProperties.getExtension().getTransactional()) {
         Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
               .getBeansOfType(RocketMQTemplate.class);
         if (rocketMQTemplates.size() == 0) {
            throw new IllegalStateException(
                  "there is no RocketMQTemplate in Spring BeanFactory");
         }
         else if (rocketMQTemplates.size() > 1) {
            throw new IllegalStateException(
                  "there is more than 1 RocketMQTemplates in Spring BeanFactory");
         }
         rocketMQTemplate = rocketMQTemplates.values().iterator().next();
      }
      else {
         rocketMQTemplate = new RocketMQTemplate();
         rocketMQTemplate.setObjectMapper(this.getApplicationContext()
               .getBeansOfType(ObjectMapper.class).values().iterator().next());
          //初始化DefaultMQProducer
         DefaultMQProducer producer;
         String ak = mergedProperties.getAccessKey();
         String sk = mergedProperties.getSecretKey();
         if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            RPCHook rpcHook = new AclClientRPCHook(
                  new SessionCredentials(ak, sk));
            producer = new DefaultMQProducer(producerGroup, rpcHook,
                  mergedProperties.isEnableMsgTrace(),
                  mergedProperties.getCustomizedTraceTopic());
            producer.setVipChannelEnabled(false);
            producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
                  destination.getName() + "|" + UtilAll.getPid()));
         }
         else {
            producer = new DefaultMQProducer(producerGroup);
            producer.setVipChannelEnabled(
                  producerProperties.getExtension().getVipChannelEnabled());
         }
         producer.setNamesrvAddr(mergedProperties.getNameServer());
         producer.setSendMsgTimeout(
               producerProperties.getExtension().getSendMessageTimeout());
         producer.setRetryTimesWhenSendFailed(
               producerProperties.getExtension().getRetryTimesWhenSendFailed());
         producer.setRetryTimesWhenSendAsyncFailed(producerProperties
               .getExtension().getRetryTimesWhenSendAsyncFailed());
         producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
               .getCompressMessageBodyThreshold());
         producer.setRetryAnotherBrokerWhenNotStoreOK(
               producerProperties.getExtension().isRetryNextServer());
         producer.setMaxMessageSize(
               producerProperties.getExtension().getMaxMessageSize());
         rocketMQTemplate.setProducer(producer);
         if (producerProperties.isPartitioned()) {
            rocketMQTemplate
                  .setMessageQueueSelector(new PartitionMessageQueueSelector());
         }
      }

      RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
            rocketMQTemplate, destination.getName(), producerGroup,
            producerProperties.getExtension().getTransactional(),
            instrumentationManager, producerProperties,
            ((AbstractMessageChannel) channel).getChannelInterceptors().stream()
                  .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
                  .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
                  .findFirst().orElse(null));
      messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
      messageHandler.setSync(producerProperties.getExtension().getSync());
      messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
      if (errorChannel != null) {
         messageHandler.setSendFailureChannel(errorChannel);
      }
      return messageHandler;
   }
   else {
      throw new RuntimeException("Binding for channel " + destination.getName()
            + " has been disabled, message can't be delivered");
   }
}

RocketMQMessageHandler中持有RocketMQTemplate对象,RocketMQTemplate是对RocketMQ客户端API的封装

DefaultMQProducer由RocketMQ客户端提供的API,发送消息到RocketMQ消息服务器都是由它来完成。

RocketMQMessageHandler是消息发送的处理逻辑,解析Message对象头中的参数,调用RocketMQTemplate中不同的发送消息接口。

RocketMQMessageHandler的handleMessageInternal()方法

RocketMQMessageHandler用来处理消息

RocketMQMessageHandler的handleMessageInternal方法:

protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
      throws Exception {
   try {
      // issue 737 fix
      Map<String, String> jsonHeaders = headerMapper
            .fromHeaders(message.getHeaders());
      message = org.springframework.messaging.support.MessageBuilder
            .fromMessage(message).copyHeaders(jsonHeaders).build();

      final StringBuilder topicWithTags = new StringBuilder(destination);
      String tags = Optional
            .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
            .toString();
      if (!StringUtils.isEmpty(tags)) {
         topicWithTags.append(":").append(tags);
      }

      SendResult sendRes = null;
       //发送事务消息
      if (transactional) {
         sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
               topicWithTags.toString(), message, message.getHeaders()
                     .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
         log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
      }
      else {
          //设置定时消息参数
         int delayLevel = 0;
         try {
            Object delayLevelObj = message.getHeaders()
                  .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
            if (delayLevelObj instanceof Number) {
               delayLevel = ((Number) delayLevelObj).intValue();
            }
            else if (delayLevelObj instanceof String) {
               delayLevel = Integer.parseInt((String) delayLevelObj);
            }
         }
         catch (Exception e) {
            // ignore
         }
         boolean needSelectQueue = message.getHeaders()
               .containsKey(BinderHeaders.PARTITION_HEADER);
          //同步发送
         if (sync) {
             //顺序消息
            if (needSelectQueue) {
               sendRes = rocketMQTemplate.syncSendOrderly(
                     topicWithTags.toString(), message, "",
                     rocketMQTemplate.getProducer().getSendMsgTimeout());
            }
             //普通消息
            else {
               sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
                     message,
                     rocketMQTemplate.getProducer().getSendMsgTimeout(),
                     delayLevel);
            }
            log.debug("sync send to topic " + topicWithTags + " " + sendRes);
         }
          //异步消息
         else {
            Message<?> finalMessage = message;
            SendCallback sendCallback = new SendCallback() {
               @Override
               public void onSuccess(SendResult sendResult) {
                  log.debug("async send to topic " + topicWithTags + " "
                        + sendResult);
               }

               @Override
               public void onException(Throwable e) {
                  log.error("RocketMQ Message hasn't been sent. Caused by "
                        + e.getMessage());
                  if (getSendFailureChannel() != null) {
                     getSendFailureChannel().send(
                           RocketMQMessageHandler.this.errorMessageStrategy
                                 .buildErrorMessage(new MessagingException(
                                       finalMessage, e), null));
                  }
               }
            };
            if (needSelectQueue) {
               rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
                     message, "", sendCallback,
                     rocketMQTemplate.getProducer().getSendMsgTimeout());
            }
            else {
               rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
                     sendCallback);
            }
         }
      }
      if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
         if (getSendFailureChannel() != null) {
            this.getSendFailureChannel().send(message);
         }
         else {
            throw new MessagingException(message,
                  new MQClientException("message hasn't been sent", null));
         }
      }
   }
   catch (Exception e) {
      log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
      if (getSendFailureChannel() != null) {
         getSendFailureChannel().send(this.errorMessageStrategy
               .buildErrorMessage(new MessagingException(message, e), null));
      }
      else {
         throw new MessagingException(message, e);
      }
   }

}

代码有点长,但整体还是很好理解的

  1. 获取消息的目的地,也就是代码中的tags变量
  2. 判断是否为事务消息,如果是的话就发送事务消息
  3. 如果不是事务消息,先设置定时消息的参数,判断是否为同步同步消息,如果是的话再判断是顺序消息还是普通消息,顺序消息,同样异步消息还是分为异步顺序消息和异步的普通消息
  4. 根据发送结果,如果发送消息失败的话就把消息发送到失败队列中。

发送普通消息、事务消息、定时消息还是顺序消息,由Message对象的消息头Header中的属性决定,在业务代码创建Message对象时设置。

总结

这篇文章我们讲了Spring Cloud Stream 消息发送的基本流程,先是业务发送消息,经过AbstractSubscribableChannel类的doSend()方法,方法中调用UnicastingDispatcher的doDispatch()方法进行分发遍历所有的MessageHandler进行处理消息,RocketMQMessageHandler是其中之一,它根据消息头的header信息判断是什么类型的消息,然后发送对应的消息,发送失败的消息进行失败的队列中。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。