首页
Preview

使用Nats JetStream在Golang中实现分布式消息流处理

你好,在这篇文章中,我将简要介绍Nats JetStream,然后展示使用Golang进行跨服务消息流的示例。这是我在Medium上的第一篇文章,我很兴奋。我希望这篇文章和我接下来写的内容对你有用。

什么是Nats JetStream?

要了解Nats JetStream,首先让我谈谈Nats生态系统及其演进(Core Nats → Nats Streaming → Nats JetStream)。

Core Nats 是一个云原生、高性能的PubSub消息系统。它使用“至多一次”交付模型。因此,发布的消息没有交付或持久性的保证。

Nats Streaming(STAN)使用“至少一次”交付模型。消息的持久性和交付是有保证的。但是,由于其架构:

  • 无法水平扩展
  • 不能Nack消息
  • 没有消费者的“拉订阅”等。

NATS Streaming Server即将停用。关键的错误修复和安全修复将一直持续到2023年6月。因此,建议使用Nats JetStream进行新的应用程序开发。

Nats JetStream 包括新的持久性特性和消息交付策略。它具有水平可扩展性,并针对非常大的数据集进行了优化。你还可以在Nats JetStream文档中查看设计目标

我们正在公司的生产环境中积极使用Nats JetStream(该公司在全球拥有超过1.5亿用户)。到目前为止,我们还没有遇到任何问题。在开始使用JetStream之前,我建议你查看Nats ServerNats Go Client的问题。可能会涉及到你的问题。

让我们开始Nats JetStream的安装。然后,我们将使用Go客户端进行一个小的演示。

安装和运行Nats服务器和JetStream

NATS的哲学是简单。安装只需解压缩zip文件并将二进制文件复制到适当的目录;你还可以使用自己喜欢的包管理器。

我将使用brew进行安装。你可以使用其他安装方式

brew install nats-server

你可以简单地运行nats-server可执行文件。我们应该使用-js标志启用JetStream,并使用-m设置http监视端口。你还可以使用配置文件

nats-server -js -m 8080

Nats服务器正在使用默认端口4222运行

设置Nats JetStream Go客户端

让我们首先创建一个JetStream上下文来管理流(创建、发布、订阅)。我将使用Nats Go Client v1.16.0和Go v1.18。

package main

import (
	"github.com/nats-io/nats.go"
)

func JetStreamInit() (nats.JetStreamContext, error) {
	// Connect to NATS
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		return nil, err
	}

	// Create JetStream Context
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		return nil, err
	}

	return js, nil
}

`PublishAsyncMaxPending`设置可以同时传输的异步发布的最大数量。
默认URL为`nats://127.0.0.1:4222`

现在我们可以创建一个新的流并将消息发布到流中。我们创建了一个REVIEWS流,用于REVIEWS.*主题。

package main

import (
	"github.com/nats-io/nats.go"
	"log"
)

const (
	StreamName     = "REVIEWS"
	StreamSubjects = "REVIEWS.*"
)

func CreateStream(jetStream nats.JetStreamContext) error {
	stream, err := jetStream.StreamInfo(StreamName)

	// stream not found, create it
	if stream == nil {
		log.Printf("Creating stream: %s\n", StreamName)

		_, err = jetStream.AddStream(&nats.StreamConfig{
			Name:     StreamName,
			Subjects: []string{StreamSubjects},
		})
		if err != nil {
			return err
		}
	}
	return nil
}

让我们使用商店的虚拟用户评论数据。假设我们有两个不同的服务,一个发布用户评论,另一个消费用户评论。

[
  {
    "_id": "58c03ac18060197ca0b52d51",
    "author": "58c039018060197ca0b52d4c",
    "store": "58c03a958060197ca0b52d50",
    "text": "I tried this place last week and it was incredible! ",
    "rating": 5,
    "created": "2017-03-08T17:09:21.627Z"
  },
  {
    "_id": "58c03af28060197ca0b52d53",
    "author": "58c03ada8060197ca0b52d52",
    "store": "58c03a958060197ca0b52d50",
    "text": "hipsters everywhere",
    "rating": 1,
    "created": "2017-03-08T17:10:10.426Z"
  }
]

创建一个Review模型:

package models

type Review struct {
	Id      string `json:"_id"`
	Author  string `json:"author"`
	Store   string `json:"store"`
	Text    string `json:"text"`
	Rating  int    `json:"rating"`
	Created string `json:"created"`
}

向流中发布消息

