《RocketMq》五、Client-Producer生产者详解
flypeng 发布时间:08-08 来源:0 浏览:0次

Producer的用途大家都很清楚,主要就是生产消息,那么分布式模式下与单队列模式不一样,如何能够充分利用分布式的优势,将生产的消息分布到不同的队列下呢?Rocket-MQ提供了3种不同模式的Producer:

1. NormalProducer

2. OrderProducer

3. TransactionProducer

一、数据结构

1. TopicPublicInfo:仅仅是Producer使用,保存了messageQueue列表

2.MessageQueue:公用组件,保存了Queue的标识信息,<topic、brokerName、queueId>

3.QueueData:包含了Queue的brokerName,queue的读写数目等信息

4.BrokerData:包含了broker的名字,IP,等信息

 

二、主要模块

根据<clientId(IP@instanceId), group>每一个DefaultMQProducer有2层的分发:

首先,根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

其次,根据不同的group,MQClientInstance将给出不同的MQProducer和MQConsumer

 

DefaultMQProducer: 主要保存了配置信息,包括clientId,instantName和groupName,以及发送的网络参数等;

DefaultMQProducerImpl:主要保存了<topic,TopicPublishInfo>信息,SendMessageHook、CheckForbiddenHook、RPCHook以及MQClientInstance信息;

MQClientManager:保存了<clientId,MQClientInstance>信息,clientId(IP@instanceName);

MQClientInstance:保存了producerTable、consumerTable,adminExtTable,NettyClientConfig,<Topic,TopicRouteData>,<brokerName, <brokerId, address>>,RebalanceService, ClientRemotingProcessor, ConsumerStatsManager等信息;

 

三、三种处理模式

这三种处理主要是select broker queue的方法不一样

1. NormalProducer:普通模式;这里的select调用TopicPublishInfo的selectOneMessageQueue,他的机制如下:正常情况下,顺序选择TopicPublishInfo中messageQueueList的queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的brokerName;

 

2. OrderProducer:顺序模式;假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现MessageQueueSelector,根据里面的arg字段来选择queue

 

3. TransactionProducer:

 

四、处理逻辑

1. 首先获取进程pid,将它作为instanceName,系统中所有相同instanceName的Producer和Consumer共用同一个MQClientInstance

2. 将Consumer或者Producer注册到MQClientInstance

3. 调用MQClientInstance.start,启动如下定时任务

3.1 fetchNameServerAddr:不断遍历nameSrv的地址

3.2 updateTopicRouteInfoFromNameServer:从NameSrv遍历TopicRouteInfo,然后更新producer和consumer的topic信息

3.3cleanOfflineBroker:清除离线broker

如果你有好的win10资讯或者win10教程,以及win10相关的问题想要获得win10系统下载的关注与报道。
欢迎加入发送邮件到657025171#qq.com(#替换为@)。期待你的好消息!