后浪笔记一零二四

1jetstream理论篇

NATS JetStream的设计目标

  • The system must be easy to configure and operate and be observable.

    系统必须容易配置,易于操作,易于被发现或者被观察。

  • The system must be secure and operate well with NATS 2.0 security models.

    系统必须是安全的,并且基于NATS2.0的安全模型能够良好的运行。

  • The system must be scale horizontally and be applicable to a high ingestion rate.

    系统必须支持水平扩展,支持高吞吐。

  • The system must support multiple use cases.

    系统必须支持多种用户场景。

  • The system must self-heal and always be available.

    系统必须能自我修复,并且保持高可用

  • The system must have an API that is closer to core NATS.

    系统必须和我们前面介绍的这个NATS部分的API保持一致。

  • The system must allow NATS messages to be part of a stream as desired. The system must display payload agnostic behavior.

    系统必须能够复用Core NATS,之前的一些这个,消息,并且具备系统这种负载无关的特性。

  • The system must not have third party dependencies.

    系统不能依赖第三方软件

JetStream

JetStream是NATS内置的分布式持久化存储系统(这样理解非常重要),构建在JetStream存储系统上的“core NATS”功能更多,消息可靠性更高。

+---------+                     +---->{subscriber}
|publisher|-------->(exchange)--+---->{subscriber}
+---------+             |       +---->{subscriber}
                        |
                        v
                  +------------+
+---------+       |JetStream   |------->|pull    |
|publisher|--pub->|(distributed|<--ack--|consumer|
|         |<-ack--|persistence |
+---------+       |system)     |-------->|push    |-------->|subs- |
                  +------------+         |consumer|<--ack---|criber|

JetStream是一套分布式的存储系统,主要用来存储消息,实现发布者和消费者之间的这种解耦。

NATS在JetStream之上,构建了一套完整的消息系统,消息发布者可以直接调用JetStream的消息发布接口发布消息,同时也可以根据JetStream订阅的主题,截获NATS原有的消息进行存储。

消息的消费端,有这种拉的模式和推的模式,用了JetStream之后,我们建议还是以这种拉的模式来消费消息。

推的模式和上面的消息处理方式一样,如果有消费者在线,JetStream就会将消息推送到消费者定义的这个主题里。我们在创建消费者的时候,我们就会看到他会有个target参数会专门来负责定义这个主题的名称。 同时,我的订阅者消费成功之后,会回复ack的确认消息,这样JetStream就不会再一次推送消息。如果没有收到确认的话,服务端会再次推送。真正实现了所谓的消息必达的这种功能。

创建stream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ nats stream add COURSES(名称) --subjects(订阅的主题) "COURSES.*" --storage(存储类型:file/memory) file \
        --retention(持久化的方式:limits/interest/workq) limits \
        --discard=old(Stream消息数量达到limits限制数量后,消息如何处理) \
        --max-msgs=-1(最大消息条数,-1为无限制) \
        --max-msgs-per-subject=-1(每一个主题最大消息条数,-1为无限制,超过会删除老的消息,如果discard参数为old的话) \
        --max-bytes=-1(允许存储的所有消息的大小,-1为无限制) \
        --max-age=1y(消息存储的最长时间,超时会自动删除, y,s,m,h) \
        --max-msg-size=-1(单条消息允许的大小,-1为无限制) \
        --dupe-window=2m0s(根据Msg-Id的header头信息判断唯一消息的时间) \
        --allow-rollup(是否允许通过header“卷起”消息--no-allow-rollup) \
        --deny-delete(是否允许删除消息,--no-deny-delete) \
        --deny-purge(是否允许清空stream消息, --no-deny-purge) \
        --replicas=1(在集群状态下非常有用,就是我们整个消息副本的数量) \

# 创建stream所涉及到的各种参数:
# 1. 订阅的主题:例如,以COURSES开头的消息,我都会把它存储到stream里。
# 2. 存储类型:
# 3. 持久化方式: limits, 可以对这个stream做很多的限制条件,例如存储的消息数量、大小等; 
#                interest, 已经有消费者的时候,才会存储这些消息,如果没有消费者,我就不存储这消息;
#                workq, 就是队列的模式,消息进来的时候,我以队列形式存储,队列满了我可以把老的给删了。
# 4. discard丢弃消息的策略:old, 如果消息满了,把老的消息直接删了;
#                         new,如果消息满了,就不再接收新消息了。
# 5. allow-rollup: 假设我给这个主题连发了9条消息,然后我第10条消息,我加了一个header,
#                  可以把前面9条消息卷起来,相当于“移除”,用第10条消息来代表前面的消息。

创建Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
$ nats consumer add COURSES(stream名称) beijing-region(Consumer名称) --filter COURSE.created(过滤stream中的消息) \
        --deliver all(如何处理stream中的消息,见下表) \
        --ack explicit(消息确认方式: none/all/explicit) \
        --replay=instant(消息回放机制:instant/original) \
        --max-deliver=-1(消息投递的最大次数,-1表示没有正确的ack就不停止投递消息) \
        --max-pending=0(最多允许存在多少条正在投递但未投递成功的消息,如果到达这个数,将不再投递新消息) \
        --headers-only(是否只投递Header消息) \
        --target "beijing.course.created"(在“推送”模式下,消息实时发送到目标NATS主题,可以作为普通消息订阅beijing.course.created主题,pull类型的consumer该值为空)

# 1. 消息确认方式:none不需要确认;
#                 all,比如说我拉了10条消息过来,我只需要确认最后一条,那就相当于前面的所有消息都确认过了;
#                 explicit,我每一条消息都要作ack的这个确认,消息处理成功的回复。
# 2. 消息回放机制:instant,立即消费,我创建完这个consumer就可以立即消费所有消息;
#                 original,按照stream收到消息的这个频率来推送消息给我
DeliverPolicy 指定如何处理stream里的消息
DeliverAll 默认配置,consumer需要处理从开始的所有消息
DeliverLast 从处理流中的最后一条消息开始
DeliverLastPerSubject 第一次使用消息时,从当前流中每个主题的最后一条消息开始
DeliverNew 只处理consumer创建后的新消息,历史消息就不处理了
DeliverByStartSequence 从指定的sequence开始处理
DeliverByStartTime 从指定的时间开始处理

抽象出一层Consumer主要是为了解决相同的stream,不同的consumer消费方式不一样的场景。

在实际业务中,一般情况stream是提前设计规划好的,但consumer大多数情况是由具体的服务来创建的,当然也要根据你具体的业务来决定,如果业务单一,也可以提前nats命令创建好,程序逻辑就不需要创建consumer了。

详解ack

消费端:客户端消费完消息之后,给消息系统回复一条ack的确认信息,那消息系统就不会再次推送。如果我不回复ack确认我的消费已经完成的话,它还会再次推送。

发送端:客户端往jetstream系统写入消息的时候,它也会给这个写消息的客户端回复一条ack消息,确保这条消息百分百已经进行了持久化存储。


专题:

本文发表于 2022-01-02,最后修改于 2022-01-02。

本站永久域名「 jiavvc.top 」,也可搜索「 后浪笔记一零二四 」找到我。


上一篇 « 4主题(Subject)命名及通配符的使用 下一篇 » 2jetstream实操篇

赞赏支持

请我吃鸡腿 =^_^=

i ysf

云闪付

i wechat

微信

推荐阅读

Big Image