>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 20207 个阅读者 刷新本主题
 * 贴子主题:  spark-redis入门教程 回复文章 点赞(0)  收藏  
作者:flybird    发表时间:2020-03-21 22:36:11     消息  查看  搜索  好友  邮件  复制  引用

                                                                                                

spark-redis入门教程

             本文是我翻译自redis官方网站文章,英文作者是Itamar Harber。    
    Spark-Redis是用Spark在redis上面进行读写数据操作的包。其支持redis的所有数据结构:String(字符串), Hash(哈希), List(列表), Set and Sorted Set(集合和有序集合)。此模块既可以用于Redis的standalone模式,也可用于集群情况。此外,Spark-Redis还提供了对Spark-Streaming的支持。

简介

本文概括了开始使用Apache Spark和Redis所需的基本步骤。关于Spark和Redis的安装暂不介绍。我们将使用“WORD COUNT”为例,来介绍Spark,Redis和spark-redis的联合使用。

     Redis实验室最近于2015年9月发布了spark-redis package。显而易见,根据它的名字,这是一个为Apache Spark提供Redis连接的 连接件,它允许人们对Redis的数据结构在Spark中以RDD(弹性分布式数据集,Spark的专门术语。)的结构形式进行操作。

    自从Spark开源以来,由于其针对大规模数据处理的高效且通用的引擎(轻松的超过了之前只能在单一平台上面操作的大数据平台),很快吸引了开发者们的注意。Spark采用了循环数据流和内存计算,使得其比Hadoop的MapReduce速度快了很多倍。由于Spark的易用性和SQL,Streaming以及Mlib等库的扩展,吸引了开发人员的眼球。

    Redis将共享内存的架构(shared in-memory infrastructure)引入到Spark中,这使得Spark处理数据的速度又快了几个数量级。此外,Redis的数据结构简化了数据的获取和处理,使代码复杂度下降,并节省了网络通信和带宽的消耗。

     因此,两者的结合,可以实现大规模数据量的实时处理任务。提速幅度有多大?如果Redis和Spark结合使用,结果证明,处理数据(以时间序列数据为例)的速度比Spark单单使用 进程内存 堆外缓存来存储数据要快45倍――不是快45%,而是快整整45倍!            

配置

  • Apache Spark
  • Scala
  • Jedis
  • Redis
     最低标准:

Apache Spark v1.4.0

Scala v2.10.4

Jedis v2.7

Redis v2.8.12 or v3.0.3                    

Example(Word Count计数器)

     下面,我们使用word count例子来开始介绍Spark-Redis的使用。            

Step 1:读取数据

     这里,我们将对Redis源代码文件进行Word Count统计,希望得出一些有趣的结论。当上面配置好以后,我们运行:

点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

         注意:

$ bin/spark-shell --jars <path-to>/spark-redis-<version>.jar,<path-to>/jedis-<version>.jar

         需要在–jars 后面加上这两个jar包。

         在这里,输入:

