|
spark-redis入门教程 本文是我翻译自redis官方网站文章,英文作者是Itamar Harber。 Spark-Redis是用Spark在redis上面进行读写数据操作的包。其支持redis的所有数据结构:String(字符串), Hash(哈希), List(列表), Set and Sorted Set(集合和有序集合)。此模块既可以用于Redis的standalone模式,也可用于集群情况。此外,Spark-Redis还提供了对Spark-Streaming的支持。
简介 本文概括了开始使用Apache Spark和Redis所需的基本步骤。关于Spark和Redis的安装暂不介绍。我们将使用“WORD COUNT”为例,来介绍Spark,Redis和spark-redis的联合使用。 |
Redis实验室最近于2015年9月发布了spark-redis package。显而易见,根据它的名字,这是一个为Apache Spark提供Redis连接的 连接件,它允许人们对Redis的数据结构在Spark中以RDD(弹性分布式数据集,Spark的专门术语。)的结构形式进行操作。
自从Spark开源以来,由于其针对大规模数据处理的高效且通用的引擎(轻松的超过了之前只能在单一平台上面操作的大数据平台),很快吸引了开发者们的注意。Spark采用了循环数据流和内存计算,使得其比Hadoop的MapReduce速度快了很多倍。由于Spark的易用性和SQL,Streaming以及Mlib等库的扩展,吸引了开发人员的眼球。
Redis将共享内存的架构(shared in-memory infrastructure)引入到Spark中,这使得Spark处理数据的速度又快了几个数量级。此外,Redis的数据结构简化了数据的获取和处理,使代码复杂度下降,并节省了网络通信和带宽的消耗。
因此,两者的结合,可以实现大规模数据量的实时处理任务。提速幅度有多大?如果Redis和Spark结合使用,结果证明,处理数据(以时间序列数据为例)的速度比Spark单单使用 进程内存或 堆外缓存来存储数据要快45倍――不是快45%,而是快整整45倍! 配置- Apache Spark
- Scala
- Jedis
- Redis
最低标准:
Apache Spark v1.4.0
Scala v2.10.4
Jedis v2.7
Redis v2.8.12 or v3.0.3 Example(Word Count计数器) 下面,我们使用word count例子来开始介绍Spark-Redis的使用。 Step 1:读取数据 这里,我们将对Redis源代码文件进行Word Count统计,希望得出一些有趣的结论。当上面配置好以后,我们运行:

注意:
$ bin/spark-shell --jars <path-to>/spark-redis-<version>.jar,<path-to>/jedis-<version>.jar |
需要在–jars 后面加上这两个jar包。
在这里,输入:
表示有100个Redis的源文件。当然,你也可以用ls -l redis/src/*.[ch] | wc -l 来统计文件个数。但是通过 WholeTextFileRDD的方式,可以看出job的划分stage和完成的情况。 Step 2:改变文件内容 接下来,是将文件变成一个个单词。相比于普遍使用的 TextFileRDD, WholeTextFileRDD返回一个 key-value型的结构数据。key对应的是每个文件的path;value对应的是文件的内容。
下面是 WholeTextFileRDD的代码。
/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs: [color=#009933]//a-hdfs-path/part- 00000
* hdfs: //a-hdfs-path/part- 00001
* ...
* hdfs: //a-hdfs-path/part-nnnnn
* }}}
*
* Do ` val rdd = sparkContext.wholeTextFile( "hdfs://a-hdfs-path")`,
*
* <p> then ` rdd` contains
* {{{
* (a-hdfs-path/part- 00000, its content)
* (a-hdfs-path/part- 00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
* @note On some filesystems, ` .../path /*` can be a more efficient way to read all files
* in a directory rather than ` .../path/` or ` .../path`
*
* @param path Directory to the input data files, the path can be comma separated paths as the
* list of inputs.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/[/color]
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK- 7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
classOf[Text],
classOf[Text],
updateConf,
minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
} |
值得注意的是:
① wholeTextFiles对于 大量的小文件效率较高,大文件效果不太好。
② 一些文件系统的路径名采用 通配符的形式效果比一个一个文件名添加上去更高效。
将文件变成(文件名:单词)的形式。(变量的名称:wtext代表WholeTextFiles, fwd代表FileWords。):

当fwds这个RDD将文件名和单词清楚的分开之后,我们就准备好进行词频统计了。在此之前,先对所有文件的单词进行汇总,进行一个整体的词频统计。
其结果为:
Step 3:将RDD写到Redis中 从这步开始,我们就开始使用到非常6的Redis了!我们将用Redis存储这个结果,以供后续计算使用。Redis的 有序集合对词频统计(单词:出现次数)场景特别适合,既可以根据单词找到单词的出现次数,也可以通过单词的出现次数找到符合要求的单词。
实现这个只需要一行代码!!!

一旦数据存放到Redis中,我们可以使用命令行操作:
Step 4:从Redis中读取RDD 相较于从Redis写数据,更常见的场景是从Redis中读数据。运行下面的代码,使得每一个文件的词频统计汇总起来变成一个总的输出。

用spark-shell进行操作,获取全部单词的个数(包括 重复的内容)。
闭注(Closing Notes) 当数据量小的时候,我们统计单词个数可以用
来实现。随着数据量的增大,需要找到新的方法来抽象解答方式并增加方法的灵活性和可扩展性。Spark是一个让人兴奋的大数据处理工具。更别说其和Hadoop生态系统耦合,并有诸如SQL,streaming,Mlib等扩展包。
Redis的出现,可以说是为Spark“ 解了渴“。spark-redis
通过简单的几行代码,将RDD和Redis的核心数据结构迅速互换。spark-redis包已经提供了直接的方式将RDD和redis的结构进行互换,并提供了友好的方式来获取key的名字。此外,连接件还通过将RDD分区转换成Redis的hash slot,有效的减少了引擎内部shuffling操作。
最后,这个开源的连接件还在不断的发展中,将来有可能会被spark设置成默认组件。 参考(References)1、RedisLabs/Spark-Redis
2、飞一般的感觉!当Spark遇到Redis~
3、spark-redis (homepage)
----------------------------
原文链接:https://blog.csdn.net/g11d111/article/details/72868189
程序猿的技术大观园:www.javathinker.net
[这个贴子最后由 flybird 在 2020-03-21 22:36:11 重新编辑]
|
|