首页
Preview

一文搞懂kafka监听

(此文在Confluent.io博客上发布)


这个问题在StackOverflow等地方经常被问到,所以我来试着帮助一下。 简而言之:你需要设置advertised.listeners(或者如果你使用Docker镜像则为KAFKA_ADVERTISED_LISTENERS),将其设置为外部地址(主机/IP),以便客户端可以正确地连接到它。否则,它们将尝试连接到内部主机地址,如果无法访问,则会出现问题。 换句话说,引用Spencer Ruport的话:

LISTENERS是Kafka绑定的接口。ADVERTISED_LISTENERS是客户端可以连接的方式。

images/docker01.png 在本文中,我将讨论为什么这是必要的,然后根据几种场景(Docker和AWS)展示如何进行设置。

是否有人在监听?

Kafka是一个分布式系统。数据从给定分区的Leader读取和写入,该分区可以位于集群中的任何代理中。当客户端(生产者/消费者)启动时,它将请求关于哪个代理是该分区的Leader的元数据,并且可以从任何代理中获取这些信息。返回的元数据将包括Leader代理可用的端点,并且客户端将使用这些端点来连接到代理以根据需要读取/写入数据。 正是这些端点给人们带来了麻烦。在单机上运行‘裸机’(没有虚拟机,没有Docker)的情况下,一切可能都是主机名(或仅为*localhost*),这很简单。但是一旦你进入更复杂的网络设置和多个节点,你就需要更加注意。 让我们假设你有多个网络。这可以是以下情况:

  • Docker内部网络和主机机器
  • 云中的代理(例如AWS EC2)以及本地的机器(甚至是另一个云中的机器)

你需要告诉Kafka代理如何相互连接,但也要确保外部客户端(生产者/消费者)可以连接到所需的代理。 关键是,当你运行一个客户端时,你向其传递的代理仅仅是用于获取集群中的代理的元数据的地方。实际用于读取/写入数据的主机和IP是基于代理在初始连接中返回的数据的,即使它只是一个单节点,返回的代理与连接的代理相同。 为了正确配置这一点,你需要了解Kafka代理可以有多个监听器。一个监听器是以下三个组件的组合:

  1. 主机/IP
  2. 端口
  3. 协议

让我们来看一些配置。通常,协议也用于监听器名称,但在这里,我们使用抽象名称来使其更加清晰:

KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB

我使用的是Docker配置名称-如果你直接配置server.properties(例如在AWS上等)的等效配置显示在以下列表的缩进中

  • KAFKA_LISTENERS 是一个以逗号分隔的监听器列表,以及Kafka绑定用于监听的主机/IP和端口。对于更复杂的网络,这可能是与机器上的给定网络接口关联的IP地址。默认值为0.0.0.0,表示监听所有接口。
    • listeners
  • KAFKA_ADVERTISED_LISTENERS 是一个以逗号分隔的监听器列表,以及它们的主机/IP和端口。这是传递给客户端的元数据。
    • advertised.listeners
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP 为每个监听器名称定义安全协议的键/值对。
    • listener.security.protocol.map

注意:配置Docker镜像的脚本使用监听器名称来确定是否包含SSL配置项。如果你想使用SSL,你需要在监听器名称中包含SSL(例如LISTENER_BOB_SSL)。感谢Russ Sayers指出这一点。 Kafka代理彼此之间进行通信,通常在内部网络上(例如Docker网络,AWS VPC等)。要定义要使用的监听器,需要指定KAFKA_INTER_BROKER_LISTENER_NAMEinter.broker.listener.name)。所使用的主机/IP必须从代理机器对其他机器可访问。 Kafka 客户端可能不是连接到代理的网络的本地客户端,这就是其他监听器的作用。 每个监听器在连接时都会报告其可以访问的地址。你连接到的代理的地址取决于所使用的网络。如果你是从内部网络连接到代理,它的主机/IP将与从外部连接时不同。 连接到代理时,将向客户端返回与你连接的监听器相对应的监听器。kafkacat是一个探索此功能的有用工具。使用-L,你可以查看你连接到的监听器的元数据。基于上述相同的监听器配置(LISTENER_BOB / LISTENER_FRED),查看**broker 0 at**的相关条目: -

  • 在端口9092上连接(我们将其映射为LISTENER_FRED),代理的地址将返回为localhost

      $ kafkacat -b kafka0:9092 \
                 -L
      Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
      1 brokers:
        broker 0 at localhost:9092
    
    
  • 在端口29092上连接(我们将其映射为LISTENER_BOB),代理的地址将返回为kafka0

      $ kafkacat -b kafka0:29092 \
                 -L
      Metadata for all topics (from broker 0: kafka0:29092/0):
      1 brokers:
        broker 0 at kafka0:29092
    
    

