我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
有时,如果您有一个饱和的集群(分区太多,或使用加密的主题数据,或使用SSL,或控制器在一个坏的节点上,或连接不稳定),清除该主题将花费很长时间。
我遵循这些步骤,特别是在使用TLS时。
1:使用kafka工具运行:
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2:运行:
kafka-控制台-消费者——消费者-财产安全。SSL .truststore.location=/etc/schema-registry/secrets/trust。JKS——消费者-属性ssl.truststore。Password = Password——consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity。JKS——消费者-属性ssl.keystore。密码=密码——consumer-property ssl.key。Password = Password——bootstrap-server broker01.kafka.com:9092——topic <topic-name>——new-consumer——from-beginning
3:当主题为空时,将主题保留设置回初始设置。
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
希望这能帮助到一些人,因为它不容易宣传。
其他回答
从Java,使用新的AdminZkClient代替已弃用的AdminUtils:
public void reset() {
try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {
for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
deleteTopic(entry.getKey(), zkClient);
}
}
}
private void deleteTopic(String topic, KafkaZkClient zkClient) {
// skip Kafka internal topic
if (topic.startsWith("__")) {
return;
}
System.out.println("Resetting Topic: " + topic);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topic);
// deletions are not instantaneous
boolean success = false;
int maxMs = 5_000;
while (maxMs > 0 && !success) {
try {
maxMs -= 100;
adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
success = true;
} catch (TopicExistsException ignored) {
}
}
if (!success) {
Assert.fail("failed to create " + topic);
}
}
private Map<String, List<PartitionInfo>> listTopics() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("group.id", "test-container-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
consumer.close();
return topics;
}
最简单的方法是将各个日志文件的日期设置为比保留期更早的日期。然后经纪人会在几秒钟内为你清理并移除它们。这有几个优点:
不需要关闭代理,这是一个运行时操作。 避免出现无效偏移异常的可能性(下文将详细介绍)。
In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect to request a specific [once valid] offset. If this offset happens to fall outside the bounds of the new topic logs, then no harm and the consumer resumes at either the beginning or the end. But, if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set but fails because the offset doesn't align to an actual message.
还可以通过清除zookeeper中针对该主题的消费者偏移量来缓解这一问题。但如果你不需要一个处女主题,只是想删除现有的内容,那么简单地“触摸”一些主题日志要比停止代理、删除主题日志和清除某些zookeeper节点容易得多,也更可靠。
除了更新用户留存。Ms和保留率。字节,我注意到主题清理策略应该是“delete”(默认),如果是“compact”,它将保留消息更长的时间,即,如果它是“compact”,你必须指定delete.retention.ms。
$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
还得监控最早/最新的偏移量,以确认这一成功发生,也可以检查du -h /tmp/kafka-logs/test-topic-3-100-*
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
26599762
另一个问题是,你必须先获得当前配置,以便在删除成功后记得恢复: ./bin/kafka-config .sh——zookeeper localhost:2181——describe——entity-name test-topic-3-100——entity-type topic
在Kafka 0.8.2中测试,作为快速启动的例子: 首先,添加一行到服务器。配置文件夹下的属性文件:
delete.topic.enable=true
然后,执行以下命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
然后重新创建它,以便客户端继续对空主题进行操作
下面是删除名为MyTopic的主题的步骤:
描述主题,并记下代理id 为列出的每个代理ID停止Apache Kafka守护进程。 连接到每个代理(从步骤1开始),并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0。对其他分区和所有副本重复此操作 删除主题元数据:zkCli.sh,然后删除rmr /brokers/MyTopic 为每台停止的机器启动Apache Kafka守护进程
如果你错过了第3步,Apache Kafka将继续报告当前的主题(例如当你运行Kafka -list-topic.sh时)。
使用Apache Kafka 0.8.0测试。