>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 25933 个阅读者 刷新本主题
 * 贴子主题:  Spark批量读取Redis数据-Pipeline(Scala) 回复文章 点赞(0)  收藏  
作者:flybird    发表时间:2020-03-21 13:20:15     消息  查看  搜索  好友  邮件  复制  引用

                

Spark批量读取Redis数据-Pipeline(Scala)

最近在处理数据时,需要将原始数据与Redis的数据进行join,在读取Redis的过程中,碰到了一些问题,顺便做个笔记,希望对其他同学也有所帮助。实验过程中,当数据量还是十万级别的时候,逐个读取Redis并无压力;但当数据量达到千万级别时,问题就油然而生了,即使是使用Spark的mapPartitions也无法解决。因此,就考虑使用Redis的pipeline了(如果你有更好的方法,还请不吝赐教)。PS:本文主要针对的是Scala语言,因为目前在网上还没有看到Scala版本的Redis pipeline,希望此文能给初学者提供一个参考。

文章会先介绍如何使用Scala逐个去读取Redis数据,然后再介绍pipeline的使用。            

方法一、逐行读取Redis数据

     在本文,主要使用的是 redis.clients.jedis.Jedis库,如果你是使用sbt来运行spark,可以在build.sbt中做如下配置:                

name  :=  "sparkRedisExp"

version  :=  "1.0.0"

scalaVersion  :=  "2.10.4"

libraryDependencies +=  "org.apache.spark"  %% "spark-core" %  "1.3.1"

libraryDependencies +=  "redis.clients" %  "jedis" %  "2.6.2"

resolvers +=  "Akka Respository" at  "http://repo.akka.io/releases/"

     相应的jedis库可以到Github中下载 jedis-2.6.2.jar:https://github.com/csuldw/WorkUtils/tree/master/Spark/deps。下面请看详细内容。            

导入Redis库

     首先导入redis库,这里使用redis.clients.jedis.Jedis库。                

import redis .clients .jedis .Jedis

连接Redis

     然后连接Redis,主要设置 redisHost、 redisPort,如果有密码,需要进行密码验证。                

   val redisHost =  "localhost"
val redisPort =  8080
val redisClient =  new Jedis(redisHost, redisPort)
redisClient.auth(redisPassword)

读取Redis数据

     接下来,就可以直接使用get获取redis数据                

val  keys = Array( "key1",  "key2",  "key3",  "key4")
for(key <-  keys){
  println(redisClient. get(key))
}

      上述方法并没有使用Redis的pipeline,当数据较少的时候,可以用来使用。下面介绍如何使用pipeline来批量读取Redis数据。            

方法二、使用Redis pipeline批量读取Redis数据

     相对于第一种方法,这里需要额外引入两个库, redis.clients.jedis.Pipeline和 redis.clients.jedis.Response。            

导入相关库

import redis .clients .jedis .Jedis
import redis .clients .jedis .Pipeline
import redis .clients .jedis .Response

连接Redis

     此操作与上面的一样,如下:                

   val redisHost =  "localhost"
val redisPort =  8080
val redisClient =  new Jedis(redisHost, redisPort)
redisClient.auth(redisPassword)

使用pipeline读取数据之一(简化版)

     先给出代码,下面再做解释。                

var tempRedisRes = Map[ String,  Response[ String]]()
val keys =  Array( "key1",  "key2",  "key3",  "key4")
val pp = redisClient.pipelined()
for(key <- keys){
  tempRedisRes ++= Map(key -> pp. get(key))
}
pp.sync()

      因为 redis.clients.jedis.Jedis的 pipelined下的 get方法获取的是一个 Response[String]类型的返回值,所以上面定义了一个临时变量 Map[String, Response[String]]类型的 tempRedisRes,key是String类型,value是Response[String]类型,用于保存 pp.get(key)的返回值。当for循环执行完之后,使用sync同步即可。这样便实现了Redis的Pipeline功能。            

使用pipeline读取数据之二(强化版)

     为了防止连接Redis时的意外失败,我们需要设置一个尝试次数,确保数据一定程度上的正确性。因此,在上面的代码外面增加一层连接逻辑,如下:                

  var tempRedisRes = Map[ String, Response[ String]]()
val keys =  Array( "key1",  "key2",  "key3",  "key4")
var tryTimes =  2
var flag =  false
while(tryTimes >  0 && !flag) {
   try{
    val pp = redisClient.pipelined()
     for(key <- keys){
      tempRedisRes ++= Map(key -> pp.get(key))
    }
    pp.sync()
    flag =  true
  } catch {
     case e: Exception => {
      flag =  false
      println( "Redis-Timeout" + e)
      tryTimes = tryTimes -  1
    }
  } finally{
    redisClient.disconnect()
  }
}

     再次说明: pp.get()得到的是一个 Response[String]的结果,详细内容请查看redis-clients-jedis-Pipeline.

                                  
                                                                    
----------------------------
原文链接:https://blog.csdn.net/Dream_angel_Z/article/details/51763066

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-22 10:16:57 重新编辑]
  Java面向对象编程-->操作符
  JavaWeb开发-->使用过滤器
  JSP与Hibernate开发-->JPA API的高级用法
  Java网络编程-->客户端协议处理框架
  精通Spring-->Vue指令
  Vue3开发-->Vue组件开发基础
  一套可复用的方法论!从0-1搭建数据团队,看这篇就够了
  spark-redis使用简易脚本
  使用Helm简化K8S应用管理
  MySQL 每秒 570000 的写入,如何实现
  SparkStreaming的实战案例-WordCount范例
  Spark高级排序与TopN问题揭密
  kafka作为流式处理的上一层,为什么吞吐量那么大?
  如何设计实时数据平台(设计篇)-wx5c2da66615f74的博客
  大数据虚拟混算平台Moonbox配置指南
  hive 的支持的文件类型与 压缩格式
  ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台
  超详细的Hadoop2配置详解
  大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整...
  学习大数据处理需要掌握的技能
  SSH免除免密登录的配置
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。