package com.bnls.test.common

import kafka.common.TopicAndPartition
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.collection.mutable

/**
 * Persists and restores Kafka consumer offsets in ZooKeeper so a Spark
 * Streaming job can resume from where it left off after a restart.
 *
 * Layout: one znode per partition at
 * `<namespace>/<Globe_kafkaOffsetPath>/<group>/<topic>/<partition>`,
 * whose payload is the offset rendered as a decimal string.
 */
object ZookeeperHelper {
  val LOG = LoggerFactory.getLogger(ZookeeperHelper.getClass)

  // ZooKeeper ensemble connection string (host:port list).
  val zk_connectstring = "10.60.81.168:2181,10.60.81.167:2181,10.60.81.166:2181"
  // ZooKeeper namespace (chroot); all paths below are resolved under it.
  val zk_namespace = "mykafka"
  // Root path under which all group/topic/partition offsets are stored.
  val Globe_kafkaOffsetPath = "/kafka/offsets"

  // Shared Curator client. Started eagerly when this object is first touched;
  // retries connection with exponential backoff (1s base, 3 attempts).
  val zkClient = {
    val client = CuratorFrameworkFactory.builder
      .connectString(zk_connectstring)
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace(zk_namespace)
      .build()
    client.start()
    client
  }

  /** Ensures `path` exists in ZooKeeper, creating it (and missing parents) if absent. */
  def ensureZKPathExists(path: String) = {
    if (zkClient.checkExists().forPath(path) == null) {
      zkClient.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  /**
   * Stores the `untilOffset` of each processed [[OffsetRange]] under
   * `<root>/<groupName>/<topic>/<partition>`, creating the path on first write.
   *
   * @param offsetRange ranges just consumed by the streaming batch
   * @param groupName   consumer group the offsets belong to
   */
  def storeOffsets(offsetRange: Array[OffsetRange], groupName: String) = {
    for (o <- offsetRange) {
      val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"
      LOG.info("Writing offset to ZK - topic: {}, partition: {}, offset: {}",
        o.topic, o.partition.toString, o.untilOffset.toString)
      ensureZKPathExists(zkPath)
      // Offsets are stored as the decimal string form of the Long.
      zkClient.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
    }
    LOG.info("Saved new offsets successfully")
  }

  /**
   * Reads the offsets previously stored in ZooKeeper for `groupName`.
   *
   * BUGFIX: the original implementation consulted only `topic.head` and
   * silently ignored every other topic in the set, so multi-topic jobs lost
   * their stored positions for all but the first topic; all topics are now read.
   * A missing topic path is created, yielding no entries for a fresh group.
   *
   * Note: Kafka 0.8 addresses partitions with [[TopicAndPartition]];
   * Kafka 0.10 uses [[TopicPartition]] — callers convert where needed.
   *
   * @return stored offset per partition; partitions never written are absent
   */
  def getZKOffset(kafkaParam: Map[String, String], topic: Set[String],
                  groupName: String): Map[TopicAndPartition, Long] = {
    var offsets: Map[TopicAndPartition, Long] = Map()
    for (t <- topic) {
      val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${t}"
      ensureZKPathExists(zkTopicPath)
      // Children of the topic node are the partition ids (as strings).
      val childrens = zkClient.getChildren().forPath(zkTopicPath)
      for (p <- childrens) {
        val offsetData = zkClient.getData().forPath(s"$zkTopicPath/$p")
        val offSet = new String(offsetData).toLong
        offsets += TopicAndPartition(t, p.toInt) -> offSet
      }
    }
    offsets
  }

  /**
   * Queries the Kafka cluster for the earliest ("smallest") or latest
   * ("largest") leader offset of every partition of `topics`, selected by
   * `kafkaParam("auto.offset.reset")`. Any other/missing reset value yields
   * an empty map, matching the original behavior.
   *
   * NOTE(review): `.right.get` throws if the metadata lookup returns Left;
   * that fail-fast behavior is preserved as-is.
   */
  def getResetOffsets(kafkaParam: Map[String, String],
                      topics: Set[String]): Map[TopicAndPartition, Long] = {
    val cluster = new KafkaClusterHelper(kafkaParam)
    var offsets: Map[TopicAndPartition, Long] = Map()
    val reset = kafkaParam.get("auto.offset.reset").map(_.toLowerCase())
    val topicAndPartitions: Set[TopicAndPartition] = cluster.getPartitions(topics).right.get
    if (reset == Some("smallest")) {
      val leaderOffsets = cluster.getEarliestLeaderOffsets(topicAndPartitions).right.get
      topicAndPartitions.foreach(tp => offsets += tp -> leaderOffsets(tp).offset)
    } else if (reset == Some("largest")) {
      val leaderOffsets = cluster.getLatestLeaderOffsets(topicAndPartitions).right.get
      topicAndPartitions.foreach(tp => offsets += tp -> leaderOffsets(tp).offset)
    }
    offsets
  }

  /**
   * Computes the starting offset for every partition of `topicSet1`, clamping
   * the offset stored in ZooKeeper to the range currently valid on the brokers:
   *  - stored offset below the earliest available -> use earliest (data expired)
   *  - stored offset beyond the latest            -> use latest
   *  - otherwise                                  -> resume from the stored offset
   * A partition with no stored offset is treated as 0 and thus starts earliest.
   *
   * @return (offsets, flag) — flag is 0 when no partitions were found, else 1
   */
  def getConSumerOffsets(kafkaParam: Map[String, String], topicSet1: Set[String],
                         groupName: String): (Map[TopicPartition, Long], Int) = {
    val brokers = kafkaParam("bootstrap.servers")
    val kafkaSmallestParams =
      Map("metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")
    val kafkaLargestParams =
      Map("metadata.broker.list" -> brokers, "auto.offset.reset" -> "largest")
    var offSets: mutable.Buffer[(TopicPartition, Long)] = mutable.Buffer()
    val smallOffsets = getResetOffsets(kafkaSmallestParams, topicSet1)
    val largestOffsets = getResetOffsets(kafkaLargestParams, topicSet1)
    // Offsets previously persisted for this group (may be empty for new groups).
    val consumerOffsets = getZKOffset(kafkaParam, topicSet1, groupName)
    smallOffsets.foreach { case (tp, sOffset) =>
      // BUGFIX: original used `.containsKey`, a java.util.Map method reachable
      // on a Scala Map only through the deprecated JavaConversions implicits;
      // replaced with the idiomatic getOrElse.
      val cOffset = consumerOffsets.getOrElse(tp, 0L)
      val lOffset = largestOffsets(tp)
      val chosen =
        if (sOffset > cOffset) sOffset      // stored position expired on broker
        else if (cOffset > lOffset) lOffset // stored position past the log end
        else cOffset
      offSets.append((new TopicPartition(tp.topic, tp.partition), chosen))
    }
    if (offSets.isEmpty) (offSets.toMap, 0) else (offSets.toMap, 1)
  }
}