Press "Enter" to skip to content

RocketMQTemplate基本使用

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

 

文章目录

 

1、环境准备

 

pom引入依赖

 

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

 

application.properties中增加mq的配置内容

 

rocketmq.name-server=http://localhost:9876
rocketmq.producer.group=ysx-group
rocketmq.producer.send-message-timeout= 30000
rocketmq.producer.access-key=
rocketmq.producer.secret-key=

 

2、生产者

 

普通消息

 

普通消息无返回值,只负责发送消息⽽不等待服务器回应且没有回调函数触发

 

@Resource
    RocketMQTemplate rocketMQTemplate;
    
    @RequestMapping("mq/template/send")
    public void sendMessage1(){
 
        rocketMQTemplate.convertAndSend("ysx-topic","this is a template message");
    }

 

同步消息

 

同步消息有返回值SendResult,等到消息发送成功后才算结束。

 

@RequestMapping("mq/template/send/sync")
    public SendResult sendMessage2(){
 
        SendResult sendResult = rocketMQTemplate.syncSend("ysx-topic", "this is a template sync message");
        return sendResult;
    }

 

syncSend()最终也是调用的producer.send()。

 

public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
 
...
  SendResult sendResult = producer.send(rocketMsg, timeout);
...
}

 

异步消息

 

异步消息无返回值,需要传入回调类。无需等待消息是否发送成功。

 

@RequestMapping("mq/template/send/async")
    public void sendMessage3(){
 
        rocketMQTemplate.asyncSend("ysx-topic",
                "this is a template async message", new SendCallback() {
 
                    @Override
                    public void onSuccess(SendResult sendResult) {
 
                        System.out.println("send success:"+sendResult);
                    }
                    @Override
                    public void onException(Throwable throwable) {
 
                        System.out.println("send fail:"+throwable.getMessage());
                    }
                });
    }

 

同理,最终调用的也是producer的方法。

 

public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,        int delayLevel) {
 
 ...            
    producer.send(rocketMsg, sendCallback, timeout);
...
}

 

3、消费者

 

(1)基础示例

 

需要注意:

@Component交由Spring容器接管
实现RocketMQListener接口,使用@RocketMQMessageListener注册监听,需指定消费者组和Topic。

@Component
@RocketMQMessageListener(consumerGroup = "ysx-consumer-group", topic = "ysx-topic")
public class MQConsumerListener implements RocketMQListener<String> {
 
    @Override
    public void onMessage(String s) {
 
        System.out.println("consume message:"+s);
    }
}

 

(2)顺序消费

 

一个Topic下可能会有多个消息队列queue,在发送消息时可以指定要发送到哪个消息队列中。

 

生产消息:

 

@RequestMapping("mq/template/send/order")
    public void sendMessageOrderly(){
 
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
 
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
 
                //可以自定义规则,取list的第几个,这里取第一个
                return list.get(1);
            }
        });
        rocketMQTemplate.syncSendOrderly("order-topic","this is order message","hashKey001");
    }

 

消费消息:

 

可通过ConsumeMode.ORDERLY进行配置。

 

consumeThreadMax 消费最大线程

 

@Component
@RocketMQMessageListener(consumerGroup = "order-topic-consumer", topic = "order-topic",consumeMode = ConsumeMode.ORDERLY,consumeThreadMax = 1)
public class MQConsumerListenerOrder implements RocketMQListener<String> {
 
    @Override
    public void onMessage(String s) {
 
        System.out.println("consumer group order message:"+s);
    }
}

 

其余配置如下:

 

public @interface RocketMQMessageListener {
 
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
    /**
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     *
     *
     * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
     */    String consumerGroup();
    /**
     * Topic name.
     */    String topic();
    /**
     * Control how to selector message.
     *
     * @see SelectorType
     */    SelectorType selectorType() default SelectorType.TAG;
    /**
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */    String selectorExpression() default "*";
    /**
     * Control consume mode, you can choice receive message concurrently or orderly.
     */    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    /**
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */    MessageModel messageModel() default MessageModel.CLUSTERING;
    /**
     * Max consumer thread number.
     */    int consumeThreadMax() default 64;
    /**
     * Maximum amount of time in minutes a message may block the consuming thread.
     */    long consumeTimeout() default 15L;
    /**
     * The property of "access-key".
     */    String accessKey() default ACCESS_KEY_PLACEHOLDER;
    /**
     * The property of "secret-key".
     */    String secretKey() default SECRET_KEY_PLACEHOLDER;
    /**
     * Switch flag instance for message trace.
     */    boolean enableMsgTrace() default true;
    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
    /**
     * The property of "name-server".
     */    String nameServer() default NAME_SERVER_PLACEHOLDER;
    /**
     * The property of "access-channel".
     */    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}

 

