我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
要清除队列,可以删除主题:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
然后重新创造它:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
其他回答
要清除队列,可以删除主题:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
然后重新创造它:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
user644265在这个答案中建议的临时减少主题的保留时间的解决方法仍然有效,但最新版本的kafka-configs会警告——zookeeper选项已被弃用:
警告:——zookeeper已弃用,并将在Kafka的未来版本中删除
使用——bootstrap-server代替;例如
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100
and
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms
您必须在配置中启用此功能
echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties
sudo systemctl stop kafka
sudo systemctl start kafka
清除主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows
创建主题
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
阅读主题
# /opt/kafka/bin/kafka-console-consumer.sh localhost:9092 --topic flows --from-beginning
如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。
docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>
成功的回应:
Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
使用应用程序组(GroupName应该与应用程序kafka组名相同)清理来自特定主题的所有消息。
./kafka-path/bin/kafka-console-consumer.sh——zookeeper localhost:2181——topic topicName——from-beginning——group application-group .sh