RedisMQ实现
# 发布过期通知
public enum PubType {
/**
* 发布计划
* 此次测试使用 需要新增通知类型需要自行添加
*/
PLAN("pub:plan"),
/**
* 到期通知
* 此次测试使用 需要新增通知类型需要自行添加
*/
EXPIRED_NOTICE("expired:notice"),
//.....
;
private final String prefix;
}
发布消息
PubType.PLAN.pub(key,10);
//或者
PubType.EXPIRED_NOTICE.pub(key,10);
处理消息
@Slf4j
//标注消息类型 【重要】
@RedisMessage(PubType.PLAN)
//注册为bean
@Component
public class PlanRedisHandler implements IRedisMessage {
@Override
public void exec(String key) {
//具体代码实现 TODO
log.info("类型:{},唯一标识:{} 执行过期操作",PubType.PLAN,key);
}
}
# pubsub
定义消息
/**
* 自定义消息 继承 AbstractChannelMessage
*/
public class StringChannel extends AbstractChannelMessage {
@Override
public String getChannel() {
return "pub-test";
}
}
定义消息生产者
@Component
public class StringChannelMessageProducer implements IMessageChannelProducer<StringChannel> {
@Resource
private RedisMQTemplate redisMQTemplate;
@Override
public void send(Object message) {
}
@Override
public void send(StringChannel message) {
redisMQTemplate.send(message);
}
}
定义消息消费者
@Component
public class StringChannelMessageConsumer extends AbstractChannelMessageListener<StringChannel> {
/**
* 处理消息
*
* @param message 消息
*/
@Override
public void onMessage(StringChannel message) {
log.info("消费 MQ Pub {},消息:{}", message.getChannel(), JsonUtils.toJson(message.getHeaders()));
}
}
# stream mq
定义消息
/**
* 自定义消息 继承 AbstractStreamMessage
*/
public class StringStreamChannel extends AbstractStreamMessage {
@Override
public String getStreamKey() {
return "pub-stream";
}
}
定义消息生产者
@Component
public class StringStreamMessageProducer implements IMessageStreamProducer<StringStreamChannel> {
@Resource
private RedisMQTemplate redisMQTemplate;
@Override
public void send(Object message) {
}
@Override
public RecordId send(StringStreamChannel message) {
return redisMQTemplate.send(message);
}
}
定义消息消费者
@Component
public class StringStreamMessageConsumer extends AbstractStreamMessageListener<StringStreamChannel> {
/**
* 处理消息
* @param message 消息
*/
@Override
public void onMessage(StringStreamChannel message) {
log.info("消费 MQ Stream Key {},消息:{}", message.getStreamKey(), JsonUtils.toJson(message.getHeaders()));
}
}
上次更新: 2024/11/05, 08:29:31