package com.prince.demo.test

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import redis.clients.jedis.Jedis

/**
* Created by prince on 2017/9/13.
*/
object SparkStreamingWriteRedis {
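  // Show only WARN and above from "org.*" loggers (Spark, Kafka, etc.).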
  Logger.getLogger("org").setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("SparkStreamingWriteRedis")
      .master("local[*]")
      .getOrCreate()
    val sparkContext = spark.sparkContext
    // One-second micro-batches.
    val ssc = new StreamingContext(sparkContext, Seconds(1))

    val conf = ConfigFactory.load()
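    // The three "kafka.*" keys below are read from an application.conf on the
    // classpath via Typesafe Config. An illustrative example (these values are
    // assumptions, not from the original source):
    //
    //   kafka {
    //     brokers = "localhost:9092"
    //     group   = "spark-streaming-demo"
    //     topics  = "test_topic"
    //   }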
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> conf.getString("kafka.group"),
      // Start from the latest offsets; offsets are not auto-committed.
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topic = conf.getString("kafka.topics")
    val topics = Array(topic)
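
    // Direct stream: each executor consumes its Kafka partitions directly,
    // with no receiver thread involved.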
    val stream = KafkaUtils
      .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // Keep only the message payload; record keys are ignored.
    val input = stream.map(record => record.value)
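
    // Push each received message onto the Redis list "test_key".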
    input.foreachRDD { rdd =>
      rdd.foreachPartition { part =>
        // One connection per partition, not per record, to avoid connection churn.
        val jedis = new Jedis("192.168.1.97", 6379, 3000)
        jedis.auth("123456")
        part.foreach(x => jedis.lpush("test_key", x))
        // Close only after the whole partition is written; closing inside the
        // foreach (as the original did) kills the connection after the first record.
        jedis.close()
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}