/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.xima.kafka

import java.util.Properties

import com.ximalaya.spark.common.log.CommonLoggerTrait
import kafka.common.TopicAndPartition
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.{KafkaCluster, OffsetRange}

import scala.collection.JavaConverters.propertiesAsScalaMapConverter
/**
 * Helper around Spark Streaming's [[KafkaCluster]] for querying a topic's
 * per-partition offset ranges and building [[OffsetRange]] arrays.
 *
 * Created by todd.chen on 6/29/16.
 * email : todd.chen@ximalaya.com
 *
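 * A minimal usage sketch (the broker address and topic name are placeholders):
 *
 * {{{
 *   val ext = XQLKafkaExtension(Map("metadata.broker.list" -> "broker1:9092"))
 *   // (partitionId, smallestOffset, largestOffset) per partition of the topic
 *   val ranges = ext.getClusterOffsetRanges("myTopic")
 * }}}
 *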
 * @param kafkaParams Kafka configuration parameters, e.g. metadata.broker.list
*/
class XQLKafkaExtension(kafkaParams: Map[String, String]) extends CommonLoggerTrait {
  /**
   * Auxiliary constructor that accepts the Kafka consumer config as Java
   * [[java.util.Properties]].
   *
   * @param kafkaConsumerConfig Kafka consumer config
   */
def this(kafkaConsumerConfig: Properties) = {
this(kafkaConsumerConfig.asScala.toMap)
}
  // lazily initialized helper for Kafka metadata and offset queries
  lazy val kc = new KafkaCluster(kafkaParams)
  /**
   * Wraps raw offset tuples into [[OffsetRange]]s for use with Spark Streaming.
   *
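   * A hedged sketch of feeding the result into Spark's 0.8 Kafka RDD API
   * (`ext`, `sc`, `kafkaParams` and the offsets below are placeholders):
   *
   * {{{
   *   import kafka.serializer.StringDecoder
   *   import org.apache.spark.streaming.kafka.KafkaUtils
   *
   *   val ranges = ext.getArrayOffsetRangeWrapper(Seq((0, 0L, 100L)), "myTopic")
   *   val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
   *     sc, kafkaParams, ranges)
   * }}}
   *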
   * @param offsets sequence of (partitionId, startOffset, stopOffset) tuples
   * @param topic   topic name
   * @return array of [[OffsetRange]]
*/
  def getArrayOffsetRangeWrapper(offsets: Seq[(Int, Long, Long)], topic: String): Array[OffsetRange] = {
    offsets.map {
      case (partitionId: Int, startOffset: Long, stopOffset: Long) =>
        OffsetRange(topic, partitionId, startOffset, stopOffset)
    }.toArray
  }
  /**
   * Gets the offset range currently held by the cluster for each partition of a topic.
   *
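   * For example, combined with [[getArrayOffsetRangeWrapper]] to cover the
   * whole topic (the topic name is a placeholder):
   *
   * {{{
   *   val ranges = getArrayOffsetRangeWrapper(getClusterOffsetRanges("myTopic").toSeq, "myTopic")
   * }}}
   *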
   * @param topic topic name
   * @return cluster offset ranges in the format
   *         (partitionId, smallestOffset, largestOffset)
*/
  def getClusterOffsetRanges(topic: String): Iterable[(Int, Long, Long)] = {
    val topics = Set(topic)
    val largest = getTopicPartitionOffsets(topics, isSmallest = false)
    val smallest = getTopicPartitionOffsets(topics)
    // flatten TopicAndPartition keys into (topic, partition) pairs
    val topicPartitionMap2topicOffset = (topicPartition: Map[TopicAndPartition, Long]) =>
      topicPartition.map { case (k, v) => (k.topic, k.partition) -> v }
    val largestMap = topicPartitionMap2topicOffset(largest)
    val smallestMap = topicPartitionMap2topicOffset(smallest)
    // join the smallest and largest offsets on their (topic, partition) key
    for {
      small <- smallestMap
      large <- largestMap if small._1 == large._1
    } yield (small._1._2, small._2, large._2)
  }
  /**
   * Gets the per-partition offsets for the given topics.
   *
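   * The [[KafkaCluster]] calls return `Either[Err, _]`; a sketch of the same
   * error-handling pattern used below, in isolation (topic name is a placeholder):
   *
   * {{{
   *   kc.getPartitions(Set("myTopic")).fold(
   *     errs => throw new SparkException(errs.mkString("\n")),
   *     partitions => partitions)
   * }}}
   *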
   * @param topics     set of topic names
   * @param isSmallest if true, fetch the earliest (smallest) offsets;
   *                   otherwise the latest (largest)
   * @return map from TopicAndPartition to offset; defaults to the smallest offsets
*/
  private def getTopicPartitionOffsets(topics: Set[String], isSmallest: Boolean = true): Map[TopicAndPartition, Long] = {
    (for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (isSmallest) {
        kc.getEarliestLeaderOffsets(topicPartitions)
      } else {
        kc.getLatestLeaderOffsets(topicPartitions) // largest
      }).right
    } yield {
      // keep only the numeric offset from each LeaderOffset
      leaderOffsets.map { case (tp, lo) =>
        (tp, lo.offset)
      }
    }).fold(
      // surface any Kafka errors as a single SparkException
      errs => throw new SparkException(errs.mkString("\n")),
      ok => ok
    )
  }
}
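/** Factory methods for [[XQLKafkaExtension]]. */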
object XQLKafkaExtension {
  def apply(kafkaParams: Map[String, String]): XQLKafkaExtension = {
    new XQLKafkaExtension(kafkaParams)
  }

  def apply(kafkaConsumerConfig: Properties): XQLKafkaExtension = {
    new XQLKafkaExtension(kafkaConsumerConfig)
  }
}