点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小                

  wtext.count =  100

      表示有100个Redis的源文件。当然,你也可以用ls -l redis/src/*.[ch] | wc -l 来统计文件个数。但是通过 WholeTextFileRDD的方式,可以看出job的划分stage和完成的情况。            

Step 2:改变文件内容

     接下来,是将文件变成一个个单词。相比于普遍使用的 TextFileRDD, WholeTextFileRDD返回一个 key-value型的结构数据。key对应的是每个文件的path;value对应的是文件的内容。

下面是 WholeTextFileRDD的代码。                

   /**
   * Read a directory  of text files from HDFS, a local file system (available  on all nodes),  or any
   * Hadoop-supported file system URI. Each file  is read as a single record  and returned  in a
   * key-value pair, where the key  is the path  of each file, the value  is the content  of each file.
   *
   * <p> For example,  if you have the following  files:
   * {{{
   *    hdfs: [color=#009933]//a-hdfs-path/part- 00000
   *    hdfs: //a-hdfs-path/part- 00001
   *   ...
   *    hdfs: //a-hdfs-path/part-nnnnn
   * }}}
   *
   * Do ` val rdd = sparkContext.wholeTextFile( "hdfs://a-hdfs-path")`,
   *
   * <p>  then ` rdd` contains
   * {{{
   *   (a-hdfs-path/part- 00000, its content)
   *   (a-hdfs-path/part- 00001, its content)
   *   ...
   *   (a-hdfs-path/part-nnnnn, its content)
   * }}}
   *
   *  @note Small files are preferred, large file  is also allowable, but may cause bad performance.
   *  @note On some filesystems, ` .../path /*` can be a more efficient way to read all files
   *        in a directory rather than ` .../path/`  or ` .../path`
   *
   *  @param path Directory to the input data files, the path can be comma separated paths as the
   *             list  of inputs.
   *  @param minPartitions A suggestion value  of the minimal splitting number  for input data.
   */[/color]  
  def wholeTextFiles(  
       path: String,  
       minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {  
    assertNotStopped()  
    val job = NewHadoopJob.getInstance(hadoopConfiguration)  
     // Use setInputPaths so that wholeTextFiles aligns  with hadoopFile/textFile  in taking  
     // comma separated files as input. (see SPARK- 7155)  
    NewFileInputFormat.setInputPaths(job, path)  
    val updateConf = job.getConfiguration  
     new WholeTextFileRDD(  
       this,  
      classOf[WholeTextFileInputFormat],  
      classOf[Text],  
      classOf[Text],  
      updateConf,  
      minPartitions).map(record  => (record._1.toString, record._2.toString)).setName(path)  
  }  

       值得注意的是

     ① wholeTextFiles对于 大量的小文件效率较高,大文件效果不太好。

     ② 一些文件系统的路径名采用 通配符的形式效果比一个一个文件名添加上去更高效。

         将文件变成(文件名:单词)的形式。(变量的名称:wtext代表WholeTextFiles, fwd代表FileWords。):

点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

         当fwds这个RDD将文件名和单词清楚的分开之后,我们就准备好进行词频统计了。在此之前,先对所有文件的单词进行汇总,进行一个整体的词频统计。点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

         其结果为:

点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小    

Step 3:将RDD写到Redis中

     从这步开始,我们就开始使用到非常6的Redis了!我们将用Redis存储这个结果,以供后续计算使用。Redis的 有序集合对词频统计(单词:出现次数)场景特别适合,既可以根据单词找到单词的出现次数,也可以通过单词的出现次数找到符合要求的单词。

         实现这个只需要一行代码!!!

点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

         一旦数据存放到Redis中,我们可以使用命令行操作:

         点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小            

Step 4:从Redis中读取RDD

        相较于从Redis写数据,更常见的场景是从Redis中读数据。运行下面的代码,使得每一个文件的词频统计汇总起来变成一个总的输出。

         点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

         用spark-shell进行操作,获取全部单词的个数(包括 重复的内容)。

         点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小            

闭注(Closing Notes)

     当数据量小的时候,我们统计单词个数可以用

wc -w

来实现。随着数据量的增大,需要找到新的方法来抽象解答方式并增加方法的灵活性和可扩展性。Spark是一个让人兴奋的大数据处理工具。更别说其和Hadoop生态系统耦合,并有诸如SQL,streaming,Mlib等扩展包。

  Redis的出现,可以说是为Spark“ 解了渴“。spark-redis

通过简单的几行代码,将RDD和Redis的核心数据结构迅速互换。spark-redis包已经提供了直接的方式将RDD和redis的结构进行互换,并提供了友好的方式来获取key的名字。此外,连接件还通过将RDD分区转换成Redis的hash slot,有效的减少了引擎内部shuffling操作。

         最后,这个开源的连接件还在不断的发展中,将来有可能会被spark设置成默认组件。            

参考(References)

1、RedisLabs/Spark-Redis

2、飞一般的感觉!当Spark遇到Redis~

3、spark-redis (homepage)
                                    
                                                                    
----------------------------
原文链接:https://blog.csdn.net/g11d111/article/details/72868189

程序猿的技术大观园:www.javathinker.net




[这个贴子最后由 flybird 在 2020-03-21 22:36:11 重新编辑]
  Java面向对象编程-->数组
  JavaWeb开发-->使用过滤器
  JSP与Hibernate开发-->Spring、JPA与Hibernate的整合
  Java网络编程-->RMI框架
  精通Spring-->计算属性和数据监听
  Vue3开发-->计算属性和数据监听
  数据治理到底能治什么,怎么治
  Redis服务器重要属性详解
  springboot的缓存技术
  demo2 Kafka+Spark Streaming+Redis实时计算整合实践 foreac...
  mongodb与redis与Hbase比较
  MapReduce实现自定义排序功能
  Hadoop安装过程
  Hadoop 之 HDFS
  搭建高可用的Replication集群归档大量的冷数据
  hadoop从零开始--HDFS篇
  SDN网络IPv6组播机制支持实时视频业务海量用户扩展
  大数据虚拟混算平台Moonbox配置指南
  Hadoop小文件优化
  深入剖析Hadoop HBase
  Hadoop生态系统介绍
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。