首页
Preview

使用Golang开始使用Apache Kafka

之前我曾经作为一名 DevOps 工程师使用过 Apache Kafka。我的队列系统(Kafka/RabbitMQ)经验大多是在部署和维护监控堆栈方面。像 Kafka 这样的系统能够每小时摄入数百万个度量数据点而不会降低性能,这让我对它产生了好奇心。我想知道如何减少系统的高延迟并在流量激增或大量数据的情况下提高响应时间?以及 DevOps 工程师如何利用它为他们的自定义服务提供帮助。

PS:这只是我的学习经验。在这里,我将仅介绍使用 Go 和 Apache Kafka 编写简单的生产者和消费者程序。

Apache Kafka 是一种开源的分布式事件流处理平台,被数千家公司用于高性能数据管道、流处理分析、数据集成和关键业务应用。

使用 Apache Kafka 的原因

提高可靠性 队列使你的数据持久化。如果系统的某个部分无法访问,其他部分仍然可以继续与队列交互。

提高性能 消息队列使异步通信成为可能,生产和消费消息的终端点与队列交互,而不是彼此之间。

可扩展性 当工作量达到峰值时,消息队列有助于精确扩展你需要的工作量。

你可以在这里了解更多关于使用消息队列的原因:Message Queues & You — 12 Reasons to Use Message Queuing

让我们来看代码

这是一个简单的系统,通过 REST API 添加消息,然后在生产者中生成它,并在消费者/工作者中进行处理。可以是任何耗时的过程,例如将数据保存到数据存储中并执行聚合或某些计算。

在本地运行 Kafka

本文中所有示例都在我的本地运行的 Kafka 上进行了测试。你也可以使用 Docker 和 docker-compose 实现这一点。

curl -sSL https://raw.githubusercontent.com/bitnami/bitnami-docker-kafka/master/docker-compose.yml > docker-compose.yml
docker-compose up -d

编写生产者以及 REST API

我使用的是 Sarama,这是一个用于 Apache Kafka 的 Go 库。还有其他选择,例如 kafka-go。选择 sarama 的原因是它比 kakfa-go 更,并且 sarama 周围的 Go 社区和代码有助于选择 sarama。

使用 sarama 连接 Apache Kafka 作为生产者

func ConnectProducer(brokersUrl []string) (sarama.SyncProducer,error) {    config := sarama.NewConfig()    config.Producer.Return.Successes = true    config.Producer.RequiredAcks = sarama.WaitForAll    config.Producer.Retry.Max = 5    // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.    conn, err := sarama.NewSyncProducer(brokersUrl, config)    if err != nil {        return nil, err
    }   return conn, nil}

**使用 sarama 推送评论到队列(主题)**这个函数非常简单,它以主题和消息作为参数,使用上面的 ConnectProducer 函数连接到 Kafka,并将该消息推送到给定的主题。

func PushCommentToQueue(topic string, message []byte) error {    brokersUrl := []string{"kafkahost1:9092", "kafkahost2:9092"}
    producer, err := ConnectProducer(brokersUrl)    if err != nil {
        return err
    }    defer producer.Close()    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(message),
    }    partition, offset, err := producer.SendMessage(msg)    if err != nil {
        return err
    }    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)    return nil}

这就是我们连接和推送数据到 Kafka 所需的全部内容。让我们编写一个简单的 API 服务器,该服务器以 JSON 作为输入并将其传递给上述 PushCommentToQueue 函数。

我正在使用 fiber 编写 REST 服务器,你可以使用任何你喜欢的服务器。

type Comment struct {
    Text string `form:"text" json:"text"`
}func createComment(c *fiber.Ctx) error {// Instantiate new Message struct
  cmt := new(Comment)  if err := c.BodyParser(cmt); err != nil {
      c.Status(400).JSON(&fiber.Map{
        "success": false,
        "message": err,
      })
      return err
    }    // convert body into bytes and send it to kafka
    cmtInBytes, err := json.Marshal(cmt)
    PushCommentToQueue("comments", cmtInBytes)    // Return Comment in JSON format
    err = c.JSON(&fiber.Map{
      "success": true,
      "message": "Comment pushed successfully",
      "comment": cmt,
    })
    if err != nil {
      c.Status(500).JSON(&fiber.Map{
      "success": false,
      "message": "Error creating product",
    })
    return err
  }  return err
}
func main() {
   app := fiber.New()
   api := app.Group("/api/v1")
   api.Post("/comment", createComment)
   app.Listen(":3000")
}

让我们向 Kafka 推送一些数据

curl --location --request POST '0.0.0.0:3000/api/v1/comments' \
--header 'Content-Type: application/json' \
--data-raw '{ "text":"nice boy" }'curl --location --request POST '0.0.0.0:3000/api/v1/comments' \
--header 'Content-Type: application/json' \
--data-raw '{ "text":"keep up the good work" }'

编写消费者/工作者使用 sarama

使用 sarama 连接 Apache Kafka 作为消费者

func connectConsumer(brokersUrl []string) (sarama.Consumer, error) {    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true    // NewConsumer creates a new consumer using the given broker addresses and configuration
    conn, err := sarama.NewConsumer(brokersUrl, config)    if err != nilPushCommentToQueue {
        return nil, err
    }    return conn, nil
}

然后使用 connectConsumer 我们将作为消费者连接到 Kafka。之后我们将调用 ConsumePartition,它将使用给定的偏移量在给定的主题/分区上创建 PartitionConsumer。然后我们将打开一个信号 chan 来读取消息。在信号中,使用了 consumer.Messages 方法,该方法返回经由 broker 返回的消息的读取通道。

topic := "comments"    worker, err := connectConsumer([]string{"kafkahost1:9092", "kafkahost1:9092"})    if err != nil {
        panic(err)
    }    // calling ConsumePartition. It will open one connection per broker
    // and share it for all partitions that live on it.    consumer, err := worker.ConsumePartition(topic, 0,sarama.OffsetOldest)

工作者的消费者消息队列输出

有关完整代码,请参见 github 链接

译自:https://medium.com/swlh/apache-kafka-with-golang-227f9f2eb818

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
菜鸟一只
你就是个黄焖鸡,又黄又闷又垃圾。

评论(0)

添加评论