后浪笔记一零二四

1iot设备接入

边缘节点的部署

边缘节点对网络的要求不是很高,边缘节点只需要能够正常的访问我们远端的nats集群就可以了。

不需要有公网的入口,也就是不需要被集群访问,它是属于一个单向的连接。

一个边缘节点支持连接多个nats集群,可以配置本地的消息路由到多个远程的nats。

边缘节点使用本地的策略对客户端进行身份认证和授权,和远端的nats集群没有关系。

边缘节点和远端nats集群的通信,使用的是专门的边缘端口,默认是7422。

  1. 先给远端nats集群开启边缘端口7422:
leafnodes {
  port: 7422
  tls {
    handshake_first: true
    # other TLS fields...
  }
}
  1. 给边缘节点配置要连接的nats集群,可以配置多个
leafnodes {
  remotes [
    {
      urls: ["tls://example:7422"]
      credentials: "path of creds file"
      # 只和远端nats集群共享如下租户下的所有消息
      account: "ACPW7542CB3ZAHWBBJ2LRKSDJSOI5ASHHATPY6IBDBH3POD4FHO34EQ"
    }
  ]
}

边缘节点上的jetstream

Domain: Jestream划分域的配置,每一个域都是相互独立的,域之间可实现stream的镜像、消息聚集。

  • 边缘节点的domain可以镜像远端集群的domain到本地来。nats称之为“mirror”
  • 从所有的边缘节点,把消息汇聚到远端集群。nats称之为“source”
  • 这个主要是解决,如果远端集群和边缘节点之间的网络出现异常,在网络恢复之后,远端的消息可以正常同步到边缘节点,边缘节点的消息也可以把消息汇集到远端集群。

Operator:配置相同的operator,把系统管理的SYS租户打通。

  • 查看当前处于哪个operator:nsc env
  • 列出所有的operator: nsc list operators
  • 切换operator: nsc env -o operator-name
  • 列出所有的租户: nsc list accounts
  • 切换租户: nsc env -a account-name
  • 查看当前租户下有哪些用户:nsc list users

给边缘节点添加jetstream域,远端集群也是同样的方式添加:

jetstream {
    domain: hello
}

有了域,就可以执行mirros和source操作了。

查看所有的server信息:

之所以能同时查看到远端集群和边缘集群的信息,是因为远端集群和边缘集群用的是同一套sys.creds。

$ nats server ls --creds sys.creds文件的路径

我们先给远端的集群建一个stream,由于目前整个系统管理的sys这个租户是打通的,所以我只要能够连上我本地的这个服务,我就可以远程的给远端集群创建stream。

创建stream的时候,后面需要加一个--js-domain这个参数,就能指定在哪个边缘节点或者集群上去创建这个stream。

假设这里远端集群的域是mainbj,两个边缘节点的域分别是cjhome 和 cjhome2。

1.1 给远端集群创建stream:

$ nats stream add --js-domain mainbj
? Stream Name: cjtest
? Subject: cjtest.*
? Storage: file
? Replication: 1
? 后面参数全部是默认值

1.2 给本地的两个边缘节点创建镜像

# 注意,这里创建的stream是没有Subject的
$ nats stream add --mirror cjtest --js-domain cjhome
? Stream Name: cjtest-mr
? Storage: file
? Replication: 1
? Retention Policy: Limits
? Discard Policy: Old
? Stream Messages Limit: -1
? Total Stream Size: -1
? Message TTL: -1
? Max Message Size: -1
? Allow message Roll-ups: No
? Allow message deletion: Yes
? Allow purging subjects or the entire stream: Yes
? Adjust mirror start: No
? Import mirror from a different JetStream domain: Yes
? Foreign Jestream domain name: mainbj
? Delivery prefix: 默认值

# 注意,这里创建的stream是没有Subject的
$ nats stream add --mirror cjtest --js-domain cjhome2
? Stream Name: cjtest-mr
? Storage: file
? Replication: 1
? Retention Policy: Limits
? Discard Policy: Old
? Stream Messages Limit: -1
? Total Stream Size: -1
? Message TTL: -1
? Max Message Size: -1
? Allow message Roll-ups: No
? Allow message deletion: Yes
? Allow purging subjects or the entire stream: Yes
? Adjust mirror start: No
? Import mirror from a different JetStream domain: Yes
? Foreign Jestream domain name: mainbj
? Delivery prefix: 默认值