现在,我们可以使用Nats JetStream发布虚拟评论。我放入了随机等待毫秒,以便更清楚地查看控制台输出。我们将所有评论数据发布到REVIEWS.rateGiven主题上。顺便说一下,你可以发送结构化数据,而不是编组和解组。

package main

import (
	"encoding/json"
	"github.com/nats-io/nats.go"
	"go-nats-jetstream/models"
	"io/ioutil"
	"log"
	"math/rand"
	"time"
)

const (
	SubjectNameReviewCreated = "REVIEWS.rateGiven"
)

func publishReviews(js nats.JetStreamContext) {
	reviews, err := getReviews()
	if err != nil {
		log.Println(err)
		return
	}

	for _, oneReview := range reviews {
		
		// create random message intervals to slow down
		r := rand.Intn(1500)
		time.Sleep(time.Duration(r) * time.Millisecond)

		reviewString, err := json.Marshal(oneReview)
		if err != nil {
			log.Println(err)
			continue
		}

		// publish to REVIEWS.rateGiven subject
		_, err = js.Publish(SubjectNameReviewCreated, reviewString)
		if err != nil {
			log.Println(err)
		} else {
			log.Printf("Publisher  =>  Message:%s\n", oneReview.Text)
		}
	}
}

func getReviews() ([]models.Review, error) {
	rawReviews, _ := ioutil.ReadFile("./reviews.json")
	var reviewsObj []models.Review
	err := json.Unmarshal(rawReviews, &reviewsObj)

	return reviewsObj, err
}

理解消费者类型

Nats JetStream提供了两种类型的消费者,拉取式和推送式。

拉取式消费者从主题中拉取一批消息。它必须要求系统提供下一个可用的消息,因此你可以根据服务的可用性进行扩展。

推送式消费者的控制在Nats服务器中。服务器将消息发送到推送式消费者。

Nats JetStream还支持通配符订阅

jetStream.Subscribe(“REVIEWS.*”, func(m *nats.Msg)

从流中消费消息

让我们创建一个推送式消费者并订阅REVIEW流。我们将从之前发布的REVIEWS.rateGiven主题中获取评论虚拟数据。如果我们想,我们可以将REVIEWS.rateAnswer发布为答案。

package main

import (
	"encoding/json"
	"github.com/nats-io/nats.go"
	"go-nats-jetstream/models"
	"log"
)

const (
	SubjectNameReviewCreated = "REVIEWS.rateGiven"
)

func consumeReviews(js nats.JetStreamContext) {
	_, err := js.Subscribe(SubjectNameReviewCreated, func(m *nats.Msg) {
		err := m.Ack()

		if err != nil {
			log.Println("Unable to Ack", err)
			return
		}

		var review models.Review
		err = json.Unmarshal(m.Data, &review)
		if err != nil {
			log.Fatal(err)
		}

		log.Printf("Consumer  =>  Subject: %s  -  ID:%s  -  Author: %s  -  Rating:%d\n", m.Subject, review.Id, review.Author, review.Rating)
		
		// send answer via JetStream using another subject if you need
		// js.Publish(config.SubjectNameReviewAnswered, []byte(review.Id))
	})

	if err != nil {
		log.Println("Subscribe failed")
		return
	}
}

现在是将所有部分组合在一起的时候了 :) 当我们同时运行消费者和发布者时,我们应该得到如下结果:

2022/06/28 00:24:19 Consumer => Subject: REVIEWS.rateGiven — ID:58c03ac18060197ca0b52d51 — Author: 58c039018060197ca0b52d4c — Rating:52022/06/28 00:24:19 Publisher => Message: I tried this place last week and it was incredible! 2022/06/28 00:24:20 Consumer => Subject: REVIEWS.rateGiven — ID:58c03af28060197ca0b52d53 — Author: 58c03ada8060197ca0b52d52 — Rating:12022/06/28 00:24:20 Publisher => Message: hipsters everywhere

你可以在这里找到完整的源代码。你可以尝试使用代码中的微小更改进行通配符订阅和拉取订阅。

GitHub-ebubekiryigit/go-nats-jetstream-example

总结

我最喜欢的部分是它的简单易用。它的横向扩展性和高性能也是一个很大的优点。当我创建一个新的微服务时,Nats JetStream是我在PubSub结构中首先想到的技术。

如果你有问题,他们在Slack groups中会非常快速地回应。## 参考资料

译自:https://medium.com/vlmedia-tech/distributed-message-streaming-in-golang-using-nats-jetstream-29f28be66dc6

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
菜鸟一只
你就是个黄焖鸡,又黄又闷又垃圾。

评论(0)

添加评论