(2)消费模式

 

RocketMQ消费模式有两种:集群模式和广播模式。

 

a.集群模式

 

【默认模式】

 

针对同一个ConsumerGroup中,同一个消息只有一个消费者消费即可。

 

针对不同的ConsumerGroup中,同一个消息,每个消费组中都有一个消费者需要消费。

 

示例:

 

一个生产者组,其中有两个生产者。

 

两个消费者组 groupA、groupB,每组中有两个消费者: groupA-1 groupA-2 groupB-1 groupB-2。

 

使用任一生产者发送消息,每个消费者组都能收到消息。每组只需有一个消费者收到并消费即可。

 

consumer group a-1 message:this is a template sync message

 

consumer group b-1 message:this is a template sync message

 

consumer group a-2 message:this is a template sync message

 

consumer group b-2 message:this is a template sync message

 

b.广播模式

 

同一个消息,每个消费者都需要消息。类似发布-订阅模式。

 

消费模式,可通过MessageModel进行配置。

 

public enum MessageModel {
 
    BROADCASTING("BROADCASTING"),
    CLUSTERING("CLUSTERING");
    private final String modeCN;
    MessageModel(String modeCN) {
 
        this.modeCN = modeCN;
    }
    public String getModeCN() {
 
        return this.modeCN;
    }
}

 

将消费者组groupA中的两个消费者改为广播模式,其余保持不变。

 

@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-a", topic = "message-mode-topic",messageModel = MessageModel.BROADCASTING)
public class MQConsumerListenerA1 implements RocketMQListener<String> {
 
    @Override
    public void onMessage(String s) {
 
        System.out.println("consumer group a-1 message:"+s);
    }
}
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-a", topic = "message-mode-topic",messageModel = MessageModel.BROADCASTING)
public class MQConsumerListenerA1 implements RocketMQListener<String> {
 
    @Override
    public void onMessage(String s) {
 
        System.out.println("consumer group a-2 message:"+s);
    }
}

 

使用任一生产者发出消息,groupA中的两个消费者都可以收到消息,groupB中的随机一个消费者也可以收到消息。

 

consumer group b-1 message:this is a template sync message

 

consumer group a-1 message:this is a template sync message

 

consumer group a-2 message:this is a template sync message

 

4、accessKey和secretKey

 

类似访问的用户名和密码。在acl项目中,以plain_acl.yml为例,配置了用户权限。

 

accounts:
  - accessKey: RocketMQ
    secretKey: 12345678
    whiteRemoteAddress: 192.168.0.*
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: SUB
    topicPerms:
      - topicA=DENY
      - topicB=PUB|SUB
      - topicC=SUB
    groupPerms:
      # the group should convert to retry topic
      - groupA=DENY
      - groupB=SUB
      - groupC=SUB
  - accessKey: rocketmq2
    secretKey: 12345678
    whiteRemoteAddress: 192.168.1.*
    # if it is admin, it could access all resources
    admin: true

 

启用aksk

 

在broker.conf中,配置aclEnable= true。

 

项目中使用

 

如果启用accessKey和secretKey,需要在创建DefaultMQProducer时传入。

 

//生产者
 DefaultMQProducer producer = new DefaultMQProducer("ysx-group", new AclClientRPCHook(new SessionCredentials(
            "accessKey", "secretKey")), true,"");
//消费者
DefaultMQPushConsumer consumer =new DefaultMQPushConsumer("ysx-acl-consumer-group",new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")),
 new AllocateMessageQueueAveragely());

 

如果key配置正确,可正常发送/消费消息。否则会报错,如下所示:

 

org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [19]ms, Topic: ysx-topic-acl, BrokersSent: [broker-a, broker-a, broker-a]  Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1 DESC: org.apache.rocketmq.acl.common.AclException: No accessKey is configured, org.apache.rocketmq.acl.plain.PlainPermissionManager.validate(PlainPermissionManager.java:636)

Be First to Comment

发表评论

您的电子邮箱地址不会被公开。