我试着用卡夫卡。 所有配置都正确完成,但当我试图从控制台产生消息时,我一直得到以下错误

WARN Error while fetching metadata with correlation id 39 : 
     {4-3-16-topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Kafka版本:2.11-0.9.0.0


当前回答

对于我来说,我没有为Kafka实例指定代理id。 当它在Docker环境中重启时,有时会从zookeeper获取一个新的id。 如果您的代理id大于1000,只需指定环境变量KAFKA_BROKER_ID。

使用它可以查看代理、主题和分区。

brew install kafkacat
kafkacat -b [kafka_ip]:[kafka_poot] -L

其他回答

当我们试图订阅一个尚未创建的主题时,我们往往会收到这条消息。我们通常依赖于在部署环境中预先创建的主题,但我们有针对dockerized kafka实例运行的组件测试,每次都干净启动。

在这种情况下,我们在测试设置中使用AdminUtils来检查主题是否存在,如果不存在就创建它。有关设置AdminUtils的更多信息,请参阅另一个堆栈溢出。

如果你得到这样重复的错误消息:

Error while fetching metadata with correlation id 3991 : {your.topic=LEADER_NOT_AVAILABLE}

Or

Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)

然后,你需要像这样在kafka server.properties中配置监听器:

 listeners=PLAINTEXT://your.server.ip:9092

这是在Kafka 2.5.0和confluent 5.4.1平台上尝试的解决方案。

我使用的是kafka_2.12-0.10.2.1:

vi problem / server . properties

加到下面一行:

listeners=PLAINTEXT://localhost:9092

不需要更改广告。当它拾取值时监听器 从STD侦听器属性。

代理将向生产者和消费者发布主机名和端口。如果没有设置,

如果配置了“监听器”,则使用该值

否则,它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。

停止Kafka代理:

bin/kafka-server-stop.sh

重新启动代理:

bin/kafka-server-start.sh -daemon config/server.properties

现在你应该不会看到任何问题。

上述答案中提到的广告听众可能是原因之一。其他可能的原因有:

主题可能还没有创建。你可以使用bin/kafka-topics——list——zookeeper <zookeeper ip>:<zookeeper port>来检查 检查您提供给生成器以获取元数据的引导服务器。如果引导服务器不包含关于主题的最新元数据(例如,当它丢失其zookeeper声明时)。您必须添加多个引导服务器。

另外,确保已将所发布的侦听器设置为IP:9092而不是localhost:9092。后者意味着只能通过本地主机访问代理。

当我遇到错误时,我记得在引导服务器(或代理列表)列表中使用了PLAINTEXT://<ip>:<PORT>,它工作了,奇怪的是。

bin/kafka-console-producer --topic sample --broker-list PLAINTEXT://<IP>:<PORT>

对于任何试图在kubernetes上运行kafka并遇到这个错误的人来说,这是最终为我解决的问题:

你必须:

将主机名添加到pod规范中,这样kafka就可以找到自己。

or

如果使用hostPort,则需要hostNetwork: true和dnsPolicy: ClusterFirstWithHostNet

这样做的原因是因为Kafka需要与自身对话,它决定使用“广告”侦听器/主机名来找到自己,而不是使用localhost。 即使您有一个将所广告的主机名指向pod的Service,它在pod中也不可见。我真的不知道为什么会这样,但至少有一个变通办法。

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: zookeeper-cluster1
  namespace: default
  labels:
    app: zookeeper-cluster1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-cluster1
  template:
    metadata:
      labels:
        name: zookeeper-cluster1
        app: zookeeper-cluster1
    spec:
      hostname: zookeeper-cluster1
      containers:
      - name: zookeeper-cluster1
        image: wurstmeister/zookeeper:latest
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        - containerPort: 2888
        - containerPort: 3888

---

apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster1
  namespace: default
  labels:
    app: zookeeper-cluster1
spec:
  type: NodePort
  selector:
    app: zookeeper-cluster1
  ports:
  - name: zookeeper-cluster1
    protocol: TCP
    port: 2181
    targetPort: 2181
  - name: zookeeper-follower-cluster1
    protocol: TCP
    port: 2888
    targetPort: 2888
  - name: zookeeper-leader-cluster1
    protocol: TCP
    port: 3888
    targetPort: 3888

---

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: kafka-cluster
  namespace: default
  labels:
    app: kafka-cluster
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-cluster
  template:
    metadata:
      labels:
        name: kafka-cluster
        app: kafka-cluster
    spec:
      hostname: kafka-cluster
      containers:
      - name: kafka-cluster
        image: wurstmeister/kafka:latest
        imagePullPolicy: IfNotPresent
        env:
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-cluster:9092
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-cluster1:2181
        ports:
        - containerPort: 9092

---

apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster
  namespace: default
  labels:
    app: kafka-cluster
spec:
  type: NodePort
  selector:
    app: kafka-cluster
  ports:
  - name: kafka-cluster
    protocol: TCP
    port: 9092
    targetPort: 9092