Common operations
List topics
kafka-topics --list --zookeeper device1:2181
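If you would rather do this from code, kafka-python can list topics too. A minimal sketch (device1:9092 is the broker address used elsewhere in these notes):

```
from kafka import KafkaConsumer

# connect without subscribing to anything
consumer = KafkaConsumer(bootstrap_servers="device1:9092")
print(consumer.topics())  # the set of topic names the broker reports
consumer.close()
```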
Check the status of a topic's partitions
kafka-topics --describe --zookeeper device1:2181 --topic zhihu_comment
This command is very useful. At one point all of my consumer settings were correct, yet no matter what I did it could not commit the current offset. I wondered whether it was a ZooKeeper problem, since ZooKeeper is where the committed offset information is stored.
After careful investigation I found that commits worked fine on another topic, so the problem had to be with this particular topic. To check what was wrong with it, I used this command, and it turned out that two partitions had inexplicably disappeared. I still don't know why, but at least the problem was located.
This issue cost me a lot of effort: quite apart from the time, I even spent 50 dollars on Upwork to find someone to solve it.
Later I also discovered that the topic had been created with kafka-python, and creating it that way caused the committed offset not to be displayed; topics created from the console did not have this problem.
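A quick way to confirm that kind of partition problem from kafka-python, along the same lines as the listing sketch above:

```
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="device1:9092")

# the set of partition ids the broker reports for the topic,
# or None if the topic does not exist at all
print(consumer.partitions_for_topic("zhihu_comment"))
consumer.close()
```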
Create a topic
kafka-topics --create --zookeeper device1:2181 --replication-factor 1 --partitions 1 --topic test
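Since the story above involved a topic created from kafka-python, here is a minimal sketch of the programmatic route, using the admin client that ships with recent kafka-python releases (same parameters as the console command above):

```
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="device1:9092")
admin.create_topics([NewTopic(name="test", num_partitions=1, replication_factor=1)])
admin.close()
```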
Check the offset status of each partition for a group
kafka-consumer-groups --bootstrap-server device1:9092 --describe --group my-group
The get_current_offset script further down prints the same information from kafka-python.
Start a producer from the console (generally handy for testing)
kafka-console-producer --broker-list device1:9092 --topic test
Delete a topic's data
Solution 1
kafka-topics.sh --delete --zookeeper device1:2181 --topic zhihu_test
Solution 2
# delete all data in the zhihu_profile topic
kafka-configs --zookeeper device1:2181 --entity-type topics --alter --entity-name zhihu_profile --add-config retention.ms=1000
Then set the retention back:
kafka-configs --zookeeper device1:2181 --entity-type topics --alter --entity-name zhihu_profile --add-config retention.ms=8640000000
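The same retention toggle can be done programmatically. A sketch with kafka-python's admin client (assuming a version recent enough to ship ConfigResource):

```
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers="device1:9092")

# shrink retention so the broker purges zhihu_profile's data...
admin.alter_configs([ConfigResource(ConfigResourceType.TOPIC, "zhihu_profile",
                                    configs={"retention.ms": "1000"})])

# ...then, once the data is gone, restore it (same value as the command above)
admin.alter_configs([ConfigResource(ConfigResourceType.TOPIC, "zhihu_profile",
                                    configs={"retention.ms": "8640000000"})])
admin.close()
```

One caveat, as far as I know: the non-incremental AlterConfigs API replaces the topic's whole dynamic config set, so any other per-topic overrides should be re-specified in the same call.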
This method, however, cannot clear the offset and commit records. Checking the offsets for one group showed:
topic: zhihu_profile partition: 0 committed: 2515555 last: 2515558 lag: 3
topic: zhihu_profile partition: 1 committed: 2519850 last: 2519853 lag: 3
topic: zhihu_profile partition: 2 committed: 2520594 last: 2520596 lag: 2
```
from kafka import KafkaConsumer, TopicPartition

broker_list = "device1:9092"  # same broker as in the console commands above


def get_current_offset(topic, group):
    """Print the committed offset, end offset, and lag for each partition."""
    consumer = KafkaConsumer(
        bootstrap_servers=broker_list,
        group_id=group,
        enable_auto_commit=False,
    )
    for p in consumer.partitions_for_topic(topic):
        tp = TopicPartition(topic, p)
        consumer.assign([tp])
        # committed is None if this group has never committed for the partition
        committed = consumer.committed(tp)
        consumer.seek_to_end(tp)
        last_offset = consumer.position(tp)
        lag = last_offset - committed if committed is not None else None
        print("topic: %s partition: %s committed: %s last: %s lag: %s"
              % (topic, p, committed, last_offset, lag))
    consumer.close(autocommit=False)
```
The committed offsets themselves can be reset with kafka-consumer-groups, for example back to the earliest offset for all of a group's topics:
kafka-consumer-groups --bootstrap-server device1:9092 \
--group my-group --reset-offsets --to-earliest --all-topics --execute
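Roughly the same effect can be had from kafka-python, as a sketch: seek every partition to the beginning and commit those positions. The CLI above is the better-supported route; the group and topic names here are the same examples as elsewhere in these notes.

```
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers="device1:9092",
    group_id="my-group",
    enable_auto_commit=False,
)
tps = [TopicPartition("zhihu_profile", p)
       for p in consumer.partitions_for_topic("zhihu_profile")]
consumer.assign(tps)
consumer.seek_to_beginning(*tps)
for tp in tps:
    consumer.position(tp)  # force each seek to resolve to a concrete offset
consumer.commit()          # commit the current (now earliest) positions
consumer.close(autocommit=False)
```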
A good explanation of how auto.offset.reset interacts with committed offsets:
It is a bit more complex than you might think. The auto.offset.reset config kicks in ONLY if your consumer group does not have a valid offset committed somewhere (the two supported offset storages are Kafka and ZooKeeper), and it also depends on what sort of consumer you use.
If you use the high-level Java consumer, imagine the following scenarios:
You have a consumer in consumer group group1 that has consumed 5 messages and died. The next time you start this consumer, it won't even use the auto.offset.reset config; it will continue from the place it died, because it just fetches the stored offset from the offset storage (Kafka or ZK, as mentioned).
You have messages in a topic and you start a consumer in a new consumer group group2. No offset is stored anywhere, so this time the auto.offset.reset config decides whether to start from the beginning of the topic (smallest) or from the end (largest).
One more thing that affects which offset values correspond to the smallest and largest configs is the log retention policy. Imagine a topic with retention configured to 1 hour. You produce 5 messages, and an hour later you post 5 more. The largest offset stays the same as in the previous example, but the smallest one can no longer be 0, because Kafka will already have removed the first 5 messages, so the smallest available offset is 5.
None of the above applies to SimpleConsumer; every time you run it, it decides where to start using the auto.offset.reset config.
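In kafka-python the same knob is the auto_offset_reset parameter, whose 'earliest'/'latest' values correspond to the smallest/largest discussed above. A minimal sketch of the group2 scenario:

```
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "zhihu_comment",
    bootstrap_servers="device1:9092",
    group_id="group2",             # a brand-new group: no committed offset yet
    auto_offset_reset="earliest",  # so this setting decides where to start
)
```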