这个时候,给远端集群推送10条消息,可以发现两个边缘节点分别都有收到10条消息。

如果将边缘节点和远端节点的网络断开,这时候,给远端集群推送10条消息,过段时间恢复网络之后,可以发现两个边缘节点分别都有收到10条消息。

2.1. 给本地的两个边缘节点创建stream

$ nats stream add -s localhost:4222
? Stream Name: aggtest1
? Subject: cjhome.*
? Storage: file
? Replication: 1
? 后面参数全部是默认值

$ nats stream add -s localhost:4333
? Stream Name: aggtest2
? Subject: cjhome2.*
? Storage: file
? Replication: 1
? 后面参数全部是默认值

2.2. 在远端集群,将两个边缘节点的消息收集起来

$ nats stream add aggtest --source aggtest1 --source aggtest2 --creds creds文件路径
? Storage: file
? 默认值...
? Import "aggtest1" from a different JetStream domain: yes
? aggtest1 Source foreign JetStream domain name: cjhome
? 默认值...
? Import "aggtest2" from a different JetStream domain: yes
? aggtest1 Source foreign JetStream domain name: cjhome2

这个时候,分别往两个边缘节点分别都推一条消息,可以发现远端集群有收到2条消息(哪怕网络断开一段时间,只要网络恢复了,远端集群最终还是会收到2条消息)。

  1. 两个边缘节点之间是没有创建任何连接的,那这两个边缘节点能不能将远端节点作为代理,从而实现消息的mirror呢?
$ nats stream add --mirror aggtest1 --js-stream cjhome2
? Stream Name: aggtest1-mr-backup
? 默认值...
? Import mirror from a different JetStream domain: yes
? Foreign JetStream domain name: cjhome
? 默认值...

上面,我在cjhome2创建了一个指向cjhome这个域的一个镜像。

跨租户交换Jetstream数据

nats这个stream的接口可能和其他系统的api接口不太一样,nats的所有接口都是nats本身的这个request response这种消息模型来做这个客户端和服务端的交互的。也就是说别的系统,比如说我调一个api的接口,可能是一个http request的到服务端,服务端再返回这个结果。 但是nats它是通过推消息,然后服务端再返回结果,是这样的模式。

它的所有和api相关的这些接口,它都是一个js打头的一个主题,我们就来订阅这个主题。我们可以试一下,在订阅这个主题后,在调用api的时候,这个订阅会收到什么内容。

  1. 订阅所有的api
$ nats sub '$JS.API.>'
  1. 看stream的信息
$ nats stream info

执行后,可以发现第1步中订阅的主题,有返回消息。

这就是打破了传统的这种客户端和服务端交互的这种模式。

  1. 使用request的方式,来发送api请求呢
$ nats req '$JS.API.STREAM.INFO.aggtest1'

这时,可以发现该命令的返回结果,和第2步中返回的结果,基本上是一致的。

从这可以看出,jetstream所有的api的接口其实都是可以通过request response这个消息模型来调用的。

  1. nats官网上都有哪些api接口呢?

文档链接: https://docs.nats.io/reference/reference-protocols/nats_api_reference

  1. 如何在多个jetstream租户之间交换数据

假设有两个租户: oa 和 hr, hr下有个hruser的用户

下面所有操作,都是对远端集群做的,其域是mainbj

5.1 在oa这个租户里,把相应的api权限给共享出去

$ nsc add export --account oa --name Consumer-API --service --response-type Stream --subject '$JS.mainbj.API.CONSUMER.>'

从oa这个租户,把mainbj这个域里的consumer相关的api全部export出去。

5.2 在hr这个租户下,导入上面导出的这个权限:

$ nsc add import --account hr --src-account oa --name Remote-Consumer-API --service --remote-subject '$JS.mainbj.API.CONSUMER.>' --local-subject 'JS.oa@mainbj.API.CONSUMER.>'

在hr这租户下添加一个import,其中远端的主题是$JS.mainbj.API.CONSUMER.>,但是到hr这个租户之后,将其重命名为JS.oa@mainbj.API.CONSUMER.>,这个主要是为了防止主题的这个重复。

这样,api的权限,就从oa这个account导出到,然后在hr这个租户里导入。

5.3 在oa这个租户,导出数据的主题

$ nsc add export --account oa --name Data-Path --subject 'delivery.>'

5.4 在hr这个租户里,导入数据的权限

$ nsc add import --account hr --src-account oa --name Remote-Data-Path --remote-subject 'deliver.hr.>'

