>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 20640 个阅读者 刷新本主题
 * 贴子主题:  kafka+spark-streaming实时推荐系统性能优化笔记 回复文章 点赞(0)  收藏  
作者:Jacky    发表时间:2020-03-21 18:11:52     消息  查看  搜索  好友  邮件  复制  引用

                    

kafka+spark-streaming实时推荐系统性能优化笔记

           1) -- conf spark.dynamicAllocation.enabled=false

   如果正在使用的是CDH的Spark,修改这个配置为false;开源的Spark版本则默认是false。

   当为true时,即使指定了num-executors个数,spark-streaming应用也会占用整个集群的资源。
  
   2) --conf spark.streaming.concurrentJobs=10

这个配置项的默认值为1,代表着新的batch过来之后只能在队列中等待之前的batch执行完之后再执行。

如果batch执行的时间超过了batch本身的时间,可以将该配置增大。

修改该配置的风险:kafka的单个partition只支持顺序消费,如果排在后面的batch先执行完成,kafka consumer 在commit offset时会出现混乱。

建议,使用之前充分评估风险,否则尽量不修改该配置。

   3) cache persist 和broadcast的选择

使用spark-streaming的应用一般是实时或者准实时的应用,所以需要预加载的变量(如模型,矩阵等),一般不会选择cache和persist,而是使用广播变量broadcast(只读,类似于全局变量,但是如果在
spark中直接使用全局变量会大幅降低程序性能)。

另一方面,将rdd/df的cache改为map(key,value)形式后进行广播,可以在需要对该rdd/df进行join的地方采用rdd.map{m=>get(m.key)}的形式来代替。减少了join带来的开销。
  
   4)预加载broadcast变量

广播变量是懒加载的,首次在dataDStream.foreachRDD中使用该广播变量会导致第一批数据处理比较慢,广播变量越大延迟也越大。

  懒加载在Spark离线任务中是比较好的策略,但是对线上实时推荐来说,延迟10s以上的行为数据可能都已经没有处理价值了。

  所以可以在还没有进入到foreachRDD中时,先让广播变量能够预加载到每台服务器,设置kafka读取的offset为latest,这样能够保证spark-streaming总是能够处理到最新的数据。

  预加载的方法利用了懒加载的性质,随便新建一个df,按照executor的个数repartition之后,在每个partition中读取广播变量的value中的任意一个值(不存在的也可以),这样就能保证每个executor都能加载到该广播变量。

someDF.repartition(sparkSession.sparkContext.getConf.getInt("spark.executor.instances", 10)).foreachPartition {
    p =>
    bcVariables.value.get("_")
}

             5)去掉所有不必要的join

  join确实有很多可以优化的配置,但是没必要把时间花在join的优化上,尤其是在可以用广播变量来作为代替方案的情况下。

  需要注意的是,广播变量和broadcast join是不一样的,前者效率在大部分时候要更高。

             6)kafka partition个数和executor个数的关系

  executor个数要能被partition个数整除。例如,如果partition个数为24个,那么12个executor和18个executor处理数据的性能差距不大。如果集群可以分配的executor个数为18个,那么partition数可
  以从24个调整为18个(或者36个等等)。

  原因比较明显,就不多提了。展示几个实验数据

  下图为性能测试实验中3,6,12个executor下数据处理时间(纵坐标)和数据量(横坐标)的关系,是明显的线性关系。
点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

     下图为性能测试实验中在处理600条数据时,executor数(横坐标)和时间(纵坐标)的关系(分区个数为executor的整数倍)
点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

      由图一的三组数据也可以看出,每秒能处理的数据的条数和executor的个数约等于线性关系。即如果当前集群每3秒能处理x条数据,那么集群扩容一倍后,每3秒应该能处理2x条数据。

     由图二可看出,executor数和数据的处理时间不是简单的线性关系,也就是说,如果当前集群处理100条数据耗时6秒,并不能保证将集群扩容一倍后100条数据的处理时间变为3秒。

             7)kafka的hash分区

  kafka的各个分区处理的数据应该保证尽量按照某一特征(比如用户id)hash分区,这样能够保证某一用户的所有记录都在某一个partition,这样spark-streaming在处理reduceByKey时会提升效率。

                                                  
                                      
                                                
----------------------------
原文链接:https://blog.csdn.net/arli_xu/article/details/83034581

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-22 10:25:41 重新编辑]
  Java面向对象编程-->Java语言的基本语法和规范
  JavaWeb开发-->使用Session(Ⅱ)
  JSP与Hibernate开发-->使用JPA和注解
  Java网络编程-->基于MVC和RMI的分布式应用
  精通Spring-->组合(Composition)API
  Vue3开发-->Vue Router路由管理器
  一套可复用的方法论!从0-1搭建数据团队,看这篇就够了
  大数据存储单位介绍(TB、PB、EB、ZB、YB有多大)
  海量数据解决思路之Hash算法
  Hadoop安装过程
  使用Helm简化K8S应用管理
  深入玩转K8S之使用kubeadm安装Kubernetes v1.10以及常见问题...
  Apacheの日志分割
  Spark RDD持久化、广播变量和累加器
  SDN网络IPv6组播机制支持实时视频业务海量用户扩展
  大数据平台CDH搭建
  Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网...
  Hadoop的简单单词统计案例
  00-Spark介绍
  大数据系统发展的技术路线
  深入剖析Hadoop HBase
  更多...
 IPIP: 已设置保密
楼主      
该用户目前不在线 patebeng11 
  
威望: 0
级别: 新手上路
魅力: 130
经验: 130
现金: 1044
发文章数: 7
注册时间: 0001-01-01
 消息  查看  搜索  好友  邮件  复制  引用


90后消防员张俊杰:退伍后回乡卖蜂蜜,年收入千万
干将莫邪为何用活人祭剑,薇娅偷逃税被罚13.41亿,李佳琦被浙江消保委点名!
网络主播薇娅因偷逃税款,被追缴税款、加收滞纳金并处罚款共计13.41亿元。
水泵
螺杆泵
全焊接球阀
焊接球阀
离心泵
离心水泵
球阀
球阀厂10
污泥螺杆泵13
上海螺杆泵20
螺杆泵厂14
上海离心泵
直埋全焊接球阀
埋地全焊接球阀
磁力泵
化工泵
上海球阀95
上海球阀厂151
帕特
Fully Welded Ball Valve
all Welded Ball Valve


螺杆泵
全焊接球阀
焊接球阀
球阀厂
离心泵
水泵

发文章时间 2021-12-24 17:25:14
 IPIP: 已设置保密 1 楼     
1页 3条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。