Producer的用途大家都很清楚,主要就是生产消息,那么分布式模式下与单队列模式不一样,如何能够充分利用分布式的优势,将生产的消息分布到不同的队列下呢?Rocket-MQ提供了3种不同模式的Producer:
1. NormalProducer
2. OrderProducer
3. TransactionProducer
1. TopicPublicInfo:仅仅是Producer使用,保存了messageQueue列表
2.MessageQueue:公用组件,保存了Queue的标识信息,<topic、brokerName、queueId>
3.QueueData:包含了Queue的brokerName,queue的读写数目等信息
4.BrokerData:包含了broker的名字,IP,等信息
根据<clientId(IP@instanceId), group>每一个DefaultMQProducer有2层的分发:
首先,根据不同的clientId,MQClientManager将给出不同的MQClientInstance;
其次,根据不同的group,MQClientInstance将给出不同的MQProducer和MQConsumer
DefaultMQProducer: 主要保存了配置信息,包括clientId,instantName和groupName,以及发送的网络参数等;
DefaultMQProducerImpl:主要保存了<topic,TopicPublishInfo>信息,SendMessageHook、CheckForbiddenHook、RPCHook以及MQClientInstance信息;
MQClientManager:保存了<clientId,MQClientInstance>信息,clientId(IP@instanceName);
MQClientInstance:保存了producerTable、consumerTable,adminExtTable,NettyClientConfig,<Topic,TopicRouteData>,<brokerName, <brokerId, address>>,RebalanceService, ClientRemotingProcessor, ConsumerStatsManager等信息;
这三种处理主要是select broker queue的方法不一样
1. NormalProducer:普通模式;这里的select调用TopicPublishInfo的selectOneMessageQueue,他的机制如下:正常情况下,顺序选择TopicPublishInfo中messageQueueList的queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的brokerName;
2. OrderProducer:顺序模式;假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现MessageQueueSelector,根据里面的arg字段来选择queue
3. TransactionProducer:
1. 首先获取进程pid,将它作为instanceName,系统中所有相同instanceName的Producer和Consumer共用同一个MQClientInstance
2. 将Consumer或者Producer注册到MQClientInstance
3. 调用MQClientInstance.start,启动如下定时任务
3.1 fetchNameServerAddr:不断遍历nameSrv的地址
3.2 updateTopicRouteInfoFromNameServer:从NameSrv遍历TopicRouteInfo,然后更新producer和consumer的topic信息
3.3cleanOfflineBroker:清除离线broker