>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 27379 个阅读者 刷新本主题
 * 贴子主题:  spark读取kafka后写入redis 回复文章 点赞(0)  收藏  
作者:Jacky    发表时间:2020-03-21 17:27:54     消息  查看  搜索  好友  邮件  复制  引用

                                                                                                

spark读取kafka后写入redis

package  com .prince .demo .test

import  com .typesafe .config .ConfigFactory
import org .apache .kafka .common .serialization .StringDeserializer
import org .apache .log4j.{Level, Logger}
import org .apache .spark .streaming.{Seconds, StreamingContext}
import org .apache .spark .streaming .kafka010 .ConsumerStrategies .Subscribe
import org .apache .spark .streaming .kafka010 .KafkaUtils
import org .apache .spark .streaming .kafka010 .LocationStrategies .PreferConsistent
import org .apache .spark .sql .SparkSession
import redis .clients .jedis .Jedis
/**
  * Created by prince on 2017/9/13.
  */

object SparkStreamingWriteRedis {

  Logger .getLogger( "org") .setLevel(Level .WARN)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession .builder .appName( "SparkStreamingWriteRedis") .master( "local[*]") .getOrCreate()

    val sparkContext = spark .sparkContext
    val ssc = new StreamingContext(sparkContext, Seconds( 1))

    implicit val conf = ConfigFactory .load

    val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> conf .getString( "kafka.brokers"),
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "group.id" -> conf .getString( "kafka.group"),
       "auto.offset.reset" ->  "latest",
       "enable.auto.commit" -> (false: java .lang .Boolean))

    val topic = conf .getString( "kafka.topics")
    val topics = Array(topic)
    val stream = KafkaUtils
       .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val input = stream .flatMap(line => {
      Some(line .value .toString)
    })

    input .foreachRDD(rdd => {
      rdd .foreachPartition(part => {
        val jedis = new Jedis( "192.168.1.97",  6379,  3000)
        jedis .auth( "123456")
        part .foreach( x => {
          jedis .lpush( "test_key",  x)
          jedis .close()
        })
      })
    })

    ssc .start()
    ssc .awaitTermination()
  }
}

----------------------------
原文链接:https://blog.csdn.net/qq_39869388/article/details/80366380

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-22 12:05:09 重新编辑]
  Java面向对象编程-->图形用户界面(上)
  JavaWeb开发-->Web运作原理(Ⅰ)
  JSP与Hibernate开发-->JPA API的高级用法
  Java网络编程-->创建非阻塞的HTTP服务器
  精通Spring-->绑定CSS样式
  Vue3开发-->Vue组件开发高级技术
  数据治理到底能治什么,怎么治
  推荐系统的设计
  实战SparkStream+Kafka+Redis实时计算商品销售额
  K8S使用dashboard管理集群
  playbook自动安装kafka集群
  TiDB在360的落地及实战干货
  使用Helm简化K8S应用管理
  Nginx+Keepalived高可用集群
  Spark on Yarn with Hive实战案例与常见问题解决
  spark DAGScheduler、TaskSchedule、Executor执行task源码分...
  SDN网络IPv6组播机制支持实时视频业务海量用户扩展
  大数据处理的基本流程
  数据仓库的两种建模方法
  Hive基础教程
  Hadoop生态系统介绍
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。