>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 20348 个阅读者 刷新本主题
 * 贴子主题:  flume+spark streaming+redis完整篇 回复文章 点赞(0)  收藏  
作者:mary    发表时间:2020-03-21 19:29:40     消息  查看  搜索  好友  邮件  复制  引用

                          

flume+spark streaming+redis完整篇

                                                                                                                                                                                                                                           一.前言

     本篇是用flume作为数据源,spark streaming来实时处理,然后把结果存在redis供查询.

     本篇介绍的是一个实时统计网站访问的pv的例子.

     本篇采用的各种版本如下 scala-2.10.4   spark-1.6.1  flume-1.6.0

     本篇采用的spark集群为sdandalone模式

二.数据源flume配置

     flume的详细说明请自行百度.这里的flume source采用http,并且使用json handler来处理.

     source 配置:

a1.sources.r2.type = http

a1.sources.r2.bind=10.8.23.58

a1.sources.r2.port = 5140

a1.sources.r2.channels = c3

a1.sources.r2.handler = org.apache.flume.source.http.JSONHandler

channel因为测试,所以选择的是内存方式,实际根据情况,建议使用flie模式,并且配置checkpoint防止数据丢失.

     channel配置:

a1.channels.c3.type = memory

a1.channels.c3.capacity = 100

a1.channels.c3.transactionCapacity = 100

sink有两种,如下

     flume=>spark streaming有两种方式

     1.推模式:这种模式比较简单,直接连接上对应的avro端口即可,但是有个最大的问题,你必须先启动spark streaming任务,然后观察这个端口开在哪台spark节点上,并且每次启动都会随机到某一个节点,然后再去改flume的配置,往那台机器上发数据,这就比较蛋疼,如果你有100台flume,那会让人疯狂的.因此这个模式,只有你的flume数量比较少的情况下适用.

     推模式的sink配置:

a1.sinks.k3.type = avro

a1.sinks.k3.channel = c3

a1.sinks.k3.hostname = 10.8.23.32

a1.sinks.k3.port = 4545

     2.拉模式:

     这里建议先使用推模式,等推模式跑通了,再切换到拉模式. 拉模式相比推模式稍微复杂点,主要复杂在flume的配置,官网有很详细的说明,我这里会把用到的都描述出来.

     首先需要把3个jar放到flume/lib下.(分别是:spark-streaming-flume-sink_2.10-1.6.1.jar ,scala-library-2.10.5.jar,  commons-lang3-3.3.2.jar,在最后的网盘地址里面有)

     拉模式的sink配置

a1.sinks = spark

a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink

a1.sinks.spark.hostname = 10.2.23.58

a1.sinks.spark.port = 4545

