你好,在这篇文章中,我将简要介绍Nats JetStream,然后展示使用Golang进行跨服务消息流的示例。这是我在Medium上的第一篇文章,我很兴奋。我希望这篇文章和我接下来写的内容对你有用。
什么是Nats JetStream?
要了解Nats JetStream,首先让我谈谈Nats生态系统及其演进(Core Nats → Nats Streaming → Nats JetStream)。
Core Nats 是一个云原生、高性能的PubSub消息系统。它使用“至多一次”交付模型。因此,发布的消息没有交付或持久性的保证。
Nats Streaming(STAN)使用“至少一次”交付模型。消息的持久性和交付是有保证的。但是,由于其架构:
- 无法水平扩展
- 不能Nack消息
- 没有消费者的“拉订阅”等。
NATS Streaming Server即将停用。关键的错误修复和安全修复将一直持续到2023年6月。因此,建议使用Nats JetStream进行新的应用程序开发。
Nats JetStream 包括新的持久性特性和消息交付策略。它具有水平可扩展性,并针对非常大的数据集进行了优化。你还可以在Nats JetStream文档中查看设计目标。
我们正在公司的生产环境中积极使用Nats JetStream(该公司在全球拥有超过1.5亿用户)。到目前为止,我们还没有遇到任何问题。在开始使用JetStream之前,我建议你查看Nats Server和Nats Go Client的问题。可能会涉及到你的问题。
让我们开始Nats JetStream的安装。然后,我们将使用Go客户端进行一个小的演示。
安装和运行Nats服务器和JetStream
NATS的哲学是简单。安装只需解压缩zip文件并将二进制文件复制到适当的目录;你还可以使用自己喜欢的包管理器。
我将使用brew进行安装。你可以使用其他安装方式。
brew install nats-server
你可以简单地运行nats-server
可执行文件。我们应该使用-js
标志启用JetStream,并使用-m
设置http监视端口。你还可以使用配置文件。
nats-server -js -m 8080
Nats服务器正在使用默认端口4222运行
设置Nats JetStream Go客户端
让我们首先创建一个JetStream上下文来管理流(创建、发布、订阅)。我将使用Nats Go Client v1.16.0和Go v1.18。
package main
import (
"github.com/nats-io/nats.go"
)
func JetStreamInit() (nats.JetStreamContext, error) {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return nil, err
}
// Create JetStream Context
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
return nil, err
}
return js, nil
}
`PublishAsyncMaxPending`设置可以同时传输的异步发布的最大数量。
默认URL为`nats://127.0.0.1:4222`
现在我们可以创建一个新的流并将消息发布到流中。我们创建了一个REVIEWS
流,用于REVIEWS.*
主题。
package main
import (
"github.com/nats-io/nats.go"
"log"
)
const (
StreamName = "REVIEWS"
StreamSubjects = "REVIEWS.*"
)
func CreateStream(jetStream nats.JetStreamContext) error {
stream, err := jetStream.StreamInfo(StreamName)
// stream not found, create it
if stream == nil {
log.Printf("Creating stream: %s\n", StreamName)
_, err = jetStream.AddStream(&nats.StreamConfig{
Name: StreamName,
Subjects: []string{StreamSubjects},
})
if err != nil {
return err
}
}
return nil
}
让我们使用商店的虚拟用户评论数据。假设我们有两个不同的服务,一个发布用户评论,另一个消费用户评论。
[
{
"_id": "58c03ac18060197ca0b52d51",
"author": "58c039018060197ca0b52d4c",
"store": "58c03a958060197ca0b52d50",
"text": "I tried this place last week and it was incredible! ",
"rating": 5,
"created": "2017-03-08T17:09:21.627Z"
},
{
"_id": "58c03af28060197ca0b52d53",
"author": "58c03ada8060197ca0b52d52",
"store": "58c03a958060197ca0b52d50",
"text": "hipsters everywhere",
"rating": 1,
"created": "2017-03-08T17:10:10.426Z"
}
]
创建一个Review
模型:
package models
type Review struct {
Id string `json:"_id"`
Author string `json:"author"`
Store string `json:"store"`
Text string `json:"text"`
Rating int `json:"rating"`
Created string `json:"created"`
}
向流中发布消息
现在,我们可以使用Nats JetStream发布虚拟评论。我放入了随机等待毫秒,以便更清楚地查看控制台输出。我们将所有评论数据发布到REVIEWS.rateGiven
主题上。顺便说一下,你可以发送结构化数据,而不是编组和解组。
package main
import (
"encoding/json"
"github.com/nats-io/nats.go"
"go-nats-jetstream/models"
"io/ioutil"
"log"
"math/rand"
"time"
)
const (
SubjectNameReviewCreated = "REVIEWS.rateGiven"
)
func publishReviews(js nats.JetStreamContext) {
reviews, err := getReviews()
if err != nil {
log.Println(err)
return
}
for _, oneReview := range reviews {
// create random message intervals to slow down
r := rand.Intn(1500)
time.Sleep(time.Duration(r) * time.Millisecond)
reviewString, err := json.Marshal(oneReview)
if err != nil {
log.Println(err)
continue
}
// publish to REVIEWS.rateGiven subject
_, err = js.Publish(SubjectNameReviewCreated, reviewString)
if err != nil {
log.Println(err)
} else {
log.Printf("Publisher => Message:%s\n", oneReview.Text)
}
}
}
func getReviews() ([]models.Review, error) {
rawReviews, _ := ioutil.ReadFile("./reviews.json")
var reviewsObj []models.Review
err := json.Unmarshal(rawReviews, &reviewsObj)
return reviewsObj, err
}
理解消费者类型
Nats JetStream提供了两种类型的消费者,拉取式和推送式。
拉取式消费者
从主题中拉取一批消息。它必须要求系统提供下一个可用的消息,因此你可以根据服务的可用性进行扩展。
推送式消费者
的控制在Nats服务器中。服务器将消息发送到推送式消费者。
Nats JetStream还支持通配符订阅:
jetStream.Subscribe(“REVIEWS.*”, func(m *nats.Msg)
从流中消费消息
让我们创建一个推送式消费者并订阅REVIEW
流。我们将从之前发布的REVIEWS.rateGiven
主题中获取评论虚拟数据。如果我们想,我们可以将REVIEWS.rateAnswer
发布为答案。
package main
import (
"encoding/json"
"github.com/nats-io/nats.go"
"go-nats-jetstream/models"
"log"
)
const (
SubjectNameReviewCreated = "REVIEWS.rateGiven"
)
func consumeReviews(js nats.JetStreamContext) {
_, err := js.Subscribe(SubjectNameReviewCreated, func(m *nats.Msg) {
err := m.Ack()
if err != nil {
log.Println("Unable to Ack", err)
return
}
var review models.Review
err = json.Unmarshal(m.Data, &review)
if err != nil {
log.Fatal(err)
}
log.Printf("Consumer => Subject: %s - ID:%s - Author: %s - Rating:%d\n", m.Subject, review.Id, review.Author, review.Rating)
// send answer via JetStream using another subject if you need
// js.Publish(config.SubjectNameReviewAnswered, []byte(review.Id))
})
if err != nil {
log.Println("Subscribe failed")
return
}
}
现在是将所有部分组合在一起的时候了 :) 当我们同时运行消费者和发布者时,我们应该得到如下结果:
2022/06/28 00:24:19 Consumer => Subject: REVIEWS.rateGiven — ID:58c03ac18060197ca0b52d51 — Author: 58c039018060197ca0b52d4c — Rating:52022/06/28 00:24:19 Publisher => Message: I tried this place last week and it was incredible! 2022/06/28 00:24:20 Consumer => Subject: REVIEWS.rateGiven — ID:58c03af28060197ca0b52d53 — Author: 58c03ada8060197ca0b52d52 — Rating:12022/06/28 00:24:20 Publisher => Message: hipsters everywhere
你可以在这里找到完整的源代码。你可以尝试使用代码中的微小更改进行通配符订阅和拉取订阅。
GitHub-ebubekiryigit/go-nats-jetstream-example
总结
我最喜欢的部分是它的简单易用。它的横向扩展性和高性能也是一个很大的优点。当我创建一个新的微服务时,Nats JetStream是我在PubSub结构中首先想到的技术。
如果你有问题,他们在Slack groups中会非常快速地回应。## 参考资料
评论(0)