>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 28757 个阅读者 刷新本主题
 * 贴子主题:  spark读取kafka后写入redis 回复文章 点赞(0)  收藏  
作者:Jacky    发表时间:2020-03-21 17:27:54     消息  查看  搜索  好友  邮件  复制  引用

                                                                                                

spark读取kafka后写入redis

package  com .prince .demo .test

import  com .typesafe .config .ConfigFactory
import org .apache .kafka .common .serialization .StringDeserializer
import org .apache .log4j.{Level, Logger}
import org .apache .spark .streaming.{Seconds, StreamingContext}
import org .apache .spark .streaming .kafka010 .ConsumerStrategies .Subscribe
import org .apache .spark .streaming .kafka010 .KafkaUtils
import org .apache .spark .streaming .kafka010 .LocationStrategies .PreferConsistent
import org .apache .spark .sql .SparkSession
import redis .clients .jedis .Jedis
/**
  * Created by prince on 2017/9/13.
  */

object SparkStreamingWriteRedis {

  Logger .getLogger( "org") .setLevel(Level .WARN)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession .builder .appName( "SparkStreamingWriteRedis") .master( "local[*]") .getOrCreate()

    val sparkContext = spark .sparkContext
    val ssc = new StreamingContext(sparkContext, Seconds( 1))

    implicit val conf = ConfigFactory .load

    val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> conf .getString( "kafka.brokers"),
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "group.id" -> conf .getString( "kafka.group"),
       "auto.offset.reset" ->  "latest",
       "enable.auto.commit" -> (false: java .lang .Boolean))

    val topic = conf .getString( "kafka.topics")
    val topics = Array(topic)
    val stream = KafkaUtils
       .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val input = stream .flatMap(line => {
      Some(line .value .toString)
    })

    input .foreachRDD(rdd => {
      rdd .foreachPartition(part => {
        val jedis = new Jedis( "192.168.1.97",  6379,  3000)
        jedis .auth( "123456")
        part .foreach( x => {
          jedis .lpush( "test_key",  x)
          jedis .close()
        })
      })
    })

    ssc .start()
    ssc .awaitTermination()
  }
}

----------------------------
原文链接:https://blog.csdn.net/qq_39869388/article/details/80366380

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-22 12:05:09 重新编辑]
  Java面向对象编程-->集合(上)
  JavaWeb开发-->使用过滤器
  JSP与Hibernate开发-->映射一对多关联关系
  Java网络编程-->安全网络通信
  精通Spring-->组合(Composition)API
  Vue3开发-->虚拟DOM和render()函数
  flume+spark streaming+redis完整篇
  spark读取redis数据(交互式,scala单机版,java单机版)
  kubernetes之Ingress部署
  kubeadm安装k8s集群1.17版本
  Kubernetes(K8S)集群管理Docker容器(部署篇)
  Spark高级排序与TopN问题揭密
  spark DAGScheduler、TaskSchedule、Executor执行task源码分...
  Java处理大数据小技巧总结
  如何设计实时数据平台(设计篇)
  Hadoop是否过时?
  大数据的学习方向
  Hadoop起源及其四大特性详解
  Hadoop中文版使用文档
  MapReduce工作原理图文详解
  SSH免除免密登录的配置
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。