2020-07-22 最近工作总结

从7月初开始安排任务去写spark程序,到现在为止将近一个月的时间,收获颇多,今天程序终于写完上线,抽时间总结一下开发过程中遇到的问题及经验。
刚开始安排开发spark程序,要求是将数据实时的存储到phoenix内,我内心是感觉很轻松的,毕竟原来也开发过类似的程序,但是当我开始开发后,wtf!!!根据需求来看居然无法使用phoenix的原生api实现,也就是说使用savetophoenix方法是行不通, 于是再次确认产品相关细节,最后得到结果,无法使用原生api,只能通过jdbc的方式去实现,好吧,多少有点心虚,毕竟没有这么干过。但是没办法,在其位谋其职,只能硬着头皮写,接下来就是各种查资料,搜索合适的连接池工具,经过3天的查找,最后在联系同神大神了解情况,公司内部有一个自封装的Phoenix数据接口,接下来就是将接口内相关的工具类与druid连接池整合起来,由于公司平台是cdh外加kerberos安全认证模式,经过一天的测试总算是打通了jdbc,接下来就继续写逻辑,此处逻辑实现就不详述了,毕竟没什么参考价值,写完程序就开始陆陆续续的进行上线测试。
好吧 我承认接下来就是各种填坑的过程,首先是偏移量存储的坑,由于本公司是将偏移量存储在zk内,而我在测试环境下是没有存储的,导致上生产的时候首先就遇到偏移量存储问题,因为单个批次的数据量比较大,存储到phoenix内需要4到5分钟左右,在存储完数据,保存offset的时候报

ERROR.spark.kafkaOffsetManager$:update offsets fails reson is Shutdown in progress
java.lang.IllegalStateException:Shutdown in proress

java.lang.Error: java.lang.InterruptedException

错误,经过查询,是因为数据量太大,处理时间过长导致偏移量保存失败,此处的做法是开启背压机制,自动调节每批次数据量,然后顺利解决Offset保存的问题。
接下来遇到第二个坑,存偏移量时能连上zk,但是在存储时报

newStub in org.apache.phoenix.coprocessor.generated.MetaDataProtos$MetaDataService!

错误,通过排除日志发现实际是

org.apache.hadoop.hbase.security.AccessDeniedException

错误,wtf,怎么能是权限问题?kerberos已经做了认证了,不应该在出现权限异常的。于是去phoenix官网查询貌似是缺少一个phoenix-5.0.0-cdh6.2.0-client.jar的包,好吧,接下来在启动程序里面添加spark.executor.extraClassPath;--jars;--driver-class-path 均指定phoenix-5.0.0-cdh6.2.0-client.jar,然后顺利改变了报错信息-_-!
接下来开始报类型转换异常,

java.lang.ClassCastException:org.apache.phoenix.shaded.org.apache.zookeeper.data.Stat cannot be cast to org.apache.zookeeper.data.Stat

好吧 我的依赖里面没有这个类,为什么会出现这个类?然后突然灵光一闪,会不会是添加的jar包内有?于是添加依赖查看一下,果然是在这个client包里面?这是为什么?这个问题足足卡了我一天多,然后突然想起来,我为什么需要这个包才能连接上zk,不添加就不可以?按理说目前项目内pom依赖是包含了所有依赖了,会不会是spark额外指定的jar包内有这个类的依赖?然后去查询spark的相关配置,果然内部有这个类:hbase-shaded-mapreduce-2.1.0-cdh6.2.0.jar,于是删除改类依赖重新编译部署,接下来终于成功将偏移量存入了zk。
接下来3个程序成功上线,不在报乱七八糟的异常了。。。可是奇怪的问题又来了。。有一个程序没问题,能顺利运行,其他两个程序出现了offset越界异常?什么?偏移量越界?偏移量越界大部分情况就是因为kafka中未被消费的数据被broker清除了,导致zk中的offset落在了earliest的左侧,使得本来合法的偏移量变得不合法,可是为什么会出现这种情况呢?于是去查看spark程序的监控,发现spark程序拉取了很多批次,但是因为处理能力与拉取速度不匹配,导致产生了数据积压,并且这个结果会越来越严重,最后引起了偏移量越界的异常,好吧如何提升spark处理能力?首先想到的就是怎加executor的个数,好吧又一个程序能正常运行了 并且没有数据积压的现象了,可是另一个还是不行,接下来就考虑优化jdbc部分,首先将是老思路,批处理!但是发现调用Statement对象的addBatch方法最后批量执行,发现不行,然后咨询公司的一个大神,给的建议是使用conn.prepateStatement方法进行预编译,然后在每次setobject的时候调用executeUpdate方法,然后执行100次的时候手动commit一次,但是此处有一处想不明白,我执行addBatch,然后在commit却不行,但是executeUpdate后在commit却能行得通。。。希望路过看到的大神能帮忙解答一下其中的原理。感谢!
以上就是最近的工作总结,有大佬看到望清喷。

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务