博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq-producer
阅读量:6425 次
发布时间:2019-06-23

本文共 9518 字,大约阅读时间需要 31 分钟。

hot3.png

producer业务流程

 

1.选择namesrv

/** * 选择namesrv * @return * @throws InterruptedException */private Channel getAndCreateNameserverChannel() throws InterruptedException {    String addr = this.namesrvAddrChoosed.get();    if (addr != null) {        ChannelWrapper cw = this.channelTables.get(addr);        if (cw != null && cw.isOK()) {            return cw.getChannel();        }    }    //获取namesrv所有地址    final List
addrList = this.namesrvAddrList.get(); if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } //顺序选择namesrv if (addrList != null && !addrList.isEmpty()) { for (int i = 0; i < addrList.size(); i++) { int index = this.namesrvIndex.incrementAndGet(); index = Math.abs(index); index = index % addrList.size(); String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr); log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); Channel channelNew = this.createChannel(newAddr); if (channelNew != null) { return channelNew; } } } } catch (Exception e) { log.error("getAndCreateNameserverChannel: create name server channel exception", e); } finally { this.lockNamesrvChannel.unlock(); } } else { log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } return null;}

2.获取默认的topic(TBW102)

第一次由于broker上面不存在自定义topic,会进行第二次获取,根据TBW102去获取。

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,    DefaultMQProducer defaultMQProducer) {    try {        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {            try {                TopicRouteData topicRouteData;                if (isDefault && defaultMQProducer != null) {                    // 获取topic = TBW102 的broker                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),                        1000 * 3);                    if (topicRouteData != null) {                        for (QueueData data : topicRouteData.getQueueDatas()) {                            //默认设置为4个队列                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());                            data.setReadQueueNums(queueNums);                            data.setWriteQueueNums(queueNums);                        }                    }                } else {                    //根据topic查询                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);                }                if (topicRouteData != null) {                    TopicRouteData old = this.topicRouteTable.get(topic);                    boolean changed = topicRouteDataIsChange(old, topicRouteData);                    if (!changed) {                        changed = this.isNeedUpdateTopicRouteInfo(topic);                    } else {                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);                    }                    if (changed) {                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());                        }                        // Update Pub info                        {                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);                            publishInfo.setHaveTopicRouterInfo(true);                            Iterator
> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry
entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info { Set
subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator
> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry
entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (Exception e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false;}

3.选择messageQueue

/** * 随机取模获取messageQueue * @return */public MessageQueue selectOneMessageQueue() {    int index = this.sendWhichQueue.getAndIncrement();    int pos = Math.abs(index) % this.messageQueueList.size();    if (pos < 0)        pos = 0;    return this.messageQueueList.get(pos);}

4.sendMessage

组装requestHeader

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);    if (reconsumeTimes != null) {        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);    }    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);    if (maxReconsumeTimes != null) {        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);    }}
switch (communicationMode) {    case ASYNC:        Message tmpMessage = msg;        if (msgBodyCompressed) {            //If msg body was compressed, msgbody should be reset using prevBody.            //Clone new message using commpressed message body and recover origin massage.            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66            tmpMessage = MessageAccessor.cloneMessage(msg);            msg.setBody(prevBody);        }        long costTimeAsync = System.currentTimeMillis() - beginStartTime;        if (timeout < costTimeAsync) {            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");        }        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(            brokerAddr,            mq.getBrokerName(),            tmpMessage,            requestHeader,            timeout - costTimeAsync,            communicationMode,            sendCallback,            topicPublishInfo,            this.mQClientFactory,            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),            context,            this);        break;    case ONEWAY:    case SYNC:        long costTimeSync = System.currentTimeMillis() - beginStartTime;        if (timeout < costTimeSync) {            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");        }        //发送        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(            brokerAddr,            mq.getBrokerName(),            msg,            requestHeader,            timeout - costTimeSync,            communicationMode,            context,            this);        break;    default:        assert false;        break;}//RequestCode.SEND_MESSAGERemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);

 

转载于:https://my.oschina.net/penghaozhong/blog/3050481

你可能感兴趣的文章
Android ListView展示不同的布局
查看>>
iOS宏(自己使用,持续更新)
查看>>
手把手玩转win8开发系列课程(3)
查看>>
NGINX引入线程池 性能提升9倍
查看>>
《淘宝技术这十年》读书笔记 (四). 分布式时代和中间件
查看>>
linux下mongodb定时备份指定的集合
查看>>
oVirt JBAS server start failed, ajp proxy cann't server correct. ovirt-engine URL cann't open
查看>>
CDP WebConsole上线公告
查看>>
ubuntu下安装摄像头应用程序xawtv
查看>>
PostgreSQL 如何比较两个表的定义是否一致
查看>>
Ambari安装Hadoop集群
查看>>
WCF学习之旅—基于ServiceDebug的异常处理(十七)
查看>>
CLREX
查看>>
再也不用担心this指向的问题了
查看>>
使用putty远程连接linux
查看>>
【comparator, comparable】小总结
查看>>
Node 版本管理
查看>>
34、重分布配置实验之分发列表distribute-list
查看>>
命令模式-对象行为型
查看>>
VS2017配置、提高生产力、代码辨识度 (工欲善其事必先利其器)新手必备!
查看>>