>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 25365 个阅读者 刷新本主题
 * 贴子主题:  spark读取kafka后写入redis 回复文章 点赞(0)  收藏  
作者:Jacky    发表时间:2020-03-21 17:27:54     消息  查看  搜索  好友  邮件  复制  引用

                                                                                                

spark读取kafka后写入redis

package  com .prince .demo .test

import  com .typesafe .config .ConfigFactory
import org .apache .kafka .common .serialization .StringDeserializer
import org .apache .log4j.{Level, Logger}
import org .apache .spark .streaming.{Seconds, StreamingContext}
import org .apache .spark .streaming .kafka010 .ConsumerStrategies .Subscribe
import org .apache .spark .streaming .kafka010 .KafkaUtils
import org .apache .spark .streaming .kafka010 .LocationStrategies .PreferConsistent
import org .apache .spark .sql .SparkSession
import redis .clients .jedis .Jedis
/**
  * Created by prince on 2017/9/13.
  */

object SparkStreamingWriteRedis {

  Logger .getLogger( "org") .setLevel(Level .WARN)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession .builder .appName( "SparkStreamingWriteRedis") .master( "local[*]") .getOrCreate()

    val sparkContext = spark .sparkContext
    val ssc = new StreamingContext(sparkContext, Seconds( 1))

    implicit val conf = ConfigFactory .load

    val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> conf .getString( "kafka.brokers"),
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "group.id" -> conf .getString( "kafka.group"),
       "auto.offset.reset" ->  "latest",
       "enable.auto.commit" -> (false: java .lang .Boolean))

    val topic = conf .getString( "kafka.topics")
    val topics = Array(topic)
    val stream = KafkaUtils
       .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val input = stream .flatMap(line => {
      Some(line .value .toString)
    })

    input .foreachRDD(rdd => {
      rdd .foreachPartition(part => {
        val jedis = new Jedis( "192.168.1.97",  6379,  3000)
        jedis .auth( "123456")
        part .foreach( x => {
          jedis .lpush( "test_key",  x)
          jedis .close()
        })
      })
    })

    ssc .start()
    ssc .awaitTermination()
  }
}

----------------------------
原文链接:https://blog.csdn.net/qq_39869388/article/details/80366380

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-22 12:05:09 重新编辑]
  Java面向对象编程-->数据类型
  JavaWeb开发-->Servlet技术详解(Ⅲ)
  JSP与Hibernate开发-->通过JPA API检索数据
  Java网络编程-->安全网络通信
  精通Spring-->通过Axios访问服务器
  Vue3开发-->绑定表单
  大数据存储单位介绍(TB、PB、EB、ZB、YB有多大)
  Redis服务器重要属性详解
  flume+spark streaming+redis完整篇
  Spark配置redis的jar包
  spark读取redis,连接池配置的范例代码
  海量数据解决思路之Hash算法
  使用Ambari搭建Hadoop集群
  Kubernetes(K8S)集群管理Docker容器(部署篇)
  Hadoop入门——初识Hadoop
  Hadoop与Spring的集成
  大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整...
  Hadoop设计思路和目标
  学习大数据处理需要掌握的技能
  云计算和大数据到底有什么关系
  SSH免除免密登录的配置
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。