代码版本、编译与常用命令

编译

  1. 下载sbt

  2. 下载scalaIDE

  3. 下载gradle 选择4版本,可以跳过一些问题 设置对应环境变量

  4. gradle -b build.gradle jar 编译kafka代码

  5. gradle -b build.gradle eclispe 生成eclipse的工程 然后导入到eclispe中, 导入后个别配置问题导致的报错fix一下就能没有错误了

  6. 选用的代码版本 是 1.1.1-RC3这个tag

  7. 修改日志级别

    # 修改kafka_2.11-1.1.1-00/config/log4j.properties 这两个地方调整成trace级别 
    # leader端处理请求的日志打开
    log4j.rootLogger=TRACE, stdout, kafkaAppender
    log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
    
    # 副本请求线程的日志打开
    log4j.logger.kafka.server.ReplicaFetcherThread=TRACE, requestAppender
    log4j.additivity.kafka.server.ReplicaFetcherThread=false

常用命令

创建topic

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2182 --partitions 1 --replication-factor 2 --topic simon.test02.p1r2

删除topic

bin/kafka-topics.sh --delete  --zookeeper 127.0.0.1:2182 --topic simon.test03.p10r2

查询offset range

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 -topic simon.test02.p1r2 --time -2      
# simon.test01.p1r2:0:0

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 -topic simon.test02.p1r2 --time -1      
# simon.test01.p1r2:0:0

-2 是最早的 -1是最近的

查询topic情况

bin/kafka-topics.sh --describe --topic simon.test02.p1r2 --zookeeper 127.0.0.1:2182

修改topic配置,比如修改min.insync.replicas去覆盖server.properties中的值

bin/kafka-configs.sh \
   --zookeeper localhost:2181 \
   --alter \
   --entity-type topics \
   --entity-name yourTopicName \
   --add-config min.insync.replicas=2

console消费

bin/kafka-console-consumer.sh --topic simon.test02.p1r2 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9094  --from-beginning 10
bin/kafka-console-consumer.sh --topic simon.test20.p3r2 --group ss-test-group --bootstrap-server 127.0.0.1:9092,127.0.0.1:9094  --from-beginning 10

console生产

bin/kafka-console-producer.sh --topic simon.test02.p1r2 --broker-list 127.0.0.1:9092
bin/kafka-console-producer.sh --topic simon.test02.p1r2 --broker-list 127.0.0.1:9092,127.0.0.1:9094 --request-timeout-ms 6000000  --request-required-acks -1