>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 29166 个阅读者 刷新本主题
 * 贴子主题:  spark读取kafka后写入redis 回复文章 点赞(0)  收藏  
作者:Jacky    发表时间:2020-03-21 17:27:54     消息  查看  搜索  好友  邮件  复制  引用

                                                                                                

spark读取kafka后写入redis

package  com .prince .demo .test

import  com .typesafe .config .ConfigFactory
import org .apache .kafka .common .serialization .StringDeserializer
import org .apache .log4j.{Level, Logger}
import org .apache .spark .streaming.{Seconds, StreamingContext}
import org .apache .spark .streaming .kafka010 .ConsumerStrategies .Subscribe
import org .apache .spark .streaming .kafka010 .KafkaUtils
import org .apache .spark .streaming .kafka010 .LocationStrategies .PreferConsistent
import org .apache .spark .sql .SparkSession
import redis .clients .jedis .Jedis
/**
  * Created by prince on 2017/9/13.
  */

object SparkStreamingWriteRedis {

  Logger .getLogger( "org") .setLevel(Level .WARN)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession .builder .appName( "SparkStreamingWriteRedis") .master( "local[*]") .getOrCreate()

    val sparkContext = spark .sparkContext
    val ssc = new StreamingContext(sparkContext, Seconds( 1))

    implicit val conf = ConfigFactory .load

    val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> conf .getString( "kafka.brokers"),
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "group.id" -> conf .getString( "kafka.group"),
       "auto.offset.reset" ->  "latest",
       "enable.auto.commit" -> (false: java .lang .Boolean))

    val topic = conf .getString( "kafka.topics")
    val topics = Array(topic)
    val stream = KafkaUtils
       .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val input = stream .flatMap(line => {
      Some(line .value .toString)
    })

    input .foreachRDD(rdd => {
      rdd .foreachPartition(part => {
        val jedis = new Jedis( "192.168.1.97",  6379,  3000)
        jedis .auth( "123456")
        part .foreach( x => {
          jedis .lpush( "test_key",  x)
          jedis .close()
        })
      })
    })

    ssc .start()
    ssc .awaitTermination()
  }
}

----------------------------
原文链接:https://blog.csdn.net/qq_39869388/article/details/80366380

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-22 12:05:09 重新编辑]
  Java面向对象编程-->泛型
  JavaWeb开发-->集合(上)
  JSP与Hibernate开发-->Servlet技术详解(Ⅰ)
  Java网络编程-->Servlet技术详解(Ⅱ)
  精通Spring-->数据库事务的并发问题的解决方案
  Vue3开发-->第一个helloapp应用
  数据治理到底能治什么,怎么治
  Redis服务器重要属性详解
  推荐系统的设计
  Spark批量读取Redis数据-Pipeline(Scala)
  spark读取redis,连接池配置的范例代码
  kafka+spark-streaming实时推荐系统性能优化笔记
  合并两套环境的FastDFS数据
  Hadoop中文词频统计
  Apacheの日志分割
  Spark SQL常见4种数据源(详细)
  Hadoop与Spring的集成
  大数据的学习方向
  云计算和大数据到底有什么关系
  用Maven构建Hadoop项目
  Hadoop的安装和基本操作命令
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。