1. Environment setup
Add the dependency to pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>
Add the MQ configuration to application.properties
rocketmq.name-server=http://localhost:9876
rocketmq.producer.group=ysx-group
rocketmq.producer.send-message-timeout=30000
rocketmq.producer.access-key=
rocketmq.producer.secret-key=
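2. Producer
Ordinary messages
An ordinary message returns nothing to the caller and triggers no callback. Note that in rocketmq-spring, convertAndSend appears to be implemented on top of syncSend and simply discards the SendResult; for a true fire-and-forget send that does not wait for the broker's response, RocketMQTemplate also provides sendOneWay.
@Resource
RocketMQTemplate rocketMQTemplate;

@RequestMapping("mq/template/send")
public void sendMessage1() {
    rocketMQTemplate.convertAndSend("ysx-topic", "this is a template message");
}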
Synchronous messages
A synchronous send returns a SendResult. The call blocks until the broker has acknowledged the message, or until the send fails or times out.
@RequestMapping("mq/template/send/sync")
public SendResult sendMessage2() {
    SendResult sendResult = rocketMQTemplate.syncSend("ysx-topic", "this is a template sync message");
    return sendResult;
}
Internally, syncSend() ends up calling producer.send().
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
    ...
    SendResult sendResult = producer.send(rocketMsg, timeout);
    ...
}
Asynchronous messages
An asynchronous send returns nothing and takes a callback object. The call returns immediately without waiting for the send to finish; the outcome is reported to the callback later.
@RequestMapping("mq/template/send/async")
public void sendMessage3() {
    rocketMQTemplate.asyncSend("ysx-topic", "this is a template async message", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("send success:" + sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("send fail:" + throwable.getMessage());
        }
    });
}
Likewise, this ends up calling a method on the producer.
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
    ...
    producer.send(rocketMsg, sendCallback, timeout);
    ...
}
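To make the difference between the send styles concrete, here is a minimal sketch that does not use RocketMQ at all. Everything in it is hypothetical: `deliver` stands in for the round trip to the broker, and the class and method names are invented for illustration. Unlike RocketMQTemplate.asyncSend, the async method here returns a future, purely so that the demo can wait for the callback before exiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class SendModesSketch {

    // Hypothetical stand-in for the network round trip to a broker.
    static String deliver(String body) {
        return "SEND_OK:" + body;
    }

    // Synchronous style: the caller blocks and gets the result back.
    static String syncSend(String body) {
        return deliver(body);
    }

    // Asynchronous style: returns at once; the callback later receives
    // either the result or the failure.
    static CompletableFuture<String> asyncSend(String body,
                                               BiConsumer<String, Throwable> callback) {
        return CompletableFuture.supplyAsync(() -> deliver(body))
                                .whenComplete(callback);
    }

    // One-way style: hand the message off and ignore the outcome.
    static void sendOneWay(String body) {
        CompletableFuture.runAsync(() -> deliver(body));
    }

    public static void main(String[] args) {
        System.out.println(syncSend("sync message"));

        asyncSend("async message", (result, error) -> {
            if (error == null) {
                System.out.println("send success:" + result);
            } else {
                System.out.println("send fail:" + error.getMessage());
            }
        }).join(); // join only so the demo does not exit before the callback runs

        sendOneWay("one-way message");
    }
}
```

The trade-off is the usual one: the synchronous style is the simplest to reason about, the asynchronous style keeps the request thread free but moves error handling into the callback, and the one-way style gives no delivery feedback at all.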
3. Consumer
(1) Basic example
Points to note:
Annotate the class with @Component so that the Spring container manages it.
Implement the RocketMQListener interface and register the listener with @RocketMQMessageListener; the consumer group and the topic must be specified.
@Component
@RocketMQMessageListener(consumerGroup = "ysx-consumer-group", topic = "ysx-topic")
public class MQConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consume message:" + s);
    }
}
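(2) Ordered consumption
A topic may contain several message queues, and the sender can choose which queue a message goes to. Ordering is only preserved within a single queue, so messages that must be consumed in order have to be sent to the same queue.
Producing messages:
@RequestMapping("mq/template/send/order")
public void sendMessageOrderly() {
    rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
            // Any custom rule can be used to pick a queue from the list;
            // here we take the first one (index 0).
            // o is the hashKey passed to syncSendOrderly.
            return list.get(0);
        }
    });
    rocketMQTemplate.syncSendOrderly("order-topic", "this is order message", "hashKey001");
}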
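A selector that always returns one fixed queue keeps everything in order but funnels all traffic through a single queue. A common alternative is to derive the queue from a business key, so that messages sharing a key stay together while different keys spread across queues. The sketch below illustrates that idea with plain strings in place of MessageQueue objects; the class name, method name, and sample keys are assumptions made for illustration, and this is not the library's own selector code.

```java
import java.util.Arrays;
import java.util.List;

final class HashQueueSelectorSketch {

    // Map a business key to one queue. floorMod keeps the index
    // non-negative even when hashCode() is negative.
    static <Q> Q selectByHash(List<Q> queues, Object hashKey) {
        int index = Math.floorMod(hashKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");

        // The same key always lands on the same queue.
        System.out.println(selectByHash(queues, "order-1001"));
        System.out.println(selectByHash(queues, "order-1001"));

        // A different key may land on a different queue.
        System.out.println(selectByHash(queues, "order-1002"));
    }
}
```

With this approach the third argument of syncSendOrderly (the hash key) would typically be something like an order ID, so that all messages for one order are consumed in sequence.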
Consuming messages:
Ordered consumption is enabled by setting consumeMode = ConsumeMode.ORDERLY.
consumeThreadMax sets the maximum number of consumer threads; it is set to 1 here.
@Component
@RocketMQMessageListener(consumerGroup = "order-topic-consumer", topic = "order-topic",
        consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1)
public class MQConsumerListenerOrder implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consumer group order message:" + s);
    }
}
The remaining options of the annotation are listed below:
public @interface RocketMQMessageListener {

    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     *
     *
     * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
     */
    String consumerGroup();

    /**
     * Topic name.
     */
    String topic();

    /**
     * Control how to selector message.
     *
     * @see SelectorType
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */
    String selectorExpression() default "*";

    /**
     * Control consume mode, you can choice receive message concurrently or orderly.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * Max consumer thread number.
     */
    int consumeThreadMax() default 64;

    /**
     * Maximum amount of time in minutes a message may block the consuming thread.
     */
    long consumeTimeout() default 15L;

    /**
     * The property of "access-key".
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /**
     * The property of "secret-key".
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;

    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * The property of "name-server".
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
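(3) Consumption modes
RocketMQ has two consumption modes: clustering and broadcasting.
a. Clustering mode
[Default mode]
Within a single ConsumerGroup, each message is consumed by only one consumer of that group.
Across different ConsumerGroups, each group receives the message, and one consumer in each group consumes it.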
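Example:
One producer group containing two producers.
Two consumer groups, groupA and groupB, each with two consumers: groupA-1, groupA-2, groupB-1, groupB-2.
When either producer sends a message, every consumer group receives it, and within each group a single consumer receives and consumes it. The output below appears to cover more than one send, since a given message is consumed only once per group.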
consumer group a-1 message:this is a template sync message
consumer group b-1 message:this is a template sync message
consumer group a-2 message:this is a template sync message
consumer group b-2 message:this is a template sync message
b. Broadcasting mode
Every consumer in the group receives every message, similar to the publish-subscribe pattern.
The consumption mode is configured through MessageModel.
public enum MessageModel {
    BROADCASTING("BROADCASTING"),
    CLUSTERING("CLUSTERING");

    private final String modeCN;

    MessageModel(String modeCN) {
        this.modeCN = modeCN;
    }

    public String getModeCN() {
        return this.modeCN;
    }
}
Switch the two consumers in groupA to broadcasting mode and leave everything else unchanged.
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-a", topic = "message-mode-topic",
        messageModel = MessageModel.BROADCASTING)
public class MQConsumerListenerA1 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consumer group a-1 message:" + s);
    }
}

@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-a", topic = "message-mode-topic",
        messageModel = MessageModel.BROADCASTING)
public class MQConsumerListenerA2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consumer group a-2 message:" + s);
    }
}
When either producer sends a message, both consumers in groupA receive it, while only one of the consumers in groupB receives it.
consumer group b-1 message:this is a template sync message
consumer group a-1 message:this is a template sync message
consumer group a-2 message:this is a template sync message
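The delivery rules of the two modes can be summarised in a small toy model. The sketch below is only an illustration of the semantics described above and involves no RocketMQ classes: the Group type, the receivers method, and the round-robin choice inside a clustering group are all assumptions (a real broker assigns queues to consumers rather than rotating per message).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MessageModelSketch {

    enum Mode { CLUSTERING, BROADCASTING }

    // Hypothetical consumer group: its mode and the names of its consumers.
    static final class Group {
        final Mode mode;
        final List<String> consumers;

        Group(Mode mode, String... consumers) {
            this.mode = mode;
            this.consumers = Arrays.asList(consumers);
        }
    }

    // Returns the consumers that receive the message with the given
    // (non-negative) index.
    static List<String> receivers(List<Group> groups, int messageIndex) {
        List<String> result = new ArrayList<>();
        for (Group g : groups) {
            if (g.mode == Mode.BROADCASTING) {
                // Broadcasting: every consumer in the group gets the message.
                result.addAll(g.consumers);
            } else {
                // Clustering: exactly one consumer in the group gets it.
                // Round-robin is a simplification chosen for this sketch.
                result.add(g.consumers.get(messageIndex % g.consumers.size()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Group> groups = Arrays.asList(
                new Group(Mode.BROADCASTING, "groupA-1", "groupA-2"),
                new Group(Mode.CLUSTERING, "groupB-1", "groupB-2"));

        System.out.println(receivers(groups, 0)); // [groupA-1, groupA-2, groupB-1]
        System.out.println(receivers(groups, 1)); // [groupA-1, groupA-2, groupB-2]
    }
}
```

This mirrors the log above: each message reaches both groupA consumers and a single groupB consumer.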
4. accessKey and secretKey
These work much like a username and password for access. In the acl module, user permissions are configured in a file such as plain_acl.yml:
accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress: 192.168.0.*
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - topicA=DENY
  - topicB=PUB|SUB
  - topicC=SUB
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=SUB
  - groupC=SUB
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true
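One way to read the topic permissions in this file is: an admin account may access everything; otherwise the entry in topicPerms applies, and topics that are not listed fall back to defaultTopicPerm. The sketch below encodes that reading for the first account. It is a simplified illustration under those assumptions, not the broker's actual permission check: the class and method names are invented, and whiteRemoteAddress and groupPerms are ignored.

```java
import java.util.HashMap;
import java.util.Map;

final class AclPermSketch {

    // action is "PUB" or "SUB"; a permission is "DENY" or a '|'-separated
    // list of allowed actions such as "PUB|SUB".
    static boolean allowed(boolean admin, Map<String, String> topicPerms,
                           String defaultTopicPerm, String topic, String action) {
        if (admin) {
            return true;
        }
        String perm = topicPerms.getOrDefault(topic, defaultTopicPerm);
        for (String granted : perm.split("\\|")) {
            if (granted.equals(action)) {
                return true;
            }
        }
        return false; // DENY, or the action is not in the granted list
    }

    public static void main(String[] args) {
        Map<String, String> topicPerms = new HashMap<>();
        topicPerms.put("topicA", "DENY");
        topicPerms.put("topicB", "PUB|SUB");
        topicPerms.put("topicC", "SUB");

        System.out.println(allowed(false, topicPerms, "DENY", "topicB", "PUB")); // true
        System.out.println(allowed(false, topicPerms, "DENY", "topicC", "PUB")); // false
        System.out.println(allowed(false, topicPerms, "DENY", "topicX", "SUB")); // false
    }
}
```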
Enabling AK/SK
In broker.conf, set aclEnable=true.
Using it in a project
If accessKey and secretKey are enabled, they must be passed in when the DefaultMQProducer (or the consumer) is created.
// Producer
DefaultMQProducer producer = new DefaultMQProducer("ysx-group",
        new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")), true, "");

// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ysx-acl-consumer-group",
        new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")),
        new AllocateMessageQueueAveragely());
If the keys are configured correctly, messages can be sent and consumed normally. Otherwise an error like the following is reported:
org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [19]ms, Topic: ysx-topic-acl, BrokersSent: [broker-a, broker-a, broker-a]
Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1 DESC: org.apache.rocketmq.acl.common.AclException: No accessKey is configured, org.apache.rocketmq.acl.plain.PlainPermissionManager.validate(PlainPermissionManager.java:636)