Kafka是一款高度可扩展的事件流平台,我们可以使用它创建消费者来读取事件,创建生产者来写入事件。
定义:
- Topic:用于标记或分类消息的方式
- Consumer:接收Kafka消息的任何应用程序
我们将使用Docker来上传Kafka环境。
docker-compose.yaml
version: '3'
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
volumes:
- 'zookeeper_data:/bitnami'
environment:
ALLOW_ANONYMOUS_LOGIN: yes
kafka:
image: 'bitnami/kafka:latest'
depends_on:
- 'zookeeper'
ports:
- '9092:9092'
volumes:
- 'kafka_data:/bitnami'
environment:
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_ENABLE_KRAFT: no
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
kafka-ui:
image: 'provectuslabs/kafka-ui:latest'
ports:
- '8080:8080'
depends_on:
- 'kafka'
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
你可以使用docker-compose up -d
来加载Docker Compose。
Zookeeper,是什么?
Zookeeper是一个集中式开源服务器,用于维护和管理分布式集群环境中的配置信息、命名约定和同步。
Kafka UI,是什么?
Kafka UI是一个用于查看Kafka信息的接口。消费者、主题、消息等。
加载Docker Compose后,你可以在URL <a class="af ne" href="http://localhost:8080" rel="noopener ugc nofollow" target="_blank">http://localhost:8080</a>
中打开Kafka界面。
接下来进入Spring配置。
application.yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
现在我们转向Java代码。
读取主题消息:
@Service
@Slf4j
public class KafkaMessageReader {
@KafkaListener(topics = "topic.here", groupId = "consumer.name.here")
public void receiveKafkaMessage(String message) {
log.info("I received the message '{}'", message);
}
}
groupId:消费者的标识
写入主题消息:
@Service
@Slf4j
public class KafkaMessageWriter {
private KafkaTemplate<String, String> kafkaTemplate;
public KafkaMessageWriter(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void writeKafkaMessage(String message) {
this.kafkaTemplate.send("topic.here", "key", message);
log.info("I wrote the message '{}'", message);
}
}
kafkaTemplate
字段是由Spring依赖注入注入的。 “key”字段是用于标识消息的键。
结果:
这样,我们就有了一个在Kafka中读取和写入消息的Spring应用程序。感谢阅读。
评论(0)