HasOffsetRanges and OffsetRange
HasOffsetRanges
HasOffsetRanges represents an object that has a collection of OffsetRanges, each of which is a range of offsets from a single Kafka topic partition.
HasOffsetRanges is part of the org.apache.spark.streaming.kafka010 package.
Note: KafkaRDD is a HasOffsetRanges object.
You can access the offset ranges of a KafkaRDD by casting it to HasOffsetRanges as follows:
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
  // The cast succeeds only on the RDD produced directly by the stream (a KafkaRDD)
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}
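A fuller sketch of reading offset ranges from a direct stream might look like the following. The broker address (localhost:9092), the consumer group (offset-demo), the topic (spark-logs) and the describe helper are assumptions made for illustration only; none of them is prescribed by Spark.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

object OffsetRangesSketch {
  // Illustrative helper (not part of Spark): one line of text per offset range.
  def describe(ranges: Array[OffsetRange]): Seq[String] =
    ranges.toSeq.map { r =>
      s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset}) = ${r.count()} records"
    }

  // Wires the helper into a direct stream. Broker address, group id and topic
  // are assumptions made for this sketch only.
  def logOffsets(ssc: StreamingContext): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "offset-demo"
    )
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("spark-logs"), kafkaParams)
    )
    stream.foreachRDD { rdd =>
      // Cast before any transformation: only the RDD produced by the stream carries offsets.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      describe(ranges).foreach(println)
    }
  }
}
```

Keeping the formatting in a separate function means it can be exercised with hand-built OffsetRange values, without a running Kafka broker.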
OffsetRange
OffsetRange represents a range of offsets from a single Kafka TopicPartition (i.e. a topic name and partition number).
OffsetRange holds a topic, a partition number, and the fromOffset (inclusive) and untilOffset (exclusive) offsets.
You can create instances of OffsetRange using the factory methods of the OffsetRange companion object. You can then use the count method to get the number of records in the range, i.e. untilOffset - fromOffset.
// Start spark-shell with spark-streaming-kafka-0-10_2.11 dependency
// --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0-SNAPSHOT
scala> import org.apache.spark.streaming.kafka010.OffsetRange
scala> val offsets = OffsetRange(topic = "spark-logs", partition = 0, fromOffset = 2, untilOffset = 5)
offsets: org.apache.spark.streaming.kafka010.OffsetRange = OffsetRange(topic: 'spark-logs', partition: 0, range: [2 -> 5])
scala> offsets.count
res0: Long = 3
scala> offsets.topicPartition
res1: org.apache.kafka.common.TopicPartition = spark-logs-0
OffsetRange is part of the org.apache.spark.streaming.kafka010 package.
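Because fromOffset is inclusive and untilOffset is exclusive, a range covers exactly the offsets fromOffset through untilOffset - 1, and consecutive ranges can be chained without overlap. The following is a minimal sketch of that arithmetic; the object and value names are illustrative, and the spark-logs topic is simply reused as a sample name.

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

object HalfOpenRanges {
  // Two consecutive ranges on the same topic partition: the second starts
  // where the first ends, so no offset is covered twice.
  val first  = OffsetRange("spark-logs", 0, 2L, 5L)
  val second = OffsetRange("spark-logs", 0, first.untilOffset, 9L)

  // Offsets covered by the first range: 2, 3 and 4 (5 is excluded).
  val covered: Seq[Long] = (first.fromOffset until first.untilOffset).toList

  // Total number of records across both ranges: (5 - 2) + (9 - 5).
  val total: Long = Seq(first, second).map(_.count()).sum
}
```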
Creating OffsetRange Instance
You can create instances of OffsetRange using the following factory methods (from the OffsetRange companion object):
OffsetRange.create(
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

OffsetRange.create(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

OffsetRange.apply(
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

OffsetRange.apply(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long): OffsetRange
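As a brief sketch, the four factory methods can be called with equivalent arguments as shown below. The object name and the sample topic, partition and offsets are illustrative assumptions; the apply variants are what Scala invokes when the method name is omitted.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

object OffsetRangeFactories {
  val tp = new TopicPartition("spark-logs", 0)

  // create: explicit factory methods
  val viaCreate   = OffsetRange.create("spark-logs", 0, 2L, 5L)
  val viaCreateTp = OffsetRange.create(tp, 2L, 5L)

  // apply: the same factories, written with Scala's call syntax
  val viaApply   = OffsetRange("spark-logs", 0, 2L, 5L)
  val viaApplyTp = OffsetRange(tp, 2L, 5L)

  val all = Seq(viaCreate, viaCreateTp, viaApply, viaApplyTp)
}
```

All four values should describe the same range, since each carries the same topic, partition, fromOffset and untilOffset.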