📄 整合 RocketMQ 使用
内部资料,请刷新扫码登录
pigcloud
# 环境说明
环境组件 | 版本 | 备注 |
---|---|---|
RocketMQ | 5.3.0 | |
PigX | 5.5 | |
JDK | 17 | 分支:jdk17 |
# 安装 RocketMQ
- 下载脚本文件
git clone https://github.com/pig-mesh/rocketmq-docker-compose.git
- 执行启动 RocketMQ
docker compose up -d
- 创建临时 topic
docker exec -it rmqbroker bash
sh mqadmin updatetopic -t TestTopic -c DefaultCluster
# 代码整合
- 目标服务增加 RocketMQ 依赖 jar
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot</artifactId>
<version>2.3.1</version>
</dependency>
- Nacos 对应的服务配置文件增加链接相关信息
rocketmq:
producer:
topic: TestTopic
endpoints: 127.0.0.1:8081
tag: '*'
push-consumer:
endpoints: 127.0.0.1:8081
tag: '*'
- 配置队列监听消费者
@Slf4j
@Service
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "test-group", tag = "*", filterExpressionType = "tag")
public class MyConsumer1 implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
ByteBuffer body = messageView.getBody();
String message = StandardCharsets.UTF_8.decode(body).toString();
log.info("received message: {}", message);
return ConsumeResult.SUCCESS;
}
}
- 测试消息发送
@SpringBootTest
class DemoApplicationTests {
@Autowired
private RocketMQClientTemplate rocketMQTemplate;
@Test
void contextLoads() {
rocketMQTemplate.convertAndSend("TestTopic", "Hello, World!");
}
}
# 特殊说明
- 在 MQ 消费监听逻辑中调用 Feign , 参考 :pigx token 传递及 feign 调用中的无token 调用
A 服务并没有 token 去请求 B 服务,pigx 也对这种情况进行了兼容。类似于 A 对外暴露 API,但是又安全限制。参考日志插入情况
FeignClient 需要带一个请求 token,FROM_IN 声明是内部调用
remoteLogService.saveLog(sysLog, SecurityConstants.FROM_IN);
目标接口对内外调用进行限制 @Inner 注解,这样就避免接口对外暴露的安全问题。只能通过内部调用才能使用,浏览器不能直接访问该接口
@Inner
@PostMapping
public R save(@Valid @RequestBody SysLog sysLog) {
return new R<>(sysLogService.save(sysLog));
}
- 在 MQ 消费监听逻辑中调用数据库查询需要手动指定租户编号查询指定租户的信息,不然均是查询的租户 1
TenantContextHolder.set(id);
- 在 MQ 消费监听逻辑中不能使用数据权限等