把前4步的配置推到远端节点中:

$ nats push -A -u nats://localhost:4222

5.5 在hr这个租户里,做oa这个租户的主题镜像,这一步需要到远端集群中的pod里做

$ nats --creds creds文件路径 stream add cjtest-mr-crossaccount --mirror cjtest
? Storage: file
? 默认值...
? Import mirror from a different JetStream domain: No
? Import mirror from a different account: yes
? Foreign account API prefix: JS.oa@mainbj.API
? Foreign account delivery prefix: delivery.hr.cjtest-mr-crossaccount

给边缘节点配置mqtt端口监听

mqtt {
	port: 1883
}
server_name: homegw

启动mqtt之后,它要求必须有一个server_name的配置

mqtt本身不支持nkey和jwt的认证,所以,我需要对这个用户进行以下简单的配置。

  1. 查看远端集群的用户:
$ nsc list users
可以发现有两个用户,cjzhao和mqttuser
其中mqttuser启用了bearer token

$ nsc describe user mqttuser
从输出结果中,可以发现:Bearer Token配置的值是Yes
有了这个之后,我就可以在mqtt客户端里用jwt的token来作为这个用户的密码,用户名我可以随意填写。
  1. 查看该用户mqttuser的jwt的内容
$ cat mqttuser.creds
-----BEGIN NATS USER JWT-----
jwt格式的token,这个token需要放入下面的c++驱动代码里
------END NATS USER JWT------
  1. 如何启用bearer token功能:
$ nsc edit user --name U --account A --bearer

MQTT主题的命名,和nats有差异

MQTT character NATS character(s) Topic(MQTT) Subject(NATS)
/ between two levels . foo/bar foo.bar
/ as first level /. /foo/bar /.foo.bar
/ as last level ./ foo/bar/ foo.bar./
/ next to another ./ foo//bar foo./.bar
/ next to another /. //foo/bar /./.foo.bar
. Not Support foo.bar Not Supported

MQTT Wildcard : # , + NATS Wildcard : > , *

vscode插件

安装platformio的vscode插件,并创建一个名为MQTT的项目,该项目需要添加如下依赖:

# platformio.ini文件

platform = espressif8266
board = nodemcuv2
framework = arduino
lib_deps = ottowinter/AsyncMqttClient-esphome@^0.8.6
monitor_speed = 115200

代码

explorer:

MQTT:
  include
  lib
  src
    main.cpp
  platformio.ini

main.cpp:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <ESP8266WiFi.h>
#include <Ticker.h>
#include <AsyncMqttClient.h>
#include <Arduino.h>

#define WIFI_SSID "Xiaomi_AD0D"
#define WIFI_PASSWORD "xin@1234"

#define USE_MQTT "mqttuser"
#define USE_MQTT_PASSWORD "jwt格式的token值"
// 软路由地址: 192.168.2.1
#define MQTT_HOST IPAddress(192, 168, 2, 1)
#define MQTT_PORT 1883

AsyncMqttClient  mqttClient;
Ticker  mqttReconnectTimer;

WiFiEventHandler wifiConnectHandler;
WiFiEventHandler wifiDisconnectHandler;
Ticker wifiReconnetTimer;

void connectToWifi() {
	Serial.println("Connecting to Wi-Fi...");
	WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
}
void connectToMqtt() {
	Serial.println("Connecting to MQTT...");
	mqttClient.setCredentials(USE_MQTT, USE_MQTT_PASSWORD);
	mqttClient.connect();
}

void onWifiConnect(const WiFiEventStationModeGotIP& event) {
	Serial.println("Connected to Wi-Fi.");
	connectToMqtt();
}

void onWifiDisconnect(const WiFiEventStationModeDisconnected& event) {
	Serial.println("Disconnected from Wi-Fi.");
	mqttReconnectTimer.detach();  // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
	wifiReconnetTimer.once(2, connectToWifi);
}

void onMqttConnect(bool sessionPresent) {
	Serial.println("Connected to MQTT.");
	mqttClient.subscribe("test/lol", 2);
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
	Serial.println("Disconnected from MQTT.");

	if (WiFi.isConnected()) {
		mqttReconnectTimer.once(2, connectToMqtt);
	}
}

