后浪笔记一零二四

QoS 服务质量:

At most once QoS: Core NATS 提供最多一次的服务质量。如果订阅者没有订阅主题(没有主题匹配),或者在发送消息时未处于活动状态,则不会收到消息。这与TCP/IP提供的保证级别相同。Core NATS是一个即发即弃的消息传递系统。它只会将消息保存在内存中,并且永远不会将消息直接写入磁盘。

At-least / exactly once QoS: 如果您需要更高质量的服务(至少一次且恰好一次),或者诸如持久化消息,实现发布者与订阅者的解耦,键/值存储等功能,您可以使用NATS Jetstream,它内置在NATS服务器中(但需要启用)。

如何实现exactly once

要实现这种,确保百分百是一次消费,那我们要主要两个地方,一个是消息的发送端,可能会由于网络等各种原因导致发送多次的情况,那我们怎么样来避免不发送多次;另外在接收端怎么样来确保消息不会被再次消费。这个时候就需要使用到jetstream的队列模式。

jetstream的队列模式,消费端消费完,它就直接从stream里把它给移除了。这就可以保证消费端是一次消费。

在发送端我们可能要用到Header,确保在指定的时间窗口内,我如果多发几次,它也是只做一条消息来处理。

  1. 创建队列模式的stream
$ nats stream add
? Stream Name: QTEST
? Subjects to consume: qtest.*
? Storage backend: file
? Retention Policy: Work Queue
? Discard Policy: New
? Stream Messages Limit: 10
? Per Subject Message Limit: -1
? Stream size limit: -1
? Maximum message age limit: -1
? Maximum individual message size: -1
? Duplicate tracking time window: 2m0s
? Allow message Roll-ups: Yes
? Allow message deletion: Yes
? Allow purging subjects or the entire stream: Yes
? Replicas: 1
  1. 创建消费者
$ nats con add QTEST cjCon
? Delivery target (empty for Pull Consumers)
? Start policy (all, new, last, subject, 1h, msg sequence): all
? Acknowledgement policy: explicit
? Replay policy: instant
? Filter Stream by subject (blank for all): qtest.cj
? Maximum Allowed Deliveries: -1
? Maximum Acknowledgements Pending: 0
? Deliver headers only without bodies: No
? Add a Retry Backoff Policy: No
  1. 使用命令来查看stream的详情:
$ nats stream info QTEST
  1. 往stream中发消息
$ nats pub qtest.cj hello0

由于这个stream 队列的长度为10,所以装满10条之后,再往里面发消息,会报“maximum messages exceeded”的错误。

  1. 消费消息
$ nats con next QTEST cjCon

消费了队列的消息之后,这条消息就从stream中移除了,这就确保了每条消息只能被消费一次。

$ nats con next QTEST cjCon --no-ack

如果不做ack的确认,这条消息就不会被stream移除。

如何防止客户端误操作,多次写入同一条消息

func main() {
  nc, _ := nats.Connect("localhost:4222")
  defer nc.Close()
  js, _ := nc.JetStream()
  header := nats.Header{}
  header.Add("Nats-Msg-Id", "1")
  msg := nats.Msg{Subject: "qtest.cj", Header: header, Data: []byte("hello11")}
  _, err := js.PublishMsg(&msg)
  if err != nil {
    log.Println(err)
    return
  }
  log.Println("发送成功")
}

我们只需要在Header头里,加上个msg id的标识,然后我们就可以推这个消息。

多次推的消息,如果msg id相同,jetstram就会认为这是同一条消息,只接受第一次接受到的消息,并拒绝后面的消息。

这就能够防止在客户端,比如说网络等各种原因出错,它在重试,就会多发消息的情况。那NATS会帮你处理,就是相同Header它认为就是一条。当然,这个时间就是刚才我们在创建stream的时候限制的2分钟之内,2分钟之内,如果这个Header一直是1,那它就是认为就是同一条。

如何把消息卷起来

$ nats pub qtest.cj hello0
$ nats pub qtest.cj hello15
$ nats pub qtest.cj hello16

现在假设我,发送第四条消息的时候,我想把前面,在我之前发送的所有消息都清掉。

也可以通过配置一个Header来实现这个:

func main() {
  nc, _ := nats.Connect("localhost:4222")
  defer nc.Close()
  js, _ := nc.JetStream()
  header := nats.Header{}
  header.Add{"Nats-Rollup", "all"}
  msg := nats.Msg{Subject: "qtest.cj", Header: header, Data: []byte{"hello11-16"}}
  _, err := js.PublishMsg(&msg)
  if err != nil {
    log.Println(err)
    return
  }
  log.Println("发送成功")
}

执行上面的代码之后,这个stream就只剩"hello11-16"这一条消息了。


本文发表于 0001-01-01,最后修改于 0001-01-01。

本站永久域名「 jiavvc.top 」,也可搜索「 后浪笔记一零二四 」找到我。


上一篇 « 下一篇 »

赞赏支持

请我吃鸡腿 =^_^=

i ysf

云闪付

i wechat

微信

推荐阅读

Big Image