-
添加RockemtMQ docker环境镜像。
-
developing
- 优化用户使用,去掉必须实现的接口以及方法
- 用户自定义方法,实现动态参数注入
- 重构代码
- 支持jdk版本为1.8
- SpringBoot版本为1.5.6
- maven仓库地址新aliyun maven
- 构建项目
mvn install
- 添加依赖:
<dependency>
<groupId>org.shieldproject.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.0-RELEASE</version>
</dependency>
- 添加必要配置application.yml
spring:
mq:
rocket:
config:
namesrv-addr: 192.168.2.26:9876
instance-name: test
config配置可以参考rocketMQ自带的config配置进行补充。
-
启用MQConfig
在Application class头部添加@EnableMQConfiguration注解,使其框架自动处理messageProducer以及messageConsumer。
@SpringBootApplication
@EnableRocketMQConfiguration
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
- 配置MQConsumer listener
方法参数选配:框架自动识别参数进行注入
- String msg 注入单条消息,produce发送的单条消息
- List<String> msgs 注入多条消息,produce发送的是多条消息
- ConsumeConcurrentlyContext 注入ConsumeConcurrentlyContext如果当前消费模式是Concurrently
- ConsumeOrderlyContext 注入ConsumeConcurrentlyContext如果当前消费模式是Concurrently
返回值选配
- void 如果无需处理事务回滚操作(自动处理事务)
- ConsumeConcurrentlyStatus 如果消费模式是concurrently
- ConsumeOrderlyStatus 如果消费模式是Orderly
@Component
public class MessageListener {
@RocketMQListener(instance = "testfor1",
topic = "test", tags = "test",
consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET)
public void consumeMessage(String msgs) {
System.out.println(msgs + System.currentTimeMillis());
}
}
- 注入MessageProducer
public class ProducerController {
@Autowired
MQProducer mqProducer;
@GetMapping("/pro")
public String msg(String content) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
mqProducer.send(new Message("test", "test", content.getBytes(Charset.forName("UTF-8"))));
return content;
}
}