QoS 服务质量:
At most once QoS: Core NATS 提供最多一次的服务质量。如果订阅者没有订阅主题(没有主题匹配),或者在发送消息时未处于活动状态,则不会收到消息。这与TCP/IP提供的保证级别相同。Core NATS是一个即发即弃的消息传递系统。它只会将消息保存在内存中,并且永远不会将消息直接写入磁盘。
At-least / exactly once QoS: 如果您需要更高质量的服务(至少一次且恰好一次),或者诸如持久化消息,实现发布者与订阅者的解耦,键/值存储等功能,您可以使用NATS Jetstream,它内置在NATS服务器中(但需要启用)。
如何实现exactly once
要实现这种,确保百分百是一次消费,那我们要主要两个地方,一个是消息的发送端,可能会由于网络等各种原因导致发送多次的情况,那我们怎么样来避免不发送多次;另外在接收端怎么样来确保消息不会被再次消费。这个时候就需要使用到jetstream的队列模式。
jetstream的队列模式,消费端消费完,它就直接从stream里把它给移除了。这就可以保证消费端是一次消费。
在发送端我们可能要用到Header,确保在指定的时间窗口内,我如果多发几次,它也是只做一条消息来处理。
- 创建队列模式的stream
$ nats stream add
? Stream Name: QTEST
? Subjects to consume: qtest.*
? Storage backend: file
? Retention Policy: Work Queue
? Discard Policy: New
? Stream Messages Limit: 10
? Per Subject Message Limit: -1
? Stream size limit: -1
? Maximum message age limit: -1
? Maximum individual message size: -1
? Duplicate tracking time window: 2m0s
? Allow message Roll-ups: Yes
? Allow message deletion: Yes
? Allow purging subjects or the entire stream: Yes
? Replicas: 1
- 创建消费者
$ nats con add QTEST cjCon
? Delivery target (empty for Pull Consumers)
? Start policy (all, new, last, subject, 1h, msg sequence): all
? Acknowledgement policy: explicit
? Replay policy: instant
? Filter Stream by subject (blank for all): qtest.cj
? Maximum Allowed Deliveries: -1
? Maximum Acknowledgements Pending: 0
? Deliver headers only without bodies: No
? Add a Retry Backoff Policy: No
- 使用命令来查看stream的详情:
$ nats stream info QTEST
- 往stream中发消息
$ nats pub qtest.cj hello0
由于这个stream 队列的长度为10,所以装满10条之后,再往里面发消息,会报“maximum messages exceeded”的错误。
- 消费消息
$ nats con next QTEST cjCon
消费了队列的消息之后,这条消息就从stream中移除了,这就确保了每条消息只能被消费一次。
$ nats con next QTEST cjCon --no-ack
如果不做ack的确认,这条消息就不会被stream移除。
如何防止客户端误操作,多次写入同一条消息
func main() {
nc, _ := nats.Connect("localhost:4222")
defer nc.Close()
js, _ := nc.JetStream()
header := nats.Header{}
header.Add("Nats-Msg-Id", "1")
msg := nats.Msg{Subject: "qtest.cj", Header: header, Data: []byte("hello11")}
_, err := js.PublishMsg(&msg)
if err != nil {
log.Println(err)
return
}
log.Println("发送成功")
}
我们只需要在Header头里,加上个msg id的标识,然后我们就可以推这个消息。
多次推的消息,如果msg id相同,jetstram就会认为这是同一条消息,只接受第一次接受到的消息,并拒绝后面的消息。
这就能够防止在客户端,比如说网络等各种原因出错,它在重试,就会多发消息的情况。那NATS会帮你处理,就是相同Header它认为就是一条。当然,这个时间就是刚才我们在创建stream的时候限制的2分钟之内,2分钟之内,如果这个Header一直是1,那它就是认为就是同一条。
如何把消息卷起来
$ nats pub qtest.cj hello0
$ nats pub qtest.cj hello15
$ nats pub qtest.cj hello16
现在假设我,发送第四条消息的时候,我想把前面,在我之前发送的所有消息都清掉。
也可以通过配置一个Header来实现这个:
func main() {
nc, _ := nats.Connect("localhost:4222")
defer nc.Close()
js, _ := nc.JetStream()
header := nats.Header{}
header.Add{"Nats-Rollup", "all"}
msg := nats.Msg{Subject: "qtest.cj", Header: header, Data: []byte{"hello11-16"}}
_, err := js.PublishMsg(&msg)
if err != nil {
log.Println(err)
return
}
log.Println("发送成功")
}
执行上面的代码之后,这个stream就只剩"hello11-16"这一条消息了。
本文发表于 0001-01-01,最后修改于 0001-01-01。
本站永久域名「 jiavvc.top 」,也可搜索「 后浪笔记一零二四 」找到我。