(此文在Confluent.io博客上发布)
这个问题在StackOverflow等地方经常被问到,所以我来试着帮助一下。
简而言之:你需要设置advertised.listeners
(或者如果你使用Docker镜像则为KAFKA_ADVERTISED_LISTENERS
),将其设置为外部地址(主机/IP),以便客户端可以正确地连接到它。否则,它们将尝试连接到内部主机地址,如果无法访问,则会出现问题。
换句话说,引用Spencer Ruport的话:
LISTENERS
是Kafka绑定的接口。ADVERTISED_LISTENERS
是客户端可以连接的方式。
在本文中,我将讨论为什么这是必要的,然后根据几种场景(Docker和AWS)展示如何进行设置。
是否有人在监听?
Kafka是一个分布式系统。数据从给定分区的Leader读取和写入,该分区可以位于集群中的任何代理中。当客户端(生产者/消费者)启动时,它将请求关于哪个代理是该分区的Leader的元数据,并且可以从任何代理中获取这些信息。返回的元数据将包括Leader代理可用的端点,并且客户端将使用这些端点来连接到代理以根据需要读取/写入数据。
正是这些端点给人们带来了麻烦。在单机上运行‘裸机’(没有虚拟机,没有Docker)的情况下,一切可能都是主机名(或仅为*localhost
*),这很简单。但是一旦你进入更复杂的网络设置和多个节点,你就需要更加注意。
让我们假设你有多个网络。这可以是以下情况:
- Docker内部网络和主机机器
- 云中的代理(例如AWS EC2)以及本地的机器(甚至是另一个云中的机器)
你需要告诉Kafka代理如何相互连接,但也要确保外部客户端(生产者/消费者)可以连接到所需的代理。 关键是,当你运行一个客户端时,你向其传递的代理仅仅是用于获取集群中的代理的元数据的地方。实际用于读取/写入数据的主机和IP是基于代理在初始连接中返回的数据的,即使它只是一个单节点,返回的代理与连接的代理相同。 为了正确配置这一点,你需要了解Kafka代理可以有多个监听器。一个监听器是以下三个组件的组合:
- 主机/IP
- 端口
- 协议
让我们来看一些配置。通常,协议也用于监听器名称,但在这里,我们使用抽象名称来使其更加清晰:
KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
我使用的是Docker配置名称-如果你直接配置server.properties
(例如在AWS上等)的等效配置显示在以下列表的缩进中
KAFKA_LISTENERS
是一个以逗号分隔的监听器列表,以及Kafka绑定用于监听的主机/IP和端口。对于更复杂的网络,这可能是与机器上的给定网络接口关联的IP地址。默认值为0.0.0.0,表示监听所有接口。listeners
KAFKA_ADVERTISED_LISTENERS
是一个以逗号分隔的监听器列表,以及它们的主机/IP和端口。这是传递给客户端的元数据。advertised.listeners
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
为每个监听器名称定义安全协议的键/值对。listener.security.protocol.map
注意:配置Docker镜像的脚本使用监听器名称来确定是否包含SSL配置项。如果你想使用SSL,你需要在监听器名称中包含SSL(例如LISTENER_BOB_SSL
)。感谢Russ Sayers指出这一点。
Kafka代理彼此之间进行通信,通常在内部网络上(例如Docker网络,AWS VPC等)。要定义要使用的监听器,需要指定KAFKA_INTER_BROKER_LISTENER_NAME
(inter.broker.listener.name
)。所使用的主机/IP必须从代理机器对其他机器可访问。
Kafka 客户端可能不是连接到代理的网络的本地客户端,这就是其他监听器的作用。
每个监听器在连接时都会报告其可以访问的地址。你连接到的代理的地址取决于所使用的网络。如果你是从内部网络连接到代理,它的主机/IP将与从外部连接时不同。
连接到代理时,将向客户端返回与你连接的监听器相对应的监听器。kafkacat
是一个探索此功能的有用工具。使用-L
,你可以查看你连接到的监听器的元数据。基于上述相同的监听器配置(LISTENER_BOB
/ LISTENER_FRED
),查看**broker 0 at
**的相关条目: -
-
在端口9092上连接(我们将其映射为
LISTENER_FRED
),代理的地址将返回为localhost
$ kafkacat -b kafka0:9092 \ -L Metadata for all topics (from broker -1: kafka0:9092/bootstrap): 1 brokers: broker 0 at localhost:9092
-
在端口29092上连接(我们将其映射为
LISTENER_BOB
),代理的地址将返回为kafka0
:$ kafkacat -b kafka0:29092 \ -L Metadata for all topics (from broker 0: kafka0:29092/0): 1 brokers: broker 0 at kafka0:29092
你也可以使用tcpdump
来检查从连接到代理的客户端的流量,并查看代理返回的主机名。
为什么我可以连接到代理,但客户端仍然失败?
简而言之:即使你可以建立与代理的初始连接,元数据中返回的地址仍然可能是客户端无法访问的主机名。 让我们一步一步地来解释。
-
我们在AWS上有一个代理。我们想从我们的笔记本电脑向其发送一条消息。我们知道EC2实例的外部主机名(
ec2-54-191-84-122.us-west-2.compute.amazonaws.com
)。我们在安全组中创建了必要的条目,以打开代理的端口以接收我们的入站流量。我们做了一些聪明的事情,比如检查我们的本地机器是否可以连接到AWS实例上的端口:$ nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092 found 0 associations found 1 connections: 1: flags=82<CONNECTED,PREFERRED> outif utun5 src 172.27.230.23 port 53352 dst 54.191.84.122 port 9092 rank info not available TCP aux info available Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!
一切看起来都很好!我们运行:
echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
现在...接下来会发生什么?
-
我们的笔记本电脑成功解析了
ec2-54-191-84-122.us-west-2.compute.amazonaws.com
(解析为IP地址54.191.84.122),并连接到了AWS机器上的端口9092 -
代理在端口9092上接收到入站连接。它向客户端返回带有主机名
ip-172-31-18-160.us-west-2.compute.internal
的元数据,因为这是代理的主机名并且是listeners
的默认值。 -
客户端尝试使用它收到的元数据将数据发送到代理。由于
ip-172-31-18-160.us-west-2.compute.internal
无法从互联网解析,所以连接失败。$ echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test >>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
-
困惑之后,我们尝试从代理机器本身执行相同的操作:
$ echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test >> $ kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning foo
这可以正常工作!这是因为我们连接到端口9092,该端口被配置为内部监听器,因此在元数据中返回主机名
ip-172-31-18-160.us-west-2.compute.internal
,因为它可以从代理机器解析(因为它是自己的主机名!) -
通过使用
kafkacat
,我们可以更轻松地进行操作。使用-L
标志,我们可以看到代理返回的元数据:$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap): 1 brokers: broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
很明显,返回的是内部主机名。这也使得这个看似困惑的错误变得更加合理-连接到一个主机名,另一个主机名却出现了查找错误:
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test % ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known
在这里,我们在本地机器上以消费者模式(
-C
)使用kafkacat
尝试从主题中读取。与之前一样,因为我们从代理在元数据中获取了内部监听器主机名,客户端无法解析该主机名以进行读取/写入操作。
我看到一个StackOverflow答案建议只需更新我的hosts文件...那不是更容易吗?
这只是一个解决错误配置的hack,而不是真正解决问题。
如果代理向客户端报告一个客户端无法连接的主机名,那么将主机名/IP组合硬编码到本地的/etc/hosts
文件中可能看起来是一个不错的解决方法。但这是一个非常脆弱和手动的解决方案。当IP地址发生变化时,当你更换主机并忘记将这个小hack带上时,当其他人也想要做同样的事情时会发生什么?
更好的方法是理解并实际修复网络的advertised.listeners
设置。
如何连接Docker上的Kafka
在Docker中运行时,你需要为Kafka配置两个监听器:
-
在Docker网络内部通信。这可以是代理之间的内部通信(即代理之间),以及在Docker中运行的其他组件(如Kafka Connect)或第三方客户端或生产者之间的通信。 对于这些通信,我们需要使用Docker容器的主机名。在同一个Docker网络上的每个Docker容器将使用Kafka代理容器的主机名来连接它
-
非Docker网络流量。这可以是本地在Docker主机机器上运行的客户端,例如。假设他们将在
localhost
上连接到从Docker容器中暴露出来的端口。 这是docker-compose的片段:kafka0: image: "confluentinc/cp-enterprise-kafka:5.2.1" ports: - '9092:9092' - '29094:29094' depends_on: - zookeeper environment: KAFKA_BROKER_ID: 0 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://kafka0:9092,LISTENER_ALICE://kafka0:29094 KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092,LISTENER_ALICE://never-gonna-give-you-up:29094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT,LISTENER_ALICE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
原始查看docker-compose.yml 由GitHub提供支持❤
- 在Docker网络内部的客户端使用监听器“BOB”,端口29092和主机名
kafka0
进行连接。通过这样做,它们将获得主机名kafka0
用于连接。每个Docker容器将使用Docker的内部网络解析kafka0
,并能够访问代理。 - 外部于Docker网络的客户端使用监听器“FRED”,端口9092和主机名
localhost
进行连接。端口9092由Docker容器公开,因此可以连接。当客户端连接时,它们将获得代理的元数据中的主机名localhost
,因此在读取/写入数据时连接到该主机名。 - 上述配置将不会处理既在Docker之外又在主机机器之外的客户端想要连接的场景。这是因为
kafka0
(内部Docker主机名)或者localhost
(Docker主机机器的环回地址)将无法解析。
如何连接AWS/IaaS上的Kafka
我提到AWS是因为大多数人使用它,但这适用于任何IaaS/云解决方案。 这里与Docker的情况完全相同。主要区别在于,虽然在Docker中,外部连接可能只是在localhost上(如上所述),但在托管在云中的Kafka(如在AWS上)上,外部连接将来自于不与代理本地连接的机器,并且需要能够连接到代理。 进一步的复杂性在于,虽然Docker网络与主机的网络隔离程度很高,但在IaaS上通常外部主机名在内部是可解析的,这使得你可能会遇到这些问题。 根据你将要连接到代理的外部地址是否在网络上(例如VPC)可本地解析,有两种方法。
选项1 - 外部地址可在本地解析
在这种情况下,你可以只使用一个监听器。现有的监听器称为PLAINTEXT
,只需要覆盖以设置传递给入站客户端的广告主机名。
advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
现在,内部和外部的连接都将使用ec2-54-191-84-122.us-west-2.compute.amazonaws.com
进行连接。因为ec2-54-191-84-122.us-west-2.compute.amazonaws.com
在本地和外部都可以解析,所以一切正常。
选项2 - 外部地址在本地不可解析
你将需要为Kafka配置两个监听器:
- 在AWS网络(VPC)内部通信。这可能是代理之间的内部通信(即代理之间),以及在VPC中运行的其他组件(如Kafka Connect)或第三方客户端或生产者之间的通信。 对于这些通信,我们需要使用EC2机器的内部IP(如果配置了DNS,则为主机名)。
- AWS的外部流量。这可以是来自笔记本电脑的连接,或者来自不在Amazon中托管的计算机。在这两种情况下,都需要使用实例的外部IP(如果配置了DNS,则为主机名)。
这是一个示例配置:
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://ip-172-31-18-160.us-west-2.compute.internal:19092,EXTERNAL://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
inter.broker.listener.name=INTERNAL
使用Docker探索监听器
查看https://github.com/rmoff/kafka-listeners。这包括一个docker-compose文件,用于启动一个带有配置了多个监听器的Zookeeper实例和Kafka代理。
-
用于Docker网络内部流量的监听器“BOB”(端口29092)
$ docker-compose exec kafkacat \ kafkacat -b kafka0:29092 \ -L Metadata for all topics (from broker 0: kafka0:29092/0): 1 brokers: broker 0 at kafka0:29092
-
用于来自Docker主机机器(
localhost
)的流量的监听器“FRED”(端口9092)$ docker-compose exec kafkacat \ kafkacat -b kafka0:9092 \ -L Metadata for all topics (from broker -1: kafka0:9092/bootstrap): 1 brokers: broker 0 at localhost:9092
-
用于来自外部的流量(通过DNS名称
never-gonna-give-you-up
到达Docker主机)的监听器“ALICE”(端口29094)$ docker run -t --network kafka-listeners_default \ confluentinc/cp-kafkacat \ kafkacat -b kafka0:29094 \ -L Metadata for all topics (from broker -1: kafka0:29094/bootstrap): 1 brokers: broker 0 at never-gonna-give-you-up:29094
总结
我最近在我给出的StackOverflow答案中引用了这篇文章,并重新阐述了解决方案。如果你仍然不太明白,请查看它,也许第二次我解释得更好了:)
参考资料
- https://kafka.apache.org/documentation/#brokerconfigs
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-2+-+Refactor+brokers+to+allow+listening+on+multiple+ports+and+IPs
- https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers
- https://stackoverflow.com/questions/42998859/kafka-server-configuration-listeners-vs-advertised-listeners
仍然不确定?
请查看Confluent社区Slack群组或Apache Kafka用户邮件列表以获取更多帮助。
评论(0)