我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

从Java,使用新的AdminZkClient代替已弃用的AdminUtils:

  public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {

      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }

  private void deleteTopic(String topic, KafkaZkClient zkClient) {

    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }

    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);

    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }

    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }

  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();

    return topics;
  }

其他回答

另一种手动清除主题的方法是:

在经纪人中:

停止kafka broker Sudo服务kafka停止 删除所有分区日志文件(应该在所有代理上执行) sudo rm - r /kafka-storage/kafka-logs/<some_topic_name>-* .

动物园管理员:

运行zookeeper命令行界面 sudo /usr/lib/zookeeper/bin/zkCli.sh 使用zkCli删除主题元数据 rmr /代理/主题/ < some_topic_name >

在经纪人那里:

重新启动代理服务 Sudo服务kafka启动

更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。

是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。

以防有人正在寻找一个更新的答案(在2022年),我发现以下将适用于Kafka 3.3.1版本。这将改变“your-topic”的配置,使消息保留1000毫秒。在清除消息之后,可以将其设置为不同的值。

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic  --alter --add-config retention.ms=1000

您必须在配置中启用此功能

echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
sudo systemctl stop kafka 
sudo systemctl start kafka 

清除主题

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows

创建主题

# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test

阅读主题

# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning

kafka没有清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。

首先,确保服务器。属性文件有,如果没有添加delete.topic.enable=true

然后,删除主题 bin/kafka-topics.sh——zookeeper localhost:2181——delete——topic myTopic

然后重新创建它。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2