你也可以使用tcpdump来检查从连接到代理的客户端的流量,并查看代理返回的主机名。

为什么我可以连接到代理,但客户端仍然失败?

简而言之:即使你可以建立与代理的初始连接,元数据中返回的地址仍然可能是客户端无法访问的主机名。 让我们一步一步地来解释。

  1. 我们在AWS上有一个代理。我们想从我们的笔记本电脑向其发送一条消息。我们知道EC2实例的外部主机名(ec2-54-191-84-122.us-west-2.compute.amazonaws.com)。我们在安全组中创建了必要的条目,以打开代理的端口以接收我们的入站流量。我们做了一些聪明的事情,比如检查我们的本地机器是否可以连接到AWS实例上的端口:

     $ nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
     found 0 associations
     found 1 connections:
         1:	flags=82<CONNECTED,PREFERRED>
       outif utun5
       src 172.27.230.23 port 53352
       dst 54.191.84.122 port 9092
       rank info not available
       TCP aux info available
    
     Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!
    
    

    一切看起来都很好!我们运行:

     echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
    
    

    现在...接下来会发生什么?

  2. 我们的笔记本电脑成功解析了ec2-54-191-84-122.us-west-2.compute.amazonaws.com(解析为IP地址54.191.84.122),并连接到了AWS机器上的端口9092

  3. 代理在端口9092上接收到入站连接。它向客户端返回带有主机名ip-172-31-18-160.us-west-2.compute.internal的元数据,因为这是代理的主机名并且是listeners的默认值。

  4. 客户端尝试使用它收到的元数据将数据发送到代理。由于ip-172-31-18-160.us-west-2.compute.internal无法从互联网解析,所以连接失败。

     $ echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
     >>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
     org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
    
    
  5. 困惑之后,我们尝试从代理机器本身执行相同的操作:

     $ echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
     >>
     $ kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
     foo
    
    

    这可以正常工作!这是因为我们连接到端口9092,该端口被配置为内部监听器,因此在元数据中返回主机名ip-172-31-18-160.us-west-2.compute.internal,因为它可以从代理机器解析(因为它是自己的主机名!)

  6. 通过使用kafkacat,我们可以更轻松地进行操作。使用-L标志,我们可以看到代理返回的元数据:

     $ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
     Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
     1 brokers:
       broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
    
    

    很明显,返回的是内部主机名。这也使得这个看似困惑的错误变得更加合理-连接到一个主机名,另一个主机名却出现了查找错误:

     $ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
     % ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known
    
    

    在这里,我们在本地机器上以消费者模式(-C)使用kafkacat尝试从主题中读取。与之前一样,因为我们从代理在元数据中获取了内部监听器主机名,客户端无法解析该主机名以进行读取/写入操作。

我看到一个StackOverflow答案建议只需更新我的hosts文件...那不是更容易吗?

这只是一个解决错误配置的hack,而不是真正解决问题。 如果代理向客户端报告一个客户端无法连接的主机名,那么将主机名/IP组合硬编码到本地的/etc/hosts文件中可能看起来是一个不错的解决方法。但这是一个非常脆弱和手动的解决方案。当IP地址发生变化时,当你更换主机并忘记将这个小hack带上时,当其他人也想要做同样的事情时会发生什么? 更好的方法是理解并实际修复网络的advertised.listeners设置。

如何连接Docker上的Kafka

images/docker01.png 在Docker中运行时,你需要为Kafka配置两个监听器:

  1. 在Docker网络内部通信。这可以是代理之间的内部通信(即代理之间),以及在Docker中运行的其他组件(如Kafka Connect)或第三方客户端或生产者之间的通信。 对于这些通信,我们需要使用Docker容器的主机名。在同一个Docker网络上的每个Docker容器将使用Kafka代理容器的主机名来连接它

  2. 非Docker网络流量。这可以是本地在Docker主机机器上运行的客户端,例如。假设他们将在localhost上连接到从Docker容器中暴露出来的端口。 这是docker-compose的片段:

      kafka0:
     image: "confluentinc/cp-enterprise-kafka:5.2.1"
     ports:
       - '9092:9092'
       - '29094:29094'
     depends_on:
       - zookeeper
     environment:
       KAFKA_BROKER_ID: 0
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://kafka0:9092,LISTENER_ALICE://kafka0:29094
       KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092,LISTENER_ALICE://never-gonna-give-you-up:29094
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT,LISTENER_ALICE:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
    

    原始查看docker-compose.yml GitHub提供支持❤

  • 在Docker网络内部的客户端使用监听器“BOB”,端口29092和主机名kafka0进行连接。通过这样做,它们将获得主机名kafka0用于连接。每个Docker容器将使用Docker的内部网络解析kafka0,并能够访问代理。
  • 外部于Docker网络的客户端使用监听器“FRED”,端口9092和主机名localhost进行连接。端口9092由Docker容器公开,因此可以连接。当客户端连接时,它们将获得代理的元数据中的主机名localhost,因此在读取/写入数据时连接到该主机名。
  • 上述配置将不会处理既在Docker之外在主机机器之外的客户端想要连接的场景。这是因为kafka0(内部Docker主机名)或者localhost(Docker主机机器的环回地址)将无法解析。

