Kafka Daily Operations Guide

Common Operations

List topics

kafka-topics --list --zookeeper device1:2181

Check the status of a topic's partitions

kafka-topics --describe --zookeeper device1:2181 --topic zhihu_comment

This command is very useful. At one point all of my consumers were configured correctly, yet none of them could commit their current offset no matter what. I wondered whether it might be a ZooKeeper problem, since ZooKeeper is where the committed offsets are stored. After careful investigation I found that commits worked fine on another topic, so the problem had to be with this particular topic. To inspect it I ran the command above, and it turned out that 2 of its partitions had inexplicably disappeared. I never found out why, but at least the problem was located.
This issue cost me quite a lot of effort: besides the time, I even paid 50 dollars on Upwork to bring in outside help.

Later I also discovered that the topic had been created with kafka-python, and topics created that way did not show the committed offset, while topics created from the console were fine.

from kafka.admin import KafkaAdminClient, NewTopic

def create_topic(topic):
    # broker_list is assumed to be defined elsewhere, e.g. ['device1:9092']
    admin_client = KafkaAdminClient(bootstrap_servers=broker_list, client_id='test')
    topic_list = [NewTopic(name=topic, num_partitions=3, replication_factor=1)]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)

Create a topic

kafka-topics --create --zookeeper device1:2181 --replication-factor 1 --partitions 1 --topic test

Check each partition's offset status for a consumer group

kafka-consumer-groups --bootstrap-server device1:9092 --group test --describe

Start a producer from the console (handy for testing)
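No command is given here, so the following is a typical invocation sketch, assuming the same broker address (device1:9092) used elsewhere in these notes and a topic named test:

```shell
# interactive console producer: each line you type is sent as one message
kafka-console-producer --broker-list device1:9092 --topic test

# in another terminal, a console consumer can verify the messages arrive
kafka-console-consumer --bootstrap-server device1:9092 --topic test --from-beginning
```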

Delete a topic's data

Solution1

kafka-topics.sh --delete --zookeeper device1:2181 --topic zhihu_test

(Note: this only actually deletes the topic when the brokers run with delete.topic.enable=true; otherwise the topic is merely marked for deletion.)

Solution2

# delete all data in the zhihu_profile topic by shrinking its retention
kafka-configs --zookeeper device1:2181 --entity-type topics --alter --entity-name zhihu_profile --add-config retention.ms=1000

Then set the retention back:

kafka-configs --zookeeper device1:2181 --entity-type topics --alter --entity-name zhihu_profile --add-config retention.ms=8640000000

However, this method does not clear the offset and commit records. I checked the offsets of one group afterwards and got the following:

topic: zhihu_profile partition: 0 committed: 2515555 last: 2515558 lag: 3
topic: zhihu_profile partition: 1 committed: 2519850 last: 2519853 lag: 3
topic: zhihu_profile partition: 2 committed: 2520594 last: 2520596 lag: 2

Code to query a group's offsets:

from kafka import KafkaConsumer, TopicPartition

def get_current_offset(topic, group):
    # broker_list is assumed to be defined elsewhere, e.g. ['device1:9092']
    consumer = KafkaConsumer(
        bootstrap_servers=broker_list,
        group_id=group,
        enable_auto_commit=False
    )
    for p in consumer.partitions_for_topic(topic):
        tp = TopicPartition(topic, p)
        consumer.assign([tp])
        committed = consumer.committed(tp)
        consumer.seek_to_end(tp)
        last_offset = consumer.position(tp)
        print("topic: %s partition: %s committed: %s last: %s lag: %s"
              % (topic, p, committed, last_offset, last_offset - committed))
    consumer.close(autocommit=False)
### reset offset

kafka-consumer-groups --bootstrap-server device1:9092 \
--group my-group --reset-offsets --to-earliest --all-topics --execute

> Reset offset of topic foo partition 0 to 1
`--reset-offsets --group test.group --topic foo:0 --to-offset 1`
> Reset offset of topic foo partition 0,1,2 to earliest
`--reset-offsets --group test.group --topic foo:0,1,2 --to-earliest`
### Adding broker nodes
After adding a node, only newly created topics will be placed on the new broker; existing topics stay where they are unless you recreate them. Also keep in mind that Kafka depends on ZooKeeper, because offsets get written to ZK, so when the ZK ensemble is large (many ZK nodes) write speed suffers. My suggestion is to put ZK on SSDs (SSDs for ZK alone are enough) and dedicate that ensemble to Kafka rather than sharing it with anything else.
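As an alternative to recreating topics, existing partitions can be moved onto the new broker with Kafka's partition-reassignment tool. A sketch, assuming broker ids 0,1,2 where 2 is the new node, the same ZooKeeper address as above, and hypothetical file names topics.json and plan.json:

```shell
# topics.json (hand-written) lists the topics to rebalance, e.g.:
# {"topics": [{"topic": "zhihu_profile"}], "version": 1}

# 1) generate a candidate reassignment plan spanning brokers 0,1,2
kafka-reassign-partitions --zookeeper device1:2181 \
  --topics-to-move-json-file topics.json --broker-list "0,1,2" --generate

# 2) save the proposed assignment into plan.json, then execute it
kafka-reassign-partitions --zookeeper device1:2181 \
  --reassignment-json-file plan.json --execute

# 3) check progress until all reassignments are reported complete
kafka-reassign-partitions --zookeeper device1:2181 \
  --reassignment-json-file plan.json --verify
```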
# TL;DR
## About auto.offset.reset and offsets


It is a bit more complex than you described. The auto.offset.reset config kicks in ONLY if your consumer group does not have a valid offset committed somewhere (2 supported offset storages now are Kafka and Zookeeper). And it also depends on what sort of consumer you use.

If you use a high-level java consumer then imagine following scenarios:

You have a consumer in a consumer group group1 that has consumed 5 messages and died. Next time you start this consumer it won’t even use that auto.offset.reset config and will continue from the place it died because it will just fetch the stored offset from the offset storage (Kafka or ZK as I mentioned).

You have messages in a topic (like you described) and you start a consumer in a new consumer group group2. There is no offset stored anywhere and this time the auto.offset.reset config will decide whether to start from the beginning of the topic (smallest) or from the end of the topic (largest)

One more thing that affects what offset value will correspond to smallest and largest configs is log retention policy. Imagine you have a topic with retention configured to 1 hour. You produce 5 messages, and then an hour later you post 5 more messages. The largest offset will still remain the same as in previous example but the smallest one won’t be able to be 0 because Kafka will already remove these messages and thus the smallest available offset will be 5.

Everything mentioned above is not related to SimpleConsumer and every time you run it, it will decide where to start from using the auto.offset.reset config.
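The decision logic described above can be sketched as a toy function. This is only a simplified model for illustration; the function and its parameter names are made up and not part of any Kafka API:

```python
def starting_offset(committed, policy, smallest, largest):
    """Return the offset a high-level consumer would start from.

    committed: the group's stored offset, or None if nothing was committed
    policy:    the auto.offset.reset setting ('smallest' or 'largest')
    smallest:  earliest offset still available (rises as retention expires)
    largest:   offset where the next produced message will go
    """
    if committed is not None:
        # a valid committed offset always wins; auto.offset.reset is ignored
        return committed
    # no committed offset anywhere: auto.offset.reset decides
    return smallest if policy == 'smallest' else largest

# group1 died after committing offset 5: it resumes at 5 regardless of policy
print(starting_offset(5, 'largest', 0, 10))      # 5
# new group2, nothing committed: the policy decides
print(starting_offset(None, 'smallest', 0, 10))  # 0
# after retention removed the first 5 messages, smallest itself becomes 5
print(starting_offset(None, 'smallest', 5, 10))  # 5
```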

Reference

https://stackoverflow.com/questions/29791268/how-to-change-start-offset-for-topic?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa