Springboot 2 集成 Nsq 消息中间件实现生产消费应用

  |     |  

什么是NSQ?

NSQ是一个实时分布式消息传递平台。具体特性请前往Nsq官网查看了解,下面是我平常在SpringBoot项目对Nsq使用的一些技巧。

简介

Nsq集成Java可基于JavaNSQClient实现,该jar包帮助我们封装了操作nsq的一些api。
下面我们构建两个SpringBoot测试项目,分别为SpringBoot-Nsq-Consumer,SpringBoot-Nsq-Poducer,
实现基于Nsq消息中间件的简单生产消费应用。

消息生产者

  1. 配置product 的 nsq信息,如果是集群版的,可请求http://ip:4161/nodes,获取可用节点,在添加进去。
    1
    2
    3
    nsq.topic=nimeio
    nsq.address=120.79.12.138
    nsq.port=4150
  2. 初始化jar包提供的NSQProduct,
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Configuration
    public class NsqProductConfiguration {
    @Value("${nsq.topic}")
    String topic;
    @Value("${nsq.address}")
    String nsqAddress;
    @Value("${nsq.port}")
    Integer nsqPort;

    @Bean
    public NSQProducer nsqProducer() {
    NSQProducer nsqProducer = new NSQProducer();
    nsqProducer.addAddress(nsqAddress, nsqPort);
    nsqProducer.start();
    return nsqProducer;
    }
    }
    由于Java Nsq Client 生产者不能选择基于特定的Channel接收消费,日常我在项目中都是自己为Topic制定特定传输格式,
    在通过逻辑判断指定特定的Consumer Channel消费处理。
    如下图:

设计的消息体如下,有一个特定id,action(相当于Channel)以及消息内容body,此处只是简单示例,有特殊要求可自行改善:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class NsqMessage implements Serializable {

@SerializedName("id")
@JsonProperty("id")
private Long id;

/**
* 相当于nsq消费者的channel名称
*/
@SerializedName("action")
@JsonProperty("action")
private String action;

@SerializedName("body")
@JsonProperty("body")
private String body;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getAction() {
return action;
}

public void setAction(String action) {
this.action = action;
}

public String getBody() {
return body;
}

public void setBody(String body) {
this.body = body;
}
}

接下来,测试发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public String sendTestMessageByChannel(String body) {
try {
NsqMessage message = new NsqMessage();
message.setId(UUID.randomUUID().getLeastSignificantBits());
message.setAction("testChannel");
message.setBody(body);
String msg = JSONParseUtils.object2JsonString(message);
nsqProducer.produce(topic, msg.getBytes());
log.info("消息发送特定Channel成功!时间:" + new Date());
return "Success";
} catch (NSQException e) {
log.error("nsq 连接异常!msg={}", e.getMessage());
return "Error:" + e.getMessage();
} catch (TimeoutException e) {
log.error("nsq 发送消息超时!msg={}", e.getMessage());
return "Error:" + e.getMessage();
} catch (Exception e) {
log.error("出现未知异常!", e);
return "Error:" + e.getMessage();
}
}

消息生产者到这里就结束了,下面构建一个消息消费者:

消息消费者

同样的,消费的配置如下,注意要与生产者的Topic相同,并且由于消费者需要通过nsqlookupd发现生产服务,
所以使用nsqlookupd的端口4161(默认的,有改的需注意更换):

1
2
3
4
server.port=8081
nsq.topic=nimeio
nsq.address=120.79.12.138
nsq.port=4161

消费消息逻辑如下,主要在于判断是否为特定Channel,如不是就确定即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void mqConsumer() {
NSQLookup lookup = new DefaultNSQLookup();
lookup.addLookupAddress(nsqAddress, nsqPort);
NSQConsumer consumer = new NSQConsumer(lookup, topic, NsqChannelConst.DEFAULT_CHANNEL, (message) -> {
if (message != null) {
String msg = new String(message.getMessage());
NsqMessage nsqMessage = null;
try {
nsqMessage = JSONParseUtils.json2Object(msg, NsqMessage.class);
} catch (Exception e) {
log.error("消息无法转换,存在问题");
message.finished();
return;
}
if (nsqMessage == null) {
message.finished();
log.error("消息为空,瞎发的消息,确认即可");
message.finished();
return;
}
if (!NsqChannelConst.DEFAULT_CHANNEL.equals(nsqMessage.getAction())) {
// 如果nsq消息体中的action不等于当前的chanel名称,说明不是当前消费者需要处理的数据,确认消费即可
message.finished();
return;
}
try {
log.info("消费特定消息: " + nsqMessage.getBody());
//确认消息
message.finished();
return;
} catch (Exception e) {
//异常,重试
message.requeue();
}
return;
}
message.finished();
return;
});
//默认是启动一次开启200个线程,处理200个任务,实际用不了那么多,根据需求配置就行了。
consumer.setExecutor(Executors.newFixedThreadPool(nsqThreadCount));
consumer.setMessagesPerBatch(nsqThreadCount);
consumer.start();
log.info("nsq 消费者启动成功!");
}

具体代码以上传至GitHub:
https://github.com/liaozihong/SpringBoot-Learning/tree/master/SpringBoot-Nsq-Consumer
https://github.com/liaozihong/SpringBoot-Learning/tree/master/SpringBoot-Nsq-Producer

参考链接:
NSQ官方文档
NSQ相关curl API
JAVANSQClient
https://zhuanlan.zhihu.com/p/37081073

文章目录
  1. 1. 什么是NSQ?
    1. 1.1. 简介
    2. 1.2. 消息生产者
    3. 1.3. 消息消费者
作者共写了53.1k个字 本站总访问量  |   您是访问本站的第个小伙伴