如何连接AWS/IaaS上的Kafka

我提到AWS是因为大多数人使用它,但这适用于任何IaaS/云解决方案。 这里与Docker的情况完全相同。主要区别在于,虽然在Docker中,外部连接可能只是在localhost上(如上所述),但在托管在云中的Kafka(如在AWS上)上,外部连接将来自于不与代理本地连接的机器,并且需要能够连接到代理。 进一步的复杂性在于,虽然Docker网络与主机的网络隔离程度很高,但在IaaS上通常外部主机名在内部是可解析的,这使得你可能会遇到这些问题。 根据你将要连接到代理的外部地址是否在网络上(例如VPC)可本地解析,有两种方法。

选项1 - 外部地址可在本地解析

images/aws01.png 在这种情况下,你可以只使用一个监听器。现有的监听器称为PLAINTEXT,只需要覆盖以设置传递给入站客户端的广告主机名。

advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092

现在,内部和外部的连接都将使用ec2-54-191-84-122.us-west-2.compute.amazonaws.com进行连接。因为ec2-54-191-84-122.us-west-2.compute.amazonaws.com在本地和外部都可以解析,所以一切正常。

选项2 - 外部地址在本地不可解析

你将需要为Kafka配置两个监听器:

  1. 在AWS网络(VPC)内部通信。这可能是代理之间的内部通信(即代理之间),以及在VPC中运行的其他组件(如Kafka Connect)或第三方客户端或生产者之间的通信。 对于这些通信,我们需要使用EC2机器的内部IP(如果配置了DNS,则为主机名)。
  2. AWS的外部流量。这可以是来自笔记本电脑的连接,或者来自不在Amazon中托管的计算机。在这两种情况下,都需要使用实例的外部IP(如果配置了DNS,则为主机名)。

images/aws02.png 这是一个示例配置:

listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://ip-172-31-18-160.us-west-2.compute.internal:19092,EXTERNAL://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
inter.broker.listener.name=INTERNAL

使用Docker探索监听器

查看https://github.com/rmoff/kafka-listeners。这包括一个docker-compose文件,用于启动一个带有配置了多个监听器的Zookeeper实例和Kafka代理。

  • 用于Docker网络内部流量的监听器“BOB”(端口29092)

      $ docker-compose exec kafkacat \
              kafkacat -b kafka0:29092 \
              -L
      Metadata for all topics (from broker 0: kafka0:29092/0):
      1 brokers:
        broker 0 at kafka0:29092
    
    
  • 用于来自Docker主机机器(localhost)的流量的监听器“FRED”(端口9092)

      $ docker-compose exec kafkacat \
              kafkacat -b kafka0:9092 \
                      -L
      Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
      1 brokers:
        broker 0 at localhost:9092
    
    
  • 用于来自外部的流量(通过DNS名称never-gonna-give-you-up到达Docker主机)的监听器“ALICE”(端口29094)

      $ docker run -t --network kafka-listeners_default \
                  confluentinc/cp-kafkacat \
                  kafkacat -b kafka0:29094 \
                          -L
      Metadata for all topics (from broker -1: kafka0:29094/bootstrap):
      1 brokers:
        broker 0 at never-gonna-give-you-up:29094
    
    

总结

我最近在我给出的StackOverflow答案中引用了这篇文章,并重新阐述了解决方案。如果你仍然不太明白,请查看它,也许第二次我解释得更好了:)

参考资料

仍然不确定?

请查看Confluent社区Slack群组Apache Kafka用户邮件列表以获取更多帮助。

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
jimmy
悟已往之不谏 知来者之可追

评论(0)

添加评论