Real-time daily PV/UV statistics with Spark Streaming, deduplicated with Redis and stored in MySQL for front-end display

I recently had a requirement to compute PV and UV in real time and present the results by date, hour, pv, uv, counted per day and restarted from zero the next day. In practice the counts also have to be broken down by a type field, i.e. date, hour, pv, uv, type, but this post covers the basic PV/UV case. A sample result row:

| id | uv | pv | date | hour |
| --- | --- | --- | --- | --- |
| 1 | 155599 | 306053 | 2018-07-27 | 18 |
For an explanation of what PV and UV mean, see this post: https://blog.csdn.net/petermsh/article/details/78652246
1. Project Flow

Log data is collected by Flume and lands in HDFS for other offline jobs; it is also sinked to Kafka. Spark Streaming pulls the data from Kafka and computes PV and UV, using a Redis set to deduplicate for UV, and finally writes the results to a MySQL database for the front end to display.
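For orientation, here is a minimal sketch of that pipeline as a Spark Streaming job against the Kafka 0.8 direct API. The broker address, topic name and batch interval are placeholders, and the processing steps are only indicated in comments; the project's full code is shown in section 3.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PvUvPipelineSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("pv-uv-sketch")
    val ssc = new StreamingContext(conf, Seconds(20)) // 20-second batches, as in the post

    // Direct (receiver-less) Kafka stream; hypothetical broker and topic names
    val kafkaParams = Map("metadata.broker.list" -> "kafka01:9092", "group.id" -> "pvuv_group")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("app_log_topic"))

    // From here: clean/parse each record, accumulate pv per (date, hour) with mapWithState,
    // SADD the guid into a per-day Redis set for uv, and upsert both into MySQL (see below).
    messages.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}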
2. Implementation Details

1) Computing PV

There are two ways to pull data from Kafka, the receiver-based approach and the direct approach; here the direct approach is used. State is kept with the mapWithState operator, which does the same job as updateStateByKey but performs better. In practice, of course, the incoming data has to be cleaned and filtered before it can be used.

Define a state update function:
// State update function for the real-time counts
val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
  val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
  val output = (datehour, accuSum)
  state.update(accuSum)
  output
}
Compute PV:

val stateSpec = StateSpec.function(mapFunction)
val helper_count_all = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
That makes it straightforward to get PV.
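To make the state snapshot concrete, here is a small local illustration in plain Scala with hypothetical numbers (it does not use mapWithState itself): each micro-batch contributes a partial count per "date:hour" key, and what stateSnapshots() returns is the running total accumulated so far.

// Hypothetical partial counts from three micro-batches, keyed by "date:hour"
val batches = Seq(
  Seq(("2018-07-27:18", 120L)),
  Seq(("2018-07-27:18", 95L), ("2018-07-27:19", 40L)),
  Seq(("2018-07-27:19", 10L))
)

// What the state accumulates: the running pv total per key across batches
val snapshot = batches.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
println(snapshot) // e.g. Map(2018-07-27:18 -> 215, 2018-07-27:19 -> 50)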
2) Computing UV

UV has to be deduplicated across the whole day. Deduplicating every incoming batch with the native reduceByKey or groupByKey demands too much from the cluster, and ours is modest, so we provisioned a 93 GB Redis instance for deduplication instead. The idea: for every incoming record, use the date as the key and add the guid to a Redis set; every 20 seconds, read the size of the set and update the database with it.
helper_data_dis.foreachRDD(rdd => {
  rdd.foreachPartition(eachPartition => {
    var jedis: Jedis = null
    try {
      jedis = getJedis
      eachPartition.foreach(x => {
        val arr = x._2.split("\t")
        val date: String = arr(0).split(":")(0)

        // overall helper uv set: one Redis set per day
        val key0 = "helper_" + date
        jedis.sadd(key0, x._1)
        jedis.expire(key0, ConfigFactory.rediskeyexists)

        // per-helperversion uv set
        val key = date + "_" + arr(1)
        jedis.sadd(key, x._1)
        jedis.expire(key, ConfigFactory.rediskeyexists)
      })
    } catch {
      case e: Exception => {
        logger.error(e)
        logger2.error(HelperHandle.getClass.getSimpleName + e)
      }
    } finally {
      if (jedis != null) {
        closeJedis(jedis)
      }
    }
  })
})

// Get a Jedis connection from the pool
def getJedis: Jedis = {
  val jedis = RedisPoolUtil.getPool.getResource
  jedis
}

// Return a Jedis connection to the pool
def closeJedis(jedis: Jedis): Unit = {
  RedisPoolUtil.getPool.returnResource(jedis)
}
Redis connection pool code:
package com.js.ipflow.utils

import com.js.ipflow.start.ConfigFactory
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

/**
  * Redis connection pool utility
  * @author keguang
  */
object RedisPoolUtil extends Serializable {
  @transient private var pool: JedisPool = null

  /**
    * Read the Jedis configuration and trigger pool initialization
    */
  def initJedis: Unit = {
    ConfigFactory.initConfig()
    val maxTotal = 50
    val maxIdle = 30
    val minIdle = 10
    val redisHost = ConfigFactory.redishost
    val redisPort = ConfigFactory.redisport
    val redisTimeout = ConfigFactory.redistimeout
    val redisPassword = ConfigFactory.redispassword
    makePool(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle)
  }

  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int, redisPassword: String, maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
    init(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle, true, false, 10000)
  }

  /**
    * Initialize the Jedis connection pool
    * @param redisHost     host
    * @param redisPort     port
    * @param redisTimeout  connection timeout
    * @param redisPassword Redis password
    * @param maxTotal      maximum number of connections
    * @param maxIdle       maximum number of idle connections
    * @param minIdle       minimum number of idle connections
    * @param testOnBorrow  validate connections when borrowed
    * @param testOnReturn  validate connections when returned
    * @param maxWaitMillis maximum wait time for a connection
    */
  def init(redisHost: String, redisPort: Int, redisTimeout: Int, redisPassword: String, maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean, testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
    if (pool == null) {
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)
      poolConfig.setTestOnBorrow(testOnBorrow)
      poolConfig.setTestOnReturn(testOnReturn)
      poolConfig.setMaxWaitMillis(maxWaitMillis)
      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout, redisPassword)

      val hook = new Thread {
        override def run = pool.destroy()
      }
      sys.addShutdownHook(hook.run)
    }
  }

  def getPool: JedisPool = {
    if (pool == null) {
      initJedis
    }
    pool
  }
}
3) Saving the results to the database

The results are written to MySQL and refreshed every 20 seconds. Each time the front end refreshes, it queries the database again, which achieves real-time PV/UV display.
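The core of each write is MySQL's INSERT ... ON DUPLICATE KEY UPDATE, so the row for a given date and hour is updated in place on every refresh instead of being inserted again. A minimal plain-JDBC sketch, assuming placeholder connection settings and a unique key covering the date/hour columns of statistic_realtime_helper_count (the DDL is not shown in this post); the project's full insertHelper implementation follows.

import java.sql.DriverManager

object UpsertSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder URL and credentials; the project reads these from ConfigFactory instead
    val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/stat", "user", "pass")
    val sql =
      "insert into statistic_realtime_helper_count(date, hour, helper_count_all, helper_count, dh) " +
      "values('2018-07-27', 18, 306053, 155599, '2018-07-27 18:00') " +
      "on duplicate key update helper_count_all = 306053, helper_count = 155599"
    val stmt = conn.createStatement()
    stmt.executeUpdate(sql) // re-running this updates the existing (date, hour) row rather than adding a new one
    stmt.close()
    conn.close()
  }
}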
/**
  * Insert data
  *
  * @param data     (addTab(datehour) + helperversion)
  * @param tbName   target table name
  * @param colNames column names
  */
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
  data.foreachRDD(rdd => {
    val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
    if (!rdd.isEmpty()) {
      val hour_now = tmp_rdd.max() // the latest hour in this snapshot; useful when recovering data
      rdd.foreachPartition(eachPartition => {
        var jedis: Jedis = null
        var conn: Connection = null
        try {
          jedis = getJedis
          conn = MysqlPoolUtil.getConnection()
          conn.setAutoCommit(false)
          val stmt = conn.createStatement()
          eachPartition.foreach(x => {
            if (colNames.length == 7) {
              val datehour = x._1.split("\t")(0)
              val helperversion = x._1.split("\t")(1)
              val date_hour = datehour.split(":")
              val date = date_hour(0)
              val hour = date_hour(1).toInt

              val colName0 = colNames(0) // date
              val colName1 = colNames(1) // hour
              val colName2 = colNames(2) // count_all
              val colName3 = colNames(3) // count
              val colName4 = colNames(4) // helperversion
              val colName5 = colNames(5) // datehour
              val colName6 = colNames(6) // dh

              val colValue0 = addYin(date)
              val colValue1 = hour
              val colValue2 = x._2.toInt
              val colValue3 = jedis.scard(date + "_" + helperversion) // e.g. 2018-07-08_10.0.1.22
              val colValue4 = addYin(helperversion)
              var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
              val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

              var sql = ""
              if (hour == hour_now) { // only update uv for the current hour
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
                logger.warn(sql)
                stmt.addBatch(sql)
              } /* else {
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2}"
              }*/
            } else if (colNames.length == 5) {
              val date_hour = x._1.split(":")
              val date = date_hour(0)
              val hour = date_hour(1).toInt

              val colName0 = colNames(0) // date
              val colName1 = colNames(1) // hour
              val colName2 = colNames(2) // helper_count_all
              val colName3 = colNames(3) // helper_count
              val colName4 = colNames(4) // dh

              val colValue0 = addYin(date)
              val colValue1 = hour
              val colValue2 = x._2.toInt
              val colValue3 = jedis.scard("helper_" + date) // e.g. helper_2018-07-08
              val colValue4 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

              var sql = ""
              if (hour == hour_now) { // only update uv for the current hour
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
                logger.warn(sql)
                stmt.addBatch(sql)
              }
            }
          })
          stmt.executeBatch() // execute the batched SQL statements
          conn.commit()
        } catch {
          case e: Exception => {
            logger.error(e)
            logger2.error(HelperHandle.getClass.getSimpleName + e)
          }
        } finally {
          if (jedis != null) {
            closeJedis(jedis)
          }
          if (conn != null) {
            conn.close()
          }
        }
      })
    }
  })
}

// Milliseconds from now until the end of the current day (used to restart at midnight)
def resetTime = {
  val now = new Date()
  val todayEnd = Calendar.getInstance
  todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR would be the 12-hour clock
  todayEnd.set(Calendar.MINUTE, 59)
  todayEnd.set(Calendar.SECOND, 59)
  todayEnd.set(Calendar.MILLISECOND, 999)
  todayEnd.getTimeInMillis - now.getTime
}
MySQL connection pool code:
package com.js.ipflow.utils

import java.sql.{Connection, PreparedStatement, ResultSet}

import com.js.ipflow.start.ConfigFactory
import org.apache.commons.dbcp.BasicDataSource
import org.apache.logging.log4j.LogManager

/**
  * JDBC MySQL connection pool utility
  * @author keguang
  */
object MysqlPoolUtil {

  val logger = LogManager.getLogger(MysqlPoolUtil.getClass.getSimpleName)

  private var bs: BasicDataSource = null

  /**
    * Create the data source
    * @return
    */
  def getDataSource(): BasicDataSource = {
    if (bs == null) {
      ConfigFactory.initConfig()
      bs = new BasicDataSource()
      bs.setDriverClassName("com.mysql.jdbc.Driver")
      bs.setUrl(ConfigFactory.mysqlurl)
      bs.setUsername(ConfigFactory.mysqlusername)
      bs.setPassword(ConfigFactory.mysqlpassword)
      bs.setMaxActive(50) // maximum number of concurrent connections
      bs.setInitialSize(20) // connections created when the pool is initialized
      bs.setMinIdle(20) // minimum number of idle connections kept without creating new ones
      bs.setMaxIdle(20) // maximum number of idle connections never released; 0 means unlimited
      bs.setMaxWait(5000) // longest wait for a connection before an exception is thrown; -1 means wait forever
      bs.setMinEvictableIdleTimeMillis(10 * 1000) // release idle connections after 10 seconds
      bs.setTimeBetweenEvictionRunsMillis(1 * 60 * 1000) // check for dead connections once a minute
      bs.setTestOnBorrow(true)
    }
    bs
  }

  /**
    * Release the data source
    */
  def shutDownDataSource() {
    if (bs != null) {
      bs.close()
    }
  }

  /**
    * Get a database connection
    * @return
    */
  def getConnection(): Connection = {
    var con: Connection = null
    try {
      if (bs != null) {
        con = bs.getConnection()
      } else {
        con = getDataSource().getConnection()
      }
    } catch {
      case e: Exception => logger.error(e)
    }
    con
  }

  /**
    * Close result set, statement and connection
    */
  def closeCon(rs: ResultSet, ps: PreparedStatement, con: Connection) {
    if (rs != null) {
      try {
        rs.close()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }
    if (ps != null) {
      try {
        ps.close()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }
    if (con != null) {
      try {
        con.close()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }
  }
}
4) Fault tolerance

Any streaming job that consumes Kafka has to consider data loss. Offsets and state can generally be saved to almost any external store, such as MySQL, HDFS, HBase, Redis or ZooKeeper. Here we rely on Spark Streaming's built-in checkpoint mechanism to recover when the application restarts.

Checkpointing: with a checkpoint in place, after a failure or a restart the job can pick up the batches that had not finished and resume reading Kafka from the corresponding offsets.
// Initialize the configuration file
ConfigFactory.initConfig()
val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
conf.set("spark.default.parallelism", "24")
val sc = new SparkContext(conf)

while (true) {
  val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
  ssc.start()
  ssc.awaitTerminationOrTimeout(resetTime)
  ssc.stop(false, true)
}
There is one checkpoint directory per day. Around midnight the StreamingContext is destroyed on schedule (awaitTerminationOrTimeout(resetTime) returns once the remaining milliseconds of the day have elapsed) and PV/UV counting starts over for the new day.

Note that ssc.stop(false, true) stops the StreamingContext gracefully without destroying the SparkContext. ssc.stop(true, true) would also stop the SparkContext and the whole program would simply exit.
Application migration or program upgrades. Along the way we also upgraded the application, for example when a feature was incomplete or contained a logic error. That means changing code and rebuilding the jar. If you just stop the job, the new build will still read the old checkpoint, which can cause two problems:

- it keeps executing the previous version of the program, because the checkpoint also contains serialized code;
- or it fails outright because deserialization fails.

Sometimes code changes do take effect without deleting the checkpoint, but after a lot of testing I found that changing the data-filtering logic, or changing how state is saved, makes the restart fail, and only deleting the checkpoint helps. The problem is that deleting the checkpoint throws away the unfinished batches and the consumed Kafka offsets, which directly loses data. In that situation I usually do the following.

Run the new version on another cluster, or simply point it at a different checkpoint directory; our code and configuration are separated, so changing the checkpoint path in the configuration file is easy. Then run both programs in parallel: apart from the checkpoint directories, which are created fresh, they insert into the same database. After the new one has run for a while, stop the old one. I had seen this advice on the official site before, but it only really became clear once I had to work out how to guarantee data correctness myself.
5) Logging

Logging uses Log4j2. A copy of the logs is kept locally, and ERROR-level entries are sent to my phone by email.
val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// logger that emails ERROR-level entries
val logger2 = LogManager.getLogger("email")
3. Main Code

pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.2144</groupId>
    <artifactId>ipflow</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- version information -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <log4j.version>2.5</log4j.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.0.2</spark.version>
        <hadoop.version>2.6.5</hadoop.version>
        <pkg.name>ipflow_flash_count</pkg.name>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.4</version>
            <scope>provided</scope>
        </dependency>
        <!-- ZooKeeper client -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.5</version>
            <scope>provided</scope>
        </dependency>
        <!--<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>-->
        <!--
            Spark Streaming Kafka integration. No separate org.apache.kafka dependency is needed:
            it is already bundled in spark-streaming-kafka-0-8_2.11, and mixing different versions
            can cause incompatibilities.
        -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.9</version>
        </dependency>
        <!-- logging-related dependencies -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>dom4j</groupId>
            <artifactId>dom4j</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.jodd</groupId>
            <artifactId>jodd-core</artifactId>
            <version>3.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- BloomFilter dependency -->
        <dependency>
            <groupId>com.github.wxisme</groupId>
            <artifactId>bloomfilter</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <!-- test sources and files -->
        <!--<testSourceDirectory>${basedir}/src/test</testSourceDirectory>-->
        <finalName>${pkg.name}</finalName>
        <sourceDirectory>src/main/java</sourceDirectory>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>*.properties</include>
                    <include>*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <!-- skip tests -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- packaging (shade) plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Configuration-reading code:
package com.js.ipflow.start;

import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

public class ConfigFactory {
    private final static Logger log = LogManager.getLogger("email");
    public static String kafkaipport;
    public static String kafkazookeeper;
    public static String kafkatopic;
    public static String kafkagroupid;
    public static String mysqlurl;
    public static String mysqlusername;
    public static String mysqlpassword;
    public static String redishost;
    public static int redisport;
    public static String redispassword;
    public static int redistimeout;
    public static int rediskeyexists;
    public static String sparkstreamname;
    public static int sparkstreamseconds;
    public static String sparkstreammaster = "spark://qcloud-spark01:7077"; // for local testing only
    public static String localpath;
    public static String checkpointdir;
    // public static String gracestopfile; // used to kill the program gracefully
    public static String keydeserilizer;
    public static String valuedeserilizer;

    /**
     * Initialize all common settings
     */
    public static void initConfig() { readCommons(); }

    /**
     * Read the commons.xml file
     */
    private static void readCommons() {
        SAXReader reader = new SAXReader(); // build the XML parser
        Document document = null;
        try {
            document = reader.read(Resources.getResource("commons.xml"));
        } catch (DocumentException e) {
            log.error("ConfigFactory.readCommons", e);
        }
        if (document != null) {
            Element root = document.getRootElement();

            Element kafkaElement = root.element("kafka");
            kafkaipport = kafkaElement.element("ipport").getText();
            kafkazookeeper = kafkaElement.element("zookeeper").getText();
            kafkatopic = kafkaElement.element("topic").getText();
            kafkagroupid = kafkaElement.element("groupid").getText();
            keydeserilizer = kafkaElement.element("keySer").getText();
            valuedeserilizer = kafkaElement.element("valSer").getText();

            Element mysqlElement = root.element("mysql");
            mysqlurl = mysqlElement.element("url").getText();
            mysqlusername = mysqlElement.element("username").getText();
            mysqlpassword = mysqlElement.element("password").getText();

            Element redisElement = root.element("redis");
            redishost = redisElement.element("host").getText();
            redisport = Integer.valueOf(redisElement.element("port").getText());
            redispassword = redisElement.element("password").getText();
            redistimeout = Integer.valueOf(redisElement.element("timeout").getText());
            rediskeyexists = Integer.valueOf(redisElement.element("keyexists").getText());

            Element sparkElement = root.element("spark");
            // sparkstreammaster = sparkElement.element("streammaster").getText();
            sparkstreamname = sparkElement.element("streamname").getText();
            sparkstreamseconds = Integer.valueOf(sparkElement.element("seconds").getText());

            Element pathElement = root.element("path");
            localpath = pathElement.element("localpath").getText();
            checkpointdir = pathElement.element("checkpointdir").getText();
            // gracestopfile = pathElement.element("gracestopfile").getText();
        } else {
            log.warn("failed to read the commons.xml configuration file...");
        }
    }
}
The main business code is as follows:
package com.js.ipflow.flash.helper

import java.sql.Connection
import java.util.{Calendar, Date}

import com.alibaba.fastjson.JSON
import com.js.ipflow.start.ConfigFactory
import com.js.ipflow.utils.{DateUtil, MysqlPoolUtil, RedisPoolUtil}
import kafka.serializer.StringDecoder
import org.apache.logging.log4j.LogManager
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

object HelperHandle {

  val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
  // logger that emails ERROR-level entries
  val logger2 = LogManager.getLogger("email")

  def main(args: Array[String]): Unit = {
    helperHandle(args(0))
  }

  def helperHandle(consumeRate: String): Unit = {
    // Initialize the configuration file
    ConfigFactory.initConfig()

    val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
    conf.set("spark.default.parallelism", "30")
    val sc = new SparkContext(conf)

    while (true) {
      val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
      ssc.start()
      ssc.awaitTerminationOrTimeout(resetTime)
      ssc.stop(false, true)
    }

    def getStreamingContext(): StreamingContext = {
      val stateSpec = StateSpec.function(mapFunction)
      val ssc = new StreamingContext(sc, Seconds(ConfigFactory.sparkstreamseconds))
      ssc.checkpoint(ConfigFactory.checkpointdir + DateUtil.getDay(0))
      val zkQuorm = ConfigFactory.kafkazookeeper
      val topics = ConfigFactory.kafkatopic
      val topicSet = Set(topics)
      val kafkaParams = Map[String, String](
        "metadata.broker.list" -> ConfigFactory.kafkaipport
        , "group.id" -> ConfigFactory.kafkagroupid
        , "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString
      )
      val rmessage = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicSet
      )

      // helper data: (dateHour, guid, helperversion)
      val helper_data = FilterHelper.getHelperData(rmessage.map(x => {
        val message = JSON.parseObject(x._2).getString("message")
        JSON.parseObject(message)
      })).repartition(60).cache()

      // (guid, datehour + helperversion)
      val helper_data_dis = helper_data.map(x => (x._2, addTab(x._1) + x._3)).reduceByKey((x, y) => y)

      // pv, uv
      val helper_count = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)

      // per-helperversion counts
      val helper_helperversion_count = helper_data.map(x => (addTab(x._1) + x._3, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)

      helper_data_dis.foreachRDD(rdd => {
        rdd.foreachPartition(eachPartition => {
          var jedis: Jedis = null
          try {
            jedis = getJedis
            eachPartition.foreach(x => {
              val arr = x._2.split("\t")
              val date: String = arr(0).split(":")(0)

              // overall helper uv set
              val key0 = "helper_" + date
              jedis.sadd(key0, x._1)
              jedis.expire(key0, ConfigFactory.rediskeyexists)

              // per-helperversion uv set
              val key = date + "_" + arr(1)
              jedis.sadd(key, x._1)
              jedis.expire(key, ConfigFactory.rediskeyexists)
            })
          } catch {
            case e: Exception => {
              logger.error(e)
              logger2.error(HelperHandle.getClass.getSimpleName + e)
            }
          } finally {
            if (jedis != null) {
              closeJedis(jedis)
            }
          }
        })
      })

      insertHelper(helper_helperversion_count, "statistic_realtime_flash_helper", "date", "hour", "count_all", "count", "helperversion", "datehour", "dh")
      insertHelper(helper_count, "statistic_realtime_helper_count", "date", "hour", "helper_count_all", "helper_count", "dh")

      ssc
    }
  }

  // Milliseconds from now until the end of the current day
  def resetTime = {
    val now = new Date()
    val todayEnd = Calendar.getInstance
    todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR would be the 12-hour clock
    todayEnd.set(Calendar.MINUTE, 59)
    todayEnd.set(Calendar.SECOND, 59)
    todayEnd.set(Calendar.MILLISECOND, 999)
    todayEnd.getTimeInMillis - now.getTime
  }

  /**
    * Insert data
    *
    * @param data     (addTab(datehour) + helperversion)
    * @param tbName   target table name
    * @param colNames column names
    */
  def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
    data.foreachRDD(rdd => {
      val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
      if (!rdd.isEmpty()) {
        val hour_now = tmp_rdd.max() // the latest hour in this snapshot; useful when recovering data
        rdd.foreachPartition(eachPartition => {
          var jedis: Jedis = null
          var conn: Connection = null
          try {
            jedis = getJedis
            conn = MysqlPoolUtil.getConnection()
            conn.setAutoCommit(false)
            val stmt = conn.createStatement()
            eachPartition.foreach(x => {
              if (colNames.length == 7) {
                val datehour = x._1.split("\t")(0)
                val helperversion = x._1.split("\t")(1)
                val date_hour = datehour.split(":")
                val date = date_hour(0)
                val hour = date_hour(1).toInt

                val colName0 = colNames(0) // date
                val colName1 = colNames(1) // hour
                val colName2 = colNames(2) // count_all
                val colName3 = colNames(3) // count
                val colName4 = colNames(4) // helperversion
                val colName5 = colNames(5) // datehour
                val colName6 = colNames(6) // dh

                val colValue0 = addYin(date)
                val colValue1 = hour
                val colValue2 = x._2.toInt
                val colValue3 = jedis.scard(date + "_" + helperversion) // e.g. 2018-07-08_10.0.1.22
                val colValue4 = addYin(helperversion)
                var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
                val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

                var sql = ""
                if (hour == hour_now) { // only update uv for the current hour
                  sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
                  logger.warn(sql)
                  stmt.addBatch(sql)
                } /* else {
                  sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2}"
                }*/
              } else if (colNames.length == 5) {
                val date_hour = x._1.split(":")
                val date = date_hour(0)
                val hour = date_hour(1).toInt

                val colName0 = colNames(0) // date
                val colName1 = colNames(1) // hour
                val colName2 = colNames(2) // helper_count_all
                val colName3 = colNames(3) // helper_count
                val colName4 = colNames(4) // dh

                val colValue0 = addYin(date)
                val colValue1 = hour
                val colValue2 = x._2.toInt
                val colValue3 = jedis.scard("helper_" + date) // e.g. helper_2018-07-08
                val colValue4 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

                var sql = ""
                if (hour == hour_now) { // only update uv for the current hour
                  sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
                  logger.warn(sql)
                  stmt.addBatch(sql)
                }
              }
            })
            stmt.executeBatch() // execute the batched SQL statements
            conn.commit()
          } catch {
            case e: Exception => {
              logger.error(e)
              logger2.error(HelperHandle.getClass.getSimpleName + e)
            }
          } finally {
            if (jedis != null) {
              closeJedis(jedis)
            }
            if (conn != null) {
              conn.close()
            }
          }
        })
      }
    })
  }

  def addYin(str: String): String = {
    "'" + str + "'"
  }

  // Append a tab to the string (formatting helper)
  def addTab(str: String): String = {
    str + "\t"
  }

  // State update function for the real-time counts
  val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
    val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
    val output = (datehour, accuSum)
    state.update(accuSum)
    output
  }

  // Get a Jedis connection from the pool
  def getJedis: Jedis = {
    val jedis = RedisPoolUtil.getPool.getResource
    jedis
  }

  // Return a Jedis connection to the pool
  def closeJedis(jedis: Jedis): Unit = {
    RedisPoolUtil.getPool.returnResource(jedis)
  }
}
----------------------------
Original article: https://blog.csdn.net/ddxygq/article/details/81258643