首页
Preview

[Java] 教程 | 使用 Kafka 的 Spring Boot

Kafka是一款高度可扩展的事件流平台,我们可以使用它创建消费者来读取事件,创建生产者来写入事件。

定义:

  • Topic:用于标记或分类消息的方式
  • Consumer:接收Kafka消息的任何应用程序

我们将使用Docker来上传Kafka环境。

docker-compose.yaml

version: '3'

services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
  kafka:
    image: 'bitnami/kafka:latest'
    depends_on:
      - 'zookeeper'
    ports:
      - '9092:9092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_ENABLE_KRAFT: no
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
  kafka-ui:
    image: 'provectuslabs/kafka-ui:latest'
    ports:
      - '8080:8080'
    depends_on:
      - 'kafka'
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

你可以使用docker-compose up -d来加载Docker Compose。

Zookeeper,是什么?

Zookeeper是一个集中式开源服务器,用于维护和管理分布式集群环境中的配置信息、命名约定和同步。

Kafka UI,是什么?

Kafka UI是一个用于查看Kafka信息的接口。消费者、主题、消息等。

加载Docker Compose后,你可以在URL <a class="af ne" href="http://localhost:8080" rel="noopener ugc nofollow" target="_blank">http://localhost:8080</a>中打开Kafka界面。

接下来进入Spring配置。

application.yaml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

现在我们转向Java代码。

读取主题消息:

@Service
@Slf4j
public class KafkaMessageReader {

  @KafkaListener(topics = "topic.here", groupId = "consumer.name.here")
  public void receiveKafkaMessage(String message) {
     log.info("I received the message '{}'", message);
  }
}

groupId:消费者的标识

写入主题消息:

@Service
@Slf4j
public class KafkaMessageWriter {

  private KafkaTemplate<String, String> kafkaTemplate;

  public KafkaMessageWriter(KafkaTemplate<String, String> kafkaTemplate) {
      this.kafkaTemplate = kafkaTemplate;
  }
 
  public void writeKafkaMessage(String message) {
      this.kafkaTemplate.send("topic.here", "key", message);
      log.info("I wrote the message '{}'", message);
  }
}

kafkaTemplate字段是由Spring依赖注入注入的。 “key”字段是用于标识消息的键。

结果:

这样,我们就有了一个在Kafka中读取和写入消息的Spring应用程序。感谢阅读。

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
阿波
The minute I see you, I want your clothes gone!

评论(0)

添加评论