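How offset range queries are implemented
In practice we often need to look up the offset range (earliest and latest offset) of a given partition of a topic.
Command line: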
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx:9092 --topic xxxtopic --time -2
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx:9092 --topic xxxtopic --time -1
The special meaning of -1 and -2:
public class ListOffsetRequest extends AbstractRequest {
    public static final long EARLIEST_TIMESTAMP = -2L;
    public static final long LATEST_TIMESTAMP = -1L;
}
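As a rough illustration of how these two sentinel values are interpreted, the sketch below models a partition as a sorted map from record timestamp to offset. The class `OffsetResolver`, its parameters, and the map-based model are assumptions made for this example and are not Kafka code; only the two constants mirror `ListOffsetRequest`.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;

final class OffsetResolver {
    // Same sentinel values as ListOffsetRequest.
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    /**
     * Resolves a requested timestamp to an offset.
     * timestampToOffset is a hypothetical stand-in for the partition's data
     * (timestamp -> offset of the first record carrying that timestamp).
     */
    static OptionalLong resolve(long timestamp, long logStartOffset, long latestOffset,
                                NavigableMap<Long, Long> timestampToOffset) {
        if (timestamp == EARLIEST_TIMESTAMP)
            return OptionalLong.of(logStartOffset);
        if (timestamp == LATEST_TIMESTAMP)
            return OptionalLong.of(latestOffset);
        // Any other value is a real timestamp: pick the first record at or after it.
        Map.Entry<Long, Long> entry = timestampToOffset.ceilingEntry(timestamp);
        return entry == null ? OptionalLong.empty() : OptionalLong.of(entry.getValue());
    }
}
```

Calling `resolve(-2L, ...)` and `resolve(-1L, ...)` gives the two ends of the offset range; any other value is treated as a timestamp to search for.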
Client side
KafkaConsumer.endOffsets(Collection<TopicPartition>)
KafkaConsumer.beginningOffsets(Collection<TopicPartition>)
Fetcher.beginningOrEndOffset(Collection<TopicPartition>, long, long)
Fetcher.retrieveOffsetsByTimes(Map<TopicPartition, Long>, long, boolean)
Fetcher.sendListOffsetRequests(boolean, Map<TopicPartition, Long>)
// Group the partitions by node.
final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
    TopicPartition tp = entry.getKey();
    PartitionInfo info = metadata.fetch().partition(tp);
    if (info == null) {
        metadata.add(tp.topic());
        log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
        return RequestFuture.staleMetadata();
    } else if (info.leader() == null) {
        log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", tp);
        return RequestFuture.leaderNotAvailable();
    } else {
        Node node = info.leader();
        Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node);
        if (topicData == null) {
            topicData = new HashMap<>();
            timestampsToSearchByNode.put(node, topicData);
        }
        topicData.put(entry.getKey(), entry.getValue());
    }
}
final RequestFuture<Map<TopicPartition, OffsetData>> listOffsetRequestsFuture = new RequestFuture<>();
final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
    sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps)
        .addListener(new RequestFutureListener<Map<TopicPartition, OffsetData>>() {
            @Override
            public void onSuccess(Map<TopicPartition, OffsetData> value) {
                synchronized (listOffsetRequestsFuture) {
                    fetchedTimestampOffsets.putAll(value);
                    if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone())
                        listOffsetRequestsFuture.complete(fetchedTimestampOffsets);
                }
            }
            @Override
            public void onFailure(RuntimeException e) {
                synchronized (listOffsetRequestsFuture) {
                    // This may cause all the requests to be retried, but should be rare.
                    if (!listOffsetRequestsFuture.isDone())
                        listOffsetRequestsFuture.raise(e);
                }
            }
        });
}
return listOffsetRequestsFuture;
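In short: the client finds the leader node of each partition and sends it a ListOffsetRequest, batching the partitions that share a leader into one request. The request locates offsets by timestamp.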
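The grouping step can be sketched on its own, without any Kafka types. In this minimal example partitions and brokers are plain strings, and `LeaderGrouping`, `groupByLeader`, and `leaderByPartition` are hypothetical names chosen for illustration; an empty result stands in for the "wait for metadata refresh" branches of the real code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class LeaderGrouping {
    /**
     * Groups partition -> timestamp entries by the partition's leader so that
     * one request per leader can be sent. Returns Optional.empty() when some
     * partition has no known leader.
     */
    static Optional<Map<String, Map<String, Long>>> groupByLeader(
            Map<String, Long> timestampsToSearch,
            Map<String, String> leaderByPartition) {
        Map<String, Map<String, Long>> byLeader = new HashMap<>();
        for (Map.Entry<String, Long> entry : timestampsToSearch.entrySet()) {
            String leader = leaderByPartition.get(entry.getKey());
            if (leader == null)
                return Optional.empty(); // metadata is stale, caller should refresh and retry
            byLeader.computeIfAbsent(leader, k -> new HashMap<>())
                    .put(entry.getKey(), entry.getValue());
        }
        return Optional.of(byLeader);
    }
}
```

With three partitions led by two brokers, the result has two entries, so only two requests would be needed.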
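Broker side
KafkaApis.handleListOffsetRequestV1AndAbove(request: RequestChannel.Request)
Querying the latest offset
This value is presumably kept up to date while records are being produced, so answering the query is just a read: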
val lastFetchableOffset = offsetRequest.isolationLevel match {
  case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
  case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
}
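This is also where the difference between the LEO, the LSO, and the high watermark shows up: the latest offset returned to a consumer is the high watermark under READ_UNCOMMITTED and the last stable offset under READ_COMMITTED, not the log end offset.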
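To make the distinction concrete, here is a small sketch of the same selection in Java. `FetchableOffset` and `ReplicaOffsets` are hypothetical types invented for this example, and the field comments give the usual informal definitions rather than quoting Kafka.

```java
final class FetchableOffset {
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    /** Offsets tracked for one partition replica (hypothetical holder). */
    static final class ReplicaOffsets {
        final long logEndOffset;     // LEO: next offset to be appended on this replica
        final long highWatermark;    // HW: offsets below it are replicated to the in-sync replicas
        final long lastStableOffset; // LSO: offsets below it belong to no open transaction

        ReplicaOffsets(long logEndOffset, long highWatermark, long lastStableOffset) {
            this.logEndOffset = logEndOffset;
            this.highWatermark = highWatermark;
            this.lastStableOffset = lastStableOffset;
        }
    }

    /** Picks the "latest" offset a consumer may see; the LEO is never returned. */
    static long lastFetchableOffset(IsolationLevel level, ReplicaOffsets offsets) {
        switch (level) {
            case READ_COMMITTED:
                return offsets.lastStableOffset;
            default:
                return offsets.highWatermark;
        }
    }
}
```

For a replica with LEO 120, high watermark 100, and LSO 90, a READ_UNCOMMITTED consumer is told 100 and a READ_COMMITTED consumer is told 90.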
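Querying the earliest offset
kafka.log.Log.fetchOffsetsByTimestamp(targetTimestamp: Long)
This value is presumably also maintained ahead of time: logStartOffset is a field of Log, and the query simply returns it.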
@threadsafe
class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
  // ......
  if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
    return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
Querying an offset by timestamp
First, determine the target segment:
val targetSeg = {
  // Get all the segments whose largest timestamp is smaller than target timestamp
  val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
  // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
  if (earlierSegs.length < segmentsCopy.length)
    Some(segmentsCopy(earlierSegs.length))
  else
    None
}
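The segment selection above can be sketched in Java as a simple scan. Here each segment is represented only by its largest timestamp, and `SegmentSelector` and `targetSegment` are hypothetical names used for illustration.

```java
import java.util.OptionalInt;

final class SegmentSelector {
    /**
     * Returns the index of the first segment whose largest timestamp is not
     * smaller than targetTimestamp, or empty if every segment is older.
     * largestTimestamps holds one entry per segment, in log order.
     */
    static OptionalInt targetSegment(long[] largestTimestamps, long targetTimestamp) {
        int earlier = 0;
        // Same idea as takeWhile(_.largestTimestamp < targetTimestamp): skip older segments.
        while (earlier < largestTimestamps.length && largestTimestamps[earlier] < targetTimestamp)
            earlier++;
        return earlier < largestTimestamps.length ? OptionalInt.of(earlier) : OptionalInt.empty();
    }
}
```

For segments with largest timestamps 100, 200, and 300, a target of 150 selects the second segment (index 1), while a target of 301 selects nothing because no segment can contain such a record.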
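Then search within that segment's indexes by time:
LogSegment.findOffsetByTimestamp(timestamp: Long, startingOffset: Long)
The time index is consulted first to get an offset, then the offset index to get a file position; each index lookup is a binary search.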
// LogSegment.scala
val timestampOffset = timeIndex.lookup(timestamp)
val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position
// AbstractIndex.scala
/**
 * Lookup lower and upper bounds for the given target.
 */
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
  // check if the index is empty
  if(_entries == 0)
    return (-1, -1)
  // check if the target offset is smaller than the least offset
  if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)
  // binary search for the entry
  var lo = 0
  var hi = _entries - 1
  while(lo < hi) {
    val mid = ceil(hi/2.0 + lo/2.0).toInt
    val found = parseEntry(idx, mid)
    val compareResult = compareIndexEntry(found, target, searchEntity)
    if(compareResult > 0)
      hi = mid - 1
    else if(compareResult < 0)
      lo = mid
    else
      return (mid, mid)
  }
  (lo, if (lo == _entries - 1) -1 else lo + 1)
}
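The two-step lookup and the binary search can be modelled with plain sorted arrays. The sketch below is an approximation under stated assumptions: `IndexLookup`, its array-based indexes, and the fallback to position 0 are invented for illustration, and only the lower bound of the slot range is returned. In Kafka the resulting position is, as far as I can tell, only the starting point for a scan of the segment file.

```java
final class IndexLookup {
    /**
     * Returns the slot of the largest key <= target, or -1 if the array is
     * empty or every key is greater than target. keys must be sorted ascending.
     */
    static int largestLowerBoundSlot(long[] keys, long target) {
        if (keys.length == 0 || keys[0] > target)
            return -1;
        int lo = 0;
        int hi = keys.length - 1;
        while (lo < hi) {
            // Round the midpoint up so that "lo = mid" always makes progress.
            int mid = lo + (hi - lo + 1) / 2;
            if (keys[mid] > target)
                hi = mid - 1;
            else if (keys[mid] < target)
                lo = mid;
            else
                return mid;
        }
        return lo;
    }

    /**
     * Two-step lookup: timestamp -> offset through the time index, then
     * offset -> file position through the offset index. Falls back to
     * position 0 (start of the segment file) when no index entry applies.
     */
    static long startPosition(long[] timeIdxTimestamps, long[] timeIdxOffsets,
                              long[] offsetIdxOffsets, long[] offsetIdxPositions,
                              long timestamp, long startingOffset) {
        int timeSlot = largestLowerBoundSlot(timeIdxTimestamps, timestamp);
        long offsetFromTime = timeSlot < 0 ? startingOffset : timeIdxOffsets[timeSlot];
        long searchOffset = Math.max(offsetFromTime, startingOffset);
        int offsetSlot = largestLowerBoundSlot(offsetIdxOffsets, searchOffset);
        return offsetSlot < 0 ? 0L : offsetIdxPositions[offsetSlot];
    }
}
```

Rounding the midpoint up matters because the loop assigns `lo = mid`; with a rounded-down midpoint and `hi == lo + 1` the search would never terminate.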
Unless otherwise stated, all articles on this blog are licensed under CC BY-SA 3.0. Please credit the source when reposting.