>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 26236 个阅读者 刷新本主题
 * 贴子主题:  spark读取kafka后写入redis 回复文章 点赞(0)  收藏  
作者:Jacky    发表时间:2020-03-21 17:27:54     消息  查看  搜索  好友  邮件  复制  引用

                                                                                                

spark读取kafka后写入redis

package  com .prince .demo .test

import  com .typesafe .config .ConfigFactory
import org .apache .kafka .common .serialization .StringDeserializer
import org .apache .log4j.{Level, Logger}
import org .apache .spark .streaming.{Seconds, StreamingContext}
import org .apache .spark .streaming .kafka010 .ConsumerStrategies .Subscribe
import org .apache .spark .streaming .kafka010 .KafkaUtils
import org .apache .spark .streaming .kafka010 .LocationStrategies .PreferConsistent
import org .apache .spark .sql .SparkSession
import redis .clients .jedis .Jedis
/**
  * Created by prince on 2017/9/13.
  */

object SparkStreamingWriteRedis {

  Logger .getLogger( "org") .setLevel(Level .WARN)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession .builder .appName( "SparkStreamingWriteRedis") .master( "local[*]") .getOrCreate()

    val sparkContext = spark .sparkContext
    val ssc = new StreamingContext(sparkContext, Seconds( 1))

    implicit val conf = ConfigFactory .load

    val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> conf .getString( "kafka.brokers"),
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "group.id" -> conf .getString( "kafka.group"),
       "auto.offset.reset" ->  "latest",
       "enable.auto.commit" -> (false: java .lang .Boolean))

    val topic = conf .getString( "kafka.topics")
    val topics = Array(topic)
    val stream = KafkaUtils
       .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val input = stream .flatMap(line => {
      Some(line .value .toString)
    })

    input .foreachRDD(rdd => {
      rdd .foreachPartition(part => {
        val jedis = new Jedis( "192.168.1.97",  6379,  3000)
        jedis .auth( "123456")
        part .foreach( x => {
          jedis .lpush( "test_key",  x)
          jedis .close()
        })
      })
    })

    ssc .start()
    ssc .awaitTermination()
  }
}

----------------------------
原文链接:https://blog.csdn.net/qq_39869388/article/details/80366380

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-22 12:05:09 重新编辑]
  Java面向对象编程-->集合(上)
  JavaWeb开发-->使用过滤器
  JSP与Hibernate开发-->映射组成关系
  Java网络编程-->Java反射机制
  精通Spring-->创建综合购物网站应用
  Vue3开发-->绑定CSS样式
  害阿里程序员差点被当场开除的P0事故
  实时统计每天pv,uv的sparkStreaming结合redis结果存入mysql供...
  flume+spark streaming+redis完整篇
  spark-redis入门教程
  Spark学习之Redis
  kubeadm安装k8s集群1.17版本
  playbook自动安装kafka集群
  Hadoop中文词频统计
  SparkStreaming的实战案例-WordCount范例
  SNMP 已死 - Streaming Telemetry 流遥测技术
  SDN网络IPv6组播机制支持实时视频业务海量用户扩展
  大数据虚拟混算平台Moonbox配置指南
  Zookeeper+Kafka集群搭建
  大数据的学习方向
  SSH免除免密登录的配置
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。