void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
	Serial.println("Publish received.");
	Serial.print("    topic:  ");
	Serial.println(topic);
	Serial.print("  msg:  ");
	Serial.println(payload);
	if (1 == atoi(payload)) {
		// 如果我收到的是1,就关闭蓝色led的灯
		pinMode(LED_BUILTIN, INPUT);
	} else {
		// 如果我收到的是0,就打开蓝色led的灯
		pinMode(LED_BUILTIN, OUTPUT);
	}
}

void setup() {
	Serial.begin(115200);
	wifiConnectHandler = WiFi.onStationModeGotIP(onWifiConnect);
	wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWifiDisconnect);
	mqttClient.onConnect(onMqttConnect);
	mqttClient.onDisconnect(onMqttDisconnect);
	mqttClient.onMessage(onMqttMessage);
	mqttClient.setServer(MQTT_HOST, MQTT_PORT);
	connectToWifi();
}

void loop() {
	mqttClient.publish("iot/eps8266/report", 1, true, "ok");
	delay(2000);
}

代码没有错误之后,需要将其烧制到iot设备里。

然后在远端集群中,往转换格式后的test.lol主题推送消息:

$ nats pub test.lol 0 --creds creds文件的路径

cpython的编译

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
$ yum install -y libffi-devel zlib-devel bzip2-devel xz-devel

# 如果yum install zlib-devel时报错,就需要使用如下的编译安装了
# python依赖zlib,否则代码中引用zlib时会报fatal error zlib.h No such file or directory错误
$ wget http://www.zlib.net/zlib-1.2.11.tar.gz
$ tar -xzf zlib-1.2.11.tar.gz
$ cd zlib-1.2.11
$ ./configure --shared
$ make test
$ make install
$ cp zutil.h /usr/local/include
$ cp zutil.c /usr/local/include

# 编译openssl静态库
$ wget https://github.com/openssl/openssl/archive/refs/tags/OpenSSL_1_1_1w.tar.gz
$ tar -xzf OpenSSL_1_1_1w.tar.gz
$ cd openssl-OpenSSL_1_1_1w
$ ./config --prefix=/usr/local/openssl --openssldir=/usr/local/openssl no-shared threads
$ make -j16
$ make install

# 编译python
$ wget https://github.com/python/cpython/archive/refs/tags/v3.9.19.tar.gz
$ tar -xzf v3.9.19.tar.gz
$ cd cpython-3.9.19
# gcc编译可以分为四个主要阶段:预处理、编译、汇编和链接
# CPPFLAGS: 指定预处理器(c/c++ PreProcessor)的选项或标志,例如头文件路径(通过 -I 选项指定)、宏定义(通过 -D 选项指定)。
# CFLAGS: 指定C编译器的选项;CXXFLAGS: 指定C++编译器的选项
# LDFLAGS: 指定链接器(c/c++ linker)的选项或标志,例如库文件的搜索路径、需要链接的库以及链接器的行为控制选项。
# CPATH: 指定头文件搜索路径,适用于所有语言。类似于-I选项,但在命令行上用-I给出的任何路径之后使用
# LIBRARY_PATH: 在链接阶段使用的库搜索路径。这类似于-L选项,用来指定链接时需要搜索的库目录
# LD_LIBRARY_PATH: 运行时动态链接器使用的库搜索路径。这个环境变量影响程序在运行时如何找到所需的共享库(.so文件)
# LANG:指定编译器要使用的字符集;LC_ALL:覆盖所有局部化设置,包括字符分类等
# python给vim使用时,must be compiled with --enable-shared (or --enable-framework on macOS)
$ ./configure LDFLAGS="-L/usr/local/openssl/lib -Wl,-rpath,/usr/local/openssl/lib" CPPFLAGS="-I/usr/local/openssl/include" --with-openssl=/usr/local/openssl/  --prefix /home/jiaweicheng/.python39 --enable-shared
$ make -j16
$ make install

$ vim ~/.bashrc
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/home/jiaweicheng/.python/lib
$ source ~/.bashrc

# 测试: ~/.python/bin/python3 -c "import sysconfig; print(sysconfig.get_config_var( 'INCLUDEPY' ))"
# 编译参数的位置: ./lib/python3.9/config-3.9-x86_64-linux-gnu/Makefile

专题:

本文发表于 2022-01-05,最后修改于 2022-01-05。

本站永久域名「 jiavvc.top 」,也可搜索「 后浪笔记一零二四 」找到我。


上一篇 « 2jetstream实操篇 下一篇 » 后缀数组

赞赏支持

请我吃鸡腿 =^_^=

i ysf

云闪付

i wechat

微信

推荐阅读

Big Image