zookeeper
Views: 4,638 · Published: 2019-06-09

This article is about 5,473 characters long; estimated reading time: 18 minutes.

package com.bnls.test.common

import kafka.common.TopicAndPartition
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.collection.mutable

object ZookeeperHelper {
  val LOG = LoggerFactory.getLogger(ZookeeperHelper.getClass)
  // ZooKeeper connection string (host:port list)
  val zk_connectstring = "10.60.81.168:2181,10.60.81.167:2181,10.60.81.166:2181"
  // ZooKeeper namespace
  val zk_namespace = "mykafka"
  // Base path under which offsets are stored
  val Globe_kafkaOffsetPath = "/kafka/offsets"

  // ZK client
  val zkClient = {
    val client = CuratorFrameworkFactory.builder
      .connectString(zk_connectstring)
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace(zk_namespace)
      .build()
    client.start()
    client
  }

  // Ensure the given path exists in ZK; create it (with parents) if it does not.
  def ensureZKPathExists(path: String) = {
    if (zkClient.checkExists().forPath(path) == null) {
      zkClient.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  // Persist the latest offsets to ZK.
  def storeOffsets(offsetRange: Array[OffsetRange], groupName: String) = {
    for (o <- offsetRange) {
      val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"
      // First write, or update, of the offset for this partition
      println("---Writing offset to ZK------\nTopic:" + o.topic +
        ", Partition:" + o.partition + ", Offset:" + o.untilOffset)
      ensureZKPathExists(zkPath)
      zkClient.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
    }
    println("New offsets saved successfully!")
  }

  // Read the offsets saved in ZK, used as the starting position of the DStream.
  // If the path does not exist, it is created and the stream starts from 0.
  // Note the Kafka version difference: 0.10 uses TopicPartition, 0.8 TopicAndPartition.
  def getZKOffset(kafkaParam: Map[String, String], topic: Set[String],
                  groupName: String): Map[TopicAndPartition, Long] = {
    var offsets: Map[TopicAndPartition, Long] = Map()
    val topic1 = topic.head
    val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"
    // Make sure the path exists
    ensureZKPathExists(zkTopicPath)
    // The children of the topic node are the partitions
    val childrens = zkClient.getChildren().forPath(zkTopicPath)
    // Walk the partitions; each child node holds the offset for that partition
    for (p <- childrens) {
      val offsetData = zkClient.getData().forPath(s"$zkTopicPath/$p")
      val offSet = new String(offsetData).toLong
      offsets += TopicAndPartition(topic1, Integer.parseInt(p)) -> offSet
    }
    offsets
  }

  // Resolve the reset offsets: "smallest" = earliest, "largest" = latest.
  def getResetOffsets(kafkaParam: Map[String, String],
                      topics: Set[String]): Map[TopicAndPartition, Long] = {
    // KafkaClusterHelper is a copy of Spark's KafkaCluster (defined elsewhere)
    val cluster = new KafkaClusterHelper(kafkaParam)
    var offsets: Map[TopicAndPartition, Long] = Map()
    val reset = kafkaParam.get("auto.offset.reset").map(_.toLowerCase())
    val topicAndPartitions: Set[TopicAndPartition] = cluster.getPartitions(topics).right.get
    if (reset == Some("smallest")) {
      val leaderOffsets = cluster.getEarliestLeaderOffsets(topicAndPartitions).right.get
      topicAndPartitions.foreach(tp => offsets += tp -> leaderOffsets(tp).offset)
    } else if (reset == Some("largest")) {
      val leaderOffsets = cluster.getLatestLeaderOffsets(topicAndPartitions).right.get
      topicAndPartitions.foreach(tp => offsets += tp -> leaderOffsets(tp).offset)
    }
    offsets
  }

  // Reconcile the consumer offsets stored in ZK against the broker's earliest and
  // latest offsets, clamping them into the valid range. Returns the offsets plus
  // a flag: 0 if no offsets were found, 1 otherwise.
  def getConSumerOffsets(kafkaParam: Map[String, String], topicSet1: Set[String],
                         groupName: String): (Map[TopicPartition, Long], Int) = {
    val brokers = kafkaParam("bootstrap.servers")
    val kafkaSmallestParams = Map[String, String](
      "metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")
    val kafkaLargestParams = Map[String, String](
      "metadata.broker.list" -> brokers, "auto.offset.reset" -> "largest")
    var offSets: mutable.Buffer[(TopicPartition, Long)] = mutable.Buffer()
    val smallOffsets = getResetOffsets(kafkaSmallestParams, topicSet1)
    val largestOffsets = getResetOffsets(kafkaLargestParams, topicSet1)
    // cOffset: the offset read from external storage (ZK)
    val consumerOffsets = getZKOffset(kafkaParam, topicSet1, groupName)
    smallOffsets.foreach {
      case (tp, sOffset) =>
        val cOffset = if (!consumerOffsets.contains(tp)) 0L else consumerOffsets(tp)
        val lOffset = largestOffsets(tp)
        val chosen =
          if (sOffset > cOffset) sOffset      // stored offset expired from the log: start from earliest
          else if (cOffset > lOffset) lOffset // stored offset beyond the log end: start from latest
          else cOffset                        // stored offset still valid
        offSets.append((new TopicPartition(tp.topic, tp.partition), chosen))
    }
    if (offSets.isEmpty) (offSets.toMap, 0) else (offSets.toMap, 1)
  }
}
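The heart of `getConSumerOffsets` is the clamping rule: the offset read from ZK is only trusted if it still falls inside the broker's valid range `[earliest, latest]`; otherwise the nearest valid bound is used. The rule can be isolated as a minimal, dependency-free sketch (the names `OffsetClamp` and `resolve` are illustrative, not part of the original helper):

```scala
// Minimal sketch of the offset-clamping rule used in getConSumerOffsets.
object OffsetClamp {
  def resolve(stored: Option[Long], earliest: Long, latest: Long): Long = {
    // A missing stored offset behaves like 0, exactly as in the helper
    val c = stored.getOrElse(0L)
    if (earliest > c) earliest  // stored offset already expired from the log
    else if (c > latest) latest // stored offset past the log end
    else c                      // stored offset still valid
  }

  def main(args: Array[String]): Unit = {
    println(resolve(None, 100L, 500L))       // 100: no stored offset, fall back to earliest
    println(resolve(Some(250L), 100L, 500L)) // 250: stored offset in range, reuse it
    println(resolve(Some(900L), 100L, 500L)) // 500: clamped down to latest
  }
}
```

This is why the helper fetches both the "smallest" and "largest" reset offsets before consulting ZK: without the clamp, a stale ZK offset that has fallen off the log would make the stream fail with an out-of-range error on startup.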

Reposted from: https://www.cnblogs.com/heguoxiu/p/10064292.html