a1.sinks.spark.channel = c3

        至此 flume配置完成.可以启动观察一下.

     三.代码开发

   object    PvStatistic {

   def main (args: Array[ String ]):  Unit  = {

   val masterUrl =  "spark://10.8.23.112:7077"

   val conf =   new SparkConf().setMaster(masterUrl).setAppName( "PvStatistic" )

   val ssc =   new StreamingContext(conf ,  Seconds ( 15 ))

//    推模式
//    val flumeStream = FlumeUtils.createStream(ssc, "10.8.23.58", 4545, StorageLevel.MEMORY_ONLY_SER_2)
//    拉模式
       val flumeStream = FlumeUtils. createPollingStream (ssc ,  "10.8.23.58" ,  4545 ,  StorageLevel. MEMORY_ONLY_SER_2 )

        flumeStream.foreachRDD(rdd => {
          rdd.foreachPartition(it=>{
             val jedis = RedisClient. pool .getResource
            it.foreach(event=>{
               val sensorInfo =   new String(event. event .getBody.array())  //单行记录
//              println(sensorInfo)
                 val json = JSONObject. fromObject (sensorInfo) ;
               val url=json.getString( "url" )
                jedis.hincrBy( "Spark:PV" ,  url ,  1 ) ;
              })
            RedisClient. pool .returnResource(jedis)
          })
        })

    ssc.start()
    ssc.awaitTermination()
  }

   两种模式对于数据源的获取只是方法不一样,处理逻辑都一样.

   需要的jar包 我直接贴pom文件

  <dependency>
       <groupId>org.apache.spark </groupId>
       <artifactId>spark-core_2.10 </artifactId>
       <version>1.6.1 </version>
       <scope>provided </scope>
   </dependency>
<dependency>
   <groupId>org.apache.spark </groupId>
   <artifactId>spark-streaming_2.10 </artifactId>
   <version>1.6.1 </version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.10 -->
<dependency>
   <groupId>org.apache.spark </groupId>
   <artifactId>spark-streaming-flume_2.10 </artifactId>
   <version>1.6.1 </version>
</dependency>

  <dependency>
       <groupId>redis.clients </groupId>
       <artifactId>jedis </artifactId>
       <version>2.8.1 </version>
  </dependency>

  <dependency>
   <groupId>net.sf.json-lib </groupId>
   <artifactId>json-lib </artifactId>
   <version>2.4 </version>
   <classifier>jdk15 </classifier>
</dependency>

    完成以后就是打包上传到spark主节点了,这里提一下,因为需要很多依赖包,所以建议直接打成一个包含所有依赖的jar.

    推荐一个maven插件来做这个事.

  <plugin>
   <groupId>org.apache.maven.plugins </groupId>
   <artifactId>maven-shade-plugin </artifactId>
   <version>2.3 </version>
   <executions>
     <execution>
       <phase>package </phase>
       <goals>
         <goal>shade </goal>
       </goals>
       <configuration>
         <filters>
           <filter>
             <artifact>*:* </artifact>
             <excludes>
               <exclude>META-INF/*.SF </exclude>
               <exclude>META-INF/*.DSA </exclude>
               <exclude>META-INF/*.RSA </exclude>
             </excludes>
           </filter>
         </filters>
         <transformers>
           <transformer
                    implementation= "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" >
             <mainClass>com.defonds.RsaEncryptor </mainClass>
           </transformer>
           <transformer
                    implementation= "org.apache.maven.plugins.shade.resource.AppendingTransformer" >
             <resource>META-INF/spring.handlers </resource>
           </transformer>
           <transformer
                    implementation= "org.apache.maven.plugins.shade.resource.AppendingTransformer" >
             <resource>META-INF/spring.schemas </resource>
           </transformer>
         </transformers>
       </configuration>
     </execution>
   </executions>
</plugin>

   有了这个插件,直接maven clean package打包就行了,打出来的包包含所有依赖文件.

   然后把这个包上传到spark主节点,在spark/bin下执行

     spark-submit --master spark://cdh112:7077 --class com.dome.PvStatistic  /opt/spark/job/analyseSys-1.0-SNAPSHOT.jar

   然后可以向之前配置的flume的接收端口发送http请求测试了

     发送内容:

    {

        "body":"{\"url\": \"http://20160926 16:02\"}",

        "headers":{"v1":"log"}

        }


    在redis里面可以看到 SparkPv开头的key下面 每个访问地址的访问次数.

    上面提到的jar包

    链接:http://pan.baidu.com/s/1kUEJJJx 密码:9fxv

    最后感谢一下群内的各位兄弟以及群主aDog~ 有兴趣交流的可以加入QQ群:459898801

                                                                                        
----------------------------
原文链接:https://blog.csdn.net/ghost06211/article/details/52667958

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-22 21:27:01 重新编辑]
  Java面向对象编程-->第一个Java程序
  JavaWeb开发-->使用Session(Ⅰ)
  JSP与Hibernate开发-->通过JPA API检索数据
  Java网络编程-->RMI框架
  精通Spring-->CSS过渡和动画
  Vue3开发-->Vue简介
  Spark客户端之Spark Submit的使用
  demo2 Kafka+Spark Streaming+Redis实时计算整合实践 foreac...
  spark读取kafka后写入redis
  K8S使用dashboard管理集群
  Yarn资源调度系统架构与原理分析
  MapReduce实现自定义排序功能
  Hadoop中文词频统计
  基于Spark MLlib平台的协同过滤算法---电影推荐系统
  hive 的支持的文件类型与 压缩格式
  Spark Thrift JDBCServer应用场景解析与实战案例
  大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整...
  Hadoop设计思路和目标
  数据科学最终迁移到云端的5个原因
  MapReduce工作原理图文详解
  Hadoop常用命令参考
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。