我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
以防有人正在寻找一个更新的答案(在2022年),我发现以下将适用于Kafka 3.3.1版本。这将改变“your-topic”的配置,使消息保留1000毫秒。在清除消息之后,可以将其设置为不同的值。
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000
其他回答
下面是删除名为MyTopic的主题的步骤:
描述主题,并记下代理id 为列出的每个代理ID停止Apache Kafka守护进程。 连接到每个代理(从步骤1开始),并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0。对其他分区和所有副本重复此操作 删除主题元数据:zkCli.sh,然后删除rmr /brokers/MyTopic 为每台停止的机器启动Apache Kafka守护进程
如果你错过了第3步,Apache Kafka将继续报告当前的主题(例如当你运行Kafka -list-topic.sh时)。
使用Apache Kafka 0.8.0测试。
最简单的方法是将各个日志文件的日期设置为比保留期更早的日期。然后经纪人会在几秒钟内为你清理并移除它们。这有几个优点:
不需要关闭代理,这是一个运行时操作。 避免出现无效偏移异常的可能性(下文将详细介绍)。
In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect to request a specific [once valid] offset. If this offset happens to fall outside the bounds of the new topic logs, then no harm and the consumer resumes at either the beginning or the end. But, if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set but fails because the offset doesn't align to an actual message.
还可以通过清除zookeeper中针对该主题的消费者偏移量来缓解这一问题。但如果你不需要一个处女主题,只是想删除现有的内容,那么简单地“触摸”一些主题日志要比停止代理、删除主题日志和清除某些zookeeper节点容易得多,也更可靠。
下面的命令可以删除kafka topic中所有已有的消息:
kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json
删除的结构。Json文件应该如下:
{ “分区”:( { “主题”:“foo”, “分区”:1、 “抵消”:1 } ), “版本”:1 }
offset:-1将删除所有记录的地方 这个命令已经在kafka 2.0.1中测试过了
kafka没有清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。
首先,确保服务器。属性文件有,如果没有添加delete.topic.enable=true
然后,删除主题 bin/kafka-topics.sh——zookeeper localhost:2181——delete——topic myTopic
然后重新创建它。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
要清除队列,可以删除主题:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
然后重新创造它:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test