Topic: Real-time daily PV/UV statistics with Spark Streaming and Redis, with results stored in MySQL for front-end display
Author: mary    Posted: 2020-03-21 17:15:56

              


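Recently I had a requirement to compute PV and UV in real time and display the results as date, hour, pv, uv. The statistics are per day and start over the next day. In practice the counts also need to be broken down by a type field, for example displayed as date, hour, pv, uv, type. This post covers only the basic PV/UV case. A result row looks like this:

id   uv       pv       date         hour
1    155599   306053   2018-07-27   18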


For an explanation of PV and UV, see this blog post: https://blog.csdn.net/petermsh/article/details/78652246

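1. Project workflow

Log data is collected by Flume and written to HDFS for other offline jobs; it is also sunk to Kafka. Spark Streaming pulls the data from Kafka and computes PV and UV, using a Redis set to deduplicate for UV. Finally the results are written to a MySQL database for the front end to display.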

2. Implementation

1) Computing PV

There are two ways to pull data from Kafka: the receiver-based approach and the direct approach. Here the direct approach is used, and state is kept with the mapWithState operator, which serves the same purpose as updateStateByKey but performs better. In practice, incoming data must of course be cleaned and filtered before it can be used.

Define a state function:

// State update function for the running traffic count
val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
  val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
  val output = (datehour, accuSum)
  state.update(accuSum)
  output
}

Compute PV:

val stateSpec = StateSpec.function(mapFunction)
val helper_count_all = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)

This makes PV straightforward to compute.
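The accumulation done by mapFunction can be illustrated without a Spark cluster. The following is a minimal sketch in plain Scala that only mimics the idea; it is not Spark's mapWithState. The `updatePv` helper, the `PvState` alias, and the sample keys are hypothetical names introduced for illustration.

```scala
object PvStateSketch {
  // Hypothetical stand-in for the per-key state that mapWithState keeps.
  type PvState = Map[String, Long]

  // Fold one micro-batch of (datehour, count) pairs into the running state,
  // the same "old total + new value" step that mapFunction performs per key.
  def updatePv(state: PvState, batch: Seq[(String, Long)]): PvState =
    batch.foldLeft(state) { case (acc, (datehour, pv)) =>
      acc.updated(datehour, acc.getOrElse(datehour, 0L) + pv)
    }

  def main(args: Array[String]): Unit = {
    // Two made-up micro-batches, one record per page view.
    val batch1 = Seq(("2018-07-27:18", 1L), ("2018-07-27:18", 1L))
    val batch2 = Seq(("2018-07-27:18", 1L), ("2018-07-27:19", 1L))
    val snapshot = Seq(batch1, batch2).foldLeft(Map.empty[String, Long])(updatePv)
    snapshot.toSeq.sorted.foreach { case (k, v) => println(s"$k -> $v") }
  }
}
```

The state survives from one batch to the next, which is why the per-hour totals keep growing until the key changes.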

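2) Computing UV

UV must be deduplicated across the whole day. Doing this with native reduceByKey or groupByKey over every incoming batch demands too much of the cluster, and ours was modestly provisioned, so we requested a 93 GB Redis instance for deduplication. The idea: for every incoming record, use the date as the key and add the guid to a Redis set. Every 20 seconds the size of the set is read and written to the database.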

helper_data_dis.foreachRDD(rdd => {
  rdd.foreachPartition(eachPartition => {
    var jedis: Jedis = null
    try {
      jedis = getJedis
      eachPartition.foreach(x => {
        val arr = x._2.split("\t")
        val date: String = arr(0).split(":")(0)

        // helper UV: one set per day
        val key0 = "helper_" + date
        jedis.sadd(key0, x._1)
        jedis.expire(key0, ConfigFactory.rediskeyexists)
        // helperversion UV: one set per day and version
        val key = date + "_" + arr(1)
        jedis.sadd(key, x._1)
        jedis.expire(key, ConfigFactory.rediskeyexists)
      })
    } catch {
      case e: Exception => {
        logger.error(e)
        logger2.error(HelperHandle.getClass.getSimpleName + e)
      }
    } finally {
      if (jedis != null) {
        closeJedis(jedis)
      }
    }
  })
})

// Borrow a Jedis connection from the pool
def getJedis: Jedis = {
  val jedis = RedisPoolUtil.getPool.getResource
  jedis
}

// Return the Jedis connection to the pool
def closeJedis(jedis: Jedis): Unit = {
  RedisPoolUtil.getPool.returnResource(jedis)
}
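To make the deduplication idea concrete, here is a minimal sketch that replaces Redis with an in-memory map of sets. The `InMemorySets` class is a hypothetical stand-in that only imitates the SADD and SCARD commands; the guids and the date are made up, and none of this is the Jedis API.

```scala
import scala.collection.mutable

// In-memory stand-in for the Redis SADD/SCARD commands (an assumption for
// illustration; the post itself talks to a real Redis through Jedis).
final class InMemorySets {
  private val sets = mutable.Map.empty[String, mutable.Set[String]]

  // Like SADD: returns true only if the member was not already in the set.
  def sadd(key: String, member: String): Boolean =
    sets.getOrElseUpdate(key, mutable.Set.empty[String]).add(member)

  // Like SCARD: number of distinct members, 0 for a missing key.
  def scard(key: String): Long =
    sets.get(key).map(_.size.toLong).getOrElse(0L)
}

object UvSetSketch {
  def main(args: Array[String]): Unit = {
    val redis = new InMemorySets
    // (guid, date) pairs; guid "u1" visits twice on the same day.
    val records = Seq(("u1", "2018-07-27"), ("u2", "2018-07-27"), ("u1", "2018-07-27"))
    records.foreach { case (guid, date) => redis.sadd("helper_" + date, guid) }
    // The set size is the day's UV: repeated guids are counted once.
    println(redis.scard("helper_2018-07-27"))
  }
}
```

Because a set ignores repeated members, the writer side never has to check whether a guid was seen before; reading the cardinality is enough.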

Redis connection pool code (RedisPoolUtil.scala):

package com.js.ipflow.utils

import com.js.ipflow.start.ConfigFactory
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

/**
  * Redis connection pool utility
  * @author keguang
  */
object RedisPoolUtil extends Serializable {
  @transient private var pool: JedisPool = null

  /**
    * Read the Jedis settings from the configuration and trigger pool initialization
    */
  def initJedis: Unit = {
    ConfigFactory.initConfig()
    val maxTotal = 50
    val maxIdle = 30
    val minIdle = 10
    val redisHost = ConfigFactory.redishost
    val redisPort = ConfigFactory.redisport
    val redisTimeout = ConfigFactory.redistimeout
    val redisPassword = ConfigFactory.redispassword
    makePool(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle)
  }

  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int, redisPassword: String, maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
    init(redisHost, redisPort, redisTimeout, redisPassword, maxTotal, maxIdle, minIdle, true, false, 10000)
  }

  /**
    * Initialize the Jedis connection pool
    * @param redisHost host
    * @param redisPort port
    * @param redisTimeout Redis connection timeout
    * @param redisPassword Redis password
    * @param maxTotal maximum total number of connections
    * @param maxIdle maximum number of idle connections
    * @param minIdle minimum number of idle connections
    * @param testOnBorrow validate a connection when it is borrowed
    * @param testOnReturn validate a connection when it is returned
    * @param maxWaitMillis maximum time to wait for a connection, in milliseconds
    */
  def init(redisHost: String, redisPort: Int, redisTimeout: Int, redisPassword: String, maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean, testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
    if (pool == null) {
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)
      poolConfig.setTestOnBorrow(testOnBorrow)
      poolConfig.setTestOnReturn(testOnReturn)
      poolConfig.setMaxWaitMillis(maxWaitMillis)
      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout, redisPassword)

      // Destroy the pool when the JVM shuts down
      sys.addShutdownHook(pool.destroy())
    }
  }

  def getPool: JedisPool = {
    if (pool == null) {
      initJedis
    }
    pool
  }

}

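3) Saving results to the database

Results are saved to MySQL, and the database is updated every 20 seconds. Each time the front end refreshes, it queries the database again, which gives a real-time display of PV and UV.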

/**
  * Insert data
  *
  * @param data (addTab(datehour) + helperversion, count)
  * @param tbName table name
  * @param colNames column names
  */
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
  data.foreachRDD(rdd => {
    val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
    if (!rdd.isEmpty()) {
      val hour_now = tmp_rdd.max() // largest hour in the current result; useful during data recovery
      rdd.foreachPartition(eachPartition => {
        var jedis: Jedis = null
        var conn: Connection = null
        try {
          jedis = getJedis
          conn = MysqlPoolUtil.getConnection()
          conn.setAutoCommit(false)
          val stmt = conn.createStatement()
          eachPartition.foreach(x => {
            if (colNames.length == 7) {
              val datehour = x._1.split("\t")(0)
              val helperversion = x._1.split("\t")(1)
              val date_hour = datehour.split(":")
              val date = date_hour(0)
              val hour = date_hour(1).toInt

              val colName0 = colNames(0) // date
              val colName1 = colNames(1) // hour
              val colName2 = colNames(2) // count_all
              val colName3 = colNames(3) // count
              val colName4 = colNames(4) // helperversion
              val colName5 = colNames(5) // datehour
              val colName6 = colNames(6) // dh

              val colValue0 = addYin(date)
              val colValue1 = hour
              val colValue2 = x._2.toInt
              val colValue3 = jedis.scard(date + "_" + helperversion) // e.g. 2018-07-08_10.0.1.22
              val colValue4 = addYin(helperversion)
              val colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
              val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

              var sql = ""
              if (hour == hour_now) { // update UV only for the current hour
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
                logger.warn(sql)
                stmt.addBatch(sql)
              } /* else {
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2}"
              } */

            } else if (colNames.length == 5) {
              val date_hour = x._1.split(":")
              val date = date_hour(0)
              val hour = date_hour(1).toInt
              val colName0 = colNames(0) // date
              val colName1 = colNames(1) // hour
              val colName2 = colNames(2) // helper_count_all
              val colName3 = colNames(3) // helper_count
              val colName4 = colNames(4) // dh

              val colValue0 = addYin(date)
              val colValue1 = hour
              val colValue2 = x._2.toInt
              val colValue3 = jedis.scard("helper_" + date) // e.g. helper_2018-07-08
              val colValue4 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

              var sql = ""
              if (hour == hour_now) { // update UV only for the current hour
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
                logger.warn(sql)
                stmt.addBatch(sql)
              }
            }
          })
          stmt.executeBatch() // execute the batched SQL statements
          conn.commit()
        } catch {
          case e: Exception => {
            logger.error(e)
            logger2.error(HelperHandle.getClass.getSimpleName + e)
          }
        } finally {
          if (jedis != null) {
            closeJedis(jedis)
          }

          if (conn != null) {
            conn.close()
          }
        }
      })
    }
  })
}
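The function above builds each SQL statement by concatenating values into the string. One possible alternative, shown here only as a sketch and not as what the post's code does, is to generate the same "insert ... on duplicate key update" statement with `?` placeholders and bind the values through a JDBC PreparedStatement. The `upsertSql` helper and the sample table and column names are hypothetical.

```scala
object UpsertSqlSketch {
  // Build a MySQL upsert with one "?" placeholder per column.
  // updateColumns are the columns to overwrite when the row already exists;
  // "col = values(col)" reuses the value supplied in the insert part.
  def upsertSql(table: String, columns: Seq[String], updateColumns: Seq[String]): String = {
    val cols = columns.mkString(",")
    val placeholders = columns.map(_ => "?").mkString(",")
    val updates = updateColumns.map(c => s"$c = values($c)").mkString(",")
    s"insert into $table($cols) values($placeholders) on duplicate key update $updates"
  }

  def main(args: Array[String]): Unit = {
    // Made-up table and column names, for illustration only.
    val sql = upsertSql(
      "statistic_example",
      Seq("date", "hour", "pv", "uv", "dh"),
      Seq("pv", "uv"))
    println(sql)
  }
}
```

With such a template the values would be bound via `setString`, `setInt`, and `setLong` on a `java.sql.PreparedStatement` followed by `addBatch()`, so quoting helpers are not needed for the values. Table and column names cannot be bound as parameters and would still have to come from trusted code.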
  
// Milliseconds from now until the end of today (23:59:59.999)
def resetTime = {
  val now = new Date()
  val todayEnd = Calendar.getInstance
  todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR would be the 12-hour field
  todayEnd.set(Calendar.MINUTE, 59)
  todayEnd.set(Calendar.SECOND, 59)
  todayEnd.set(Calendar.MILLISECOND, 999)
  todayEnd.getTimeInMillis - now.getTime
}
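The same computation can be written with java.time, which also makes it easy to test because the current time is passed in as a parameter. This is a sketch under the assumption that local wall-clock time is acceptable (daylight-saving changes are ignored); `millisUntilEndOfDay` is a hypothetical name, not part of the original code.

```scala
import java.time.{Duration, LocalDateTime, LocalTime}

object ResetTimeSketch {
  // Milliseconds from `now` until the last instant of the same calendar day.
  // LocalTime.MAX is 23:59:59.999999999; toMillis truncates to whole milliseconds.
  def millisUntilEndOfDay(now: LocalDateTime): Long = {
    val endOfDay = now.toLocalDate.atTime(LocalTime.MAX)
    Duration.between(now, endOfDay).toMillis
  }

  def main(args: Array[String]): Unit = {
    // In a job this value would be passed to awaitTerminationOrTimeout.
    println(millisUntilEndOfDay(LocalDateTime.now()))
  }
}
```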

MySQL connection pool code (MysqlPoolUtil.scala):

package com.js.ipflow.utils

import java.sql.{Connection, PreparedStatement, ResultSet}

import com.js.ipflow.start.ConfigFactory
import org.apache.commons.dbcp.BasicDataSource
import org.apache.logging.log4j.LogManager

/**
  * JDBC MySQL connection pool utility
  * @author keguang
  */
object MysqlPoolUtil {

  val logger = LogManager.getLogger(MysqlPoolUtil.getClass.getSimpleName)

  private var bs: BasicDataSource = null

  /**
    * Create the data source
    * @return
    */
  def getDataSource(): BasicDataSource = {
    if (bs == null) {
      ConfigFactory.initConfig()
      bs = new BasicDataSource()
      bs.setDriverClassName("com.mysql.jdbc.Driver")
      bs.setUrl(ConfigFactory.mysqlurl)
      bs.setUsername(ConfigFactory.mysqlusername)
      bs.setPassword(ConfigFactory.mysqlpassword)
      bs.setMaxActive(50) // maximum number of active connections
      bs.setInitialSize(20) // number of connections created when the pool starts
      bs.setMinIdle(20) // minimum number of idle connections kept in the pool
      bs.setMaxIdle(20) // maximum number of idle connections kept without being released; a negative value means no limit
      bs.setMaxWait(5000) // maximum time (ms) to wait for a connection when none is available before throwing; -1 waits indefinitely
      bs.setMinEvictableIdleTimeMillis(10 * 1000) // idle connections become eligible for eviction after 10 seconds
      bs.setTimeBetweenEvictionRunsMillis(1 * 60 * 1000) // run the idle-connection evictor once per minute
      bs.setTestOnBorrow(true)
    }
    bs
  }

  /**
    * Shut down the data source
    */
  def shutDownDataSource(): Unit = {
    if (bs != null) {
      bs.close()
    }
  }

  /**
    * Get a database connection
    * @return
    */
  def getConnection(): Connection = {
    var con: Connection = null
    try {
      if (bs != null) {
        con = bs.getConnection()
      } else {
        con = getDataSource().getConnection()
      }
    } catch {
      case e: Exception => logger.error(e)
    }
    con
  }

  /**
    * Close the result set, statement, and connection
    */
  def closeCon(rs: ResultSet, ps: PreparedStatement, con: Connection): Unit = {
    if (rs != null) {
      try {
        rs.close()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }
    if (ps != null) {
      try {
        ps.close()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }
    if (con != null) {
      try {
        con.close()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }
  }
}

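4) Fault tolerance

Any stream job that consumes Kafka has to consider data loss. Recovery information can generally be saved to any storage system, including MySQL, HDFS, HBase, Redis, and ZooKeeper. Here Spark Streaming's built-in checkpoint mechanism is used to recover data when the application restarts.

checkpoint

With checkpointing, a restart (planned, or after a failure) can pick up the batches that had not completed and resume reading from the corresponding Kafka offsets.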

// Initialize the configuration
ConfigFactory.initConfig()

val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
conf.set("spark.default.parallelism", "24")
val sc = new SparkContext(conf)

while (true) {
  val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
  ssc.start()
  ssc.awaitTerminationOrTimeout(resetTime)
  ssc.stop(false, true)
}

There is one checkpoint directory per day. At the day boundary the StreamingContext is stopped on a timer and a new one is created, so PV and UV are counted afresh for the new day.
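The per-day checkpoint directory can be pictured as a base path plus the current date. DateUtil.getDay is not shown in this post, so the sketch below uses a hypothetical `checkpointDirFor` helper, assumes a yyyy-MM-dd suffix, and uses a made-up base path.

```scala
import java.time.LocalDate

object CheckpointDirSketch {
  // Hypothetical helper: append the ISO date (yyyy-MM-dd) to a base directory.
  // The real DateUtil.getDay(0) may use a different date format.
  def checkpointDirFor(baseDir: String, day: LocalDate): String =
    baseDir + day.toString

  def main(args: Array[String]): Unit = {
    val base = "hdfs:///checkpoint/ipflow_" // made-up path
    // Consecutive days map to different directories, so the context created
    // after midnight starts from an empty checkpoint.
    println(checkpointDirFor(base, LocalDate.of(2018, 7, 27)))
    println(checkpointDirFor(base, LocalDate.of(2018, 7, 28)))
  }
}
```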

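Note

ssc.stop(false, true) stops the StreamingContext gracefully without stopping the SparkContext. ssc.stop(true, true) would also stop the SparkContext, and the program would simply exit.

Application migration or upgrade

During this work we upgraded the application. When a feature is incomplete or contains a logic error, the code has to be changed and the jar rebuilt. If the program is then stopped and restarted, the new application still reads the old checkpoint, which can cause two problems:
  1. The previous version of the program still runs, because the checkpoint also contains serialized code;
  2. The job fails outright because deserialization fails.
Sometimes modified code does take effect without deleting the checkpoint. After a lot of testing, however, I found that changes to the data filtering logic, or to the stateful operations, also make the restart fail, and only deleting the checkpoint helps. But deleting the checkpoint loses the unfinished batches and the consumed Kafka offsets, which directly causes data loss. In that situation I usually proceed as follows.
  Start the new version on another cluster, or with a different checkpoint directory. Our code and configuration are kept separate, so changing the checkpoint location in the configuration file is easy. Then run both programs side by side: apart from the checkpoint directory, which is created afresh, they are the same and write to the same database. After they have run for a while, stop the old program. I had read this advice on the official site before but could only memorize it without really understanding it; only when doing it myself did I work out how to keep the data accurate.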

5) Logging

Logging uses log4j2. A copy is kept locally, and ERROR-level logs are sent by email to my phone.

val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// logger for emailing level=error logs
val logger2 = LogManager.getLogger("email")
3. Main code

pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.2144</groupId>
    <artifactId>ipflow</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Version properties -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <log4j.version>2.5</log4j.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.0.2</spark.version>
        <hadoop.version>2.6.5</hadoop.version>
        <pkg.name>ipflow_flash_count</pkg.name>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.4</version>
            <scope>provided</scope>
        </dependency>
        <!-- ZooKeeper client -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.5</version>
            <scope>provided</scope>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>-->

        <!--
            Spark Streaming Kafka integration. No separate org.apache.kafka dependency is needed:
            spark-streaming-kafka-0-8_2.11 already includes it, and mixing versions can cause incompatibilities.
        -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.9</version>
        </dependency>

        <!-- logger dependencies -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>dom4j</groupId>
            <artifactId>dom4j</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.jodd</groupId>
            <artifactId>jodd-core</artifactId>
            <version>3.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- BloomFilter dependency -->
        <dependency>
            <groupId>com.github.wxisme</groupId>
            <artifactId>bloomfilter</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <!-- Test code and files -->
        <!--<testSourceDirectory>${basedir}/src/test</testSourceDirectory>-->
        <finalName>${pkg.name}</finalName>
        <sourceDirectory>src/main/java</sourceDirectory>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>*.properties</include>
                    <include>*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <!-- Skip tests -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>

Configuration-reading code (ConfigFactory.java):

package com.js.ipflow.start;

import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import java.io.File;

public class ConfigFactory {
    private final static Logger log = LogManager.getLogger("email");

    public static String kafkaipport;
    public static String kafkazookeeper;
    public static String kafkatopic;
    public static String kafkagroupid;
    public static String mysqlurl;
    public static String mysqlusername;
    public static String mysqlpassword;
    public static String redishost;
    public static int redisport;
    public static String redispassword;
    public static int redistimeout;
    public static int rediskeyexists;
    public static String sparkstreamname;
    public static int sparkstreamseconds;
    public static String sparkstreammaster = "spark://qcloud-spark01:7077"; // for local testing only
    public static String localpath;
    public static String checkpointdir;
    // public static String gracestopfile; // for killing the program gracefully
    public static String keydeserilizer;
    public static String valuedeserilizer;

    /**
     * Initialize all common settings
     */
    public static void initConfig() { readCommons(); }

    /**
     * Read the commons.xml file
     */
    private static void readCommons() {
        SAXReader reader = new SAXReader(); // build the XML parser
        Document document = null;
        try {
            document = reader.read(Resources.getResource("commons.xml"));
        } catch (DocumentException e) {
            log.error("ConfigFactory.readCommons", e);
        }

        if (document != null) {
            Element root = document.getRootElement();

            Element kafkaElement = root.element("kafka");
            kafkaipport = kafkaElement.element("ipport").getText();
            kafkazookeeper = kafkaElement.element("zookeeper").getText();
            kafkatopic = kafkaElement.element("topic").getText();
            kafkagroupid = kafkaElement.element("groupid").getText();
            keydeserilizer = kafkaElement.element("keySer").getText();
            valuedeserilizer = kafkaElement.element("valSer").getText();

            Element mysqlElement = root.element("mysql");
            mysqlurl = mysqlElement.element("url").getText();
            mysqlusername = mysqlElement.element("username").getText();
            mysqlpassword = mysqlElement.element("password").getText();

            Element redisElement = root.element("redis");
            redishost = redisElement.element("host").getText();
            redisport = Integer.valueOf(redisElement.element("port").getText());
            redispassword = redisElement.element("password").getText();
            redistimeout = Integer.valueOf(redisElement.element("timeout").getText());
            rediskeyexists = Integer.valueOf(redisElement.element("keyexists").getText());

            Element sparkElement = root.element("spark");
            // sparkstreammaster = sparkElement.element("streammaster").getText();
            sparkstreamname = sparkElement.element("streamname").getText();
            sparkstreamseconds = Integer.valueOf(sparkElement.element("seconds").getText());

            Element pathElement = root.element("path");
            localpath = pathElement.element("localpath").getText();
            checkpointdir = pathElement.element("checkpointdir").getText();
            // gracestopfile = pathElement.element("gracestopfile").getText();

        } else {
            log.warn("Failed to read the commons.xml configuration file...");
        }
    }
}

The main business code is as follows:

package com.js.ipflow.flash.helper

import java.sql.Connection
import java.util.{Calendar, Date}

import com.alibaba.fastjson.JSON
import com.js.ipflow.start.ConfigFactory
import com.js.ipflow.utils.{DateUtil, MysqlPoolUtil, RedisPoolUtil}
import kafka.serializer.StringDecoder
import org.apache.logging.log4j.LogManager
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

object HelperHandle {

  val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
  // logger for emailing level=error logs
  val logger2 = LogManager.getLogger("email")

  def main(args: Array[String]): Unit = {
    helperHandle(args(0))
  }

  def helperHandle(consumeRate: String): Unit = {

    // Initialize the configuration
    ConfigFactory.initConfig()

    val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
    conf.set("spark.default.parallelism", "30")
    val sc = new SparkContext(conf)

    while (true) {
      val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
      ssc.start()
      ssc.awaitTerminationOrTimeout(resetTime)
      ssc.stop(false, true)
    }

    def getStreamingContext(): StreamingContext = {
      val stateSpec = StateSpec.function(mapFunction)
      val ssc = new StreamingContext(sc, Seconds(ConfigFactory.sparkstreamseconds))
      ssc.checkpoint(ConfigFactory.checkpointdir + DateUtil.getDay(0))
      val zkQuorm = ConfigFactory.kafkazookeeper
      val topics = ConfigFactory.kafkatopic
      val topicSet = Set(topics)
      val kafkaParams = Map[String, String](
        "metadata.broker.list" -> (ConfigFactory.kafkaipport)
        , "group.id" -> (ConfigFactory.kafkagroupid)
        , "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString
      )

      val rmessage = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicSet
      )

      // helper data: (dateHour, guid, helperversion)
      val helper_data = FilterHelper.getHelperData(rmessage.map(x => {
        val message = JSON.parseObject(x._2).getString("message")
        JSON.parseObject(message)
      })).repartition(60).cache()

      // (guid, datehour + helperversion)
      val helper_data_dis = helper_data.map(x => (x._2, addTab(x._1) + x._3)).reduceByKey((x, y) => y)

      // pv, uv
      val helper_count = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)

      // helperversion
      val helper_helperversion_count = helper_data.map(x => (addTab(x._1) + x._3, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
      helper_data_dis.foreachRDD(rdd => {
        rdd.foreachPartition(eachPartition => {
          var jedis: Jedis = null
          try {
            jedis = getJedis
            eachPartition.foreach(x => {
              val arr = x._2.split("\t")
              val date: String = arr(0).split(":")(0)

              // helper UV: one set per day
              val key0 = "helper_" + date
              jedis.sadd(key0, x._1)
              jedis.expire(key0, ConfigFactory.rediskeyexists)
              // helperversion UV: one set per day and version
              val key = date + "_" + arr(1)
              jedis.sadd(key, x._1)
              jedis.expire(key, ConfigFactory.rediskeyexists)
            })
          } catch {
            case e: Exception => {
              logger.error(e)
              logger2.error(HelperHandle.getClass.getSimpleName + e)
            }
          } finally {
            if (jedis != null) {
              closeJedis(jedis)
            }
          }
        })
      })
      insertHelper(helper_helperversion_count, "statistic_realtime_flash_helper", "date", "hour", "count_all", "count", "helperversion", "datehour", "dh")
      insertHelper(helper_count, "statistic_realtime_helper_count", "date", "hour", "helper_count_all", "helper_count", "dh")

      ssc
    }
  }

       ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  // Time remaining today in milliseconds, measured up to 23:59:59.999 (just before next midnight)
  def resetTime = {
    val now = new Date()
    val todayEnd = Calendar.getInstance
    todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR would be the 12-hour clock
    todayEnd.set(Calendar.MINUTE, 59)
    todayEnd.set(Calendar.SECOND, 59)
    todayEnd.set(Calendar.MILLISECOND, 999)
    todayEnd.getTimeInMillis - now.getTime
  }

  /**
    * Insert the aggregated results into MySQL.
    *
    * @param data     (key, pv) pairs; the key is datehour, or addTab(datehour) + helperversion
    * @param tbName   target table name
    * @param colNames column names; 7 names select the per-helperversion layout, 5 the overall layout
    */
  def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
    data.foreachRDD(rdd => {
      val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
      if (!rdd.isEmpty()) {
        val hour_now = tmp_rdd.max() // largest hour in the current result; this helps during data recovery
        rdd.foreachPartition(eachPartition => {
          var jedis: Jedis = null
          var conn: Connection = null
          try {
            jedis = getJedis
            conn = MysqlPoolUtil.getConnection()
            conn.setAutoCommit(false)
            val stmt = conn.createStatement()
            eachPartition.foreach(x => {
              if (colNames.length == 7) {
                val datehour = x._1.split("\t")(0)
                val helperversion = x._1.split("\t")(1)
                val date_hour = datehour.split(":")
                val date = date_hour(0)
                val hour = date_hour(1).toInt

                val colName0 = colNames(0) // date
                val colName1 = colNames(1) // hour
                val colName2 = colNames(2) // count_all
                val colName3 = colNames(3) // count
                val colName4 = colNames(4) // helperversion
                val colName5 = colNames(5) // datehour
                val colName6 = colNames(6) // dh

                val colValue0 = addYin(date)
                val colValue1 = hour
                val colValue2 = x._2.toInt
                val colValue3 = jedis.scard(date + "_" + helperversion) // key looks like 2018-07-08_10.0.1.22
                val colValue4 = addYin(helperversion)
                val colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
                val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

                var sql = ""
                if (hour == hour_now) { // UV is only updated for the current hour
                  sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} =  ${colValue2},${colName3} = ${colValue3}"
                  logger.warn(sql)
                  stmt.addBatch(sql)
                } /* else {
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} =  ${colValue2}"
              }*/

              } else if (colNames.length == 5) {
                val date_hour = x._1.split(":")
                val date = date_hour(0)
                val hour = date_hour(1).toInt
                val colName0 = colNames(0) // date
                val colName1 = colNames(1) // hour
                val colName2 = colNames(2) // helper_count_all
                val colName3 = colNames(3) // helper_count
                val colName4 = colNames(4) // dh

                val colValue0 = addYin(date)
                val colValue1 = hour
                val colValue2 = x._2.toInt
                val colValue3 = jedis.scard("helper_" + date) // key looks like helper_2018-07-08
                val colValue4 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

                var sql = ""
                if (hour == hour_now) { // UV is only updated for the current hour
                  sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4}) on duplicate key update ${colName2} =  ${colValue2},${colName3} = ${colValue3}"
                  logger.warn(sql)
                  stmt.addBatch(sql)
                }
              }
            })
            stmt.executeBatch() // run the batched SQL statements
            conn.commit()
          } catch {
            case e: Exception => {
              logger.error(e)
              logger2.error(HelperHandle.getClass.getSimpleName + e)
            }
          } finally {
            if (jedis != null) {
              closeJedis(jedis)
            }

            if (conn != null) {
              conn.close()
            }
          }
        })
      }
    })
  }

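  // Wrap a string in single quotes for use as a SQL string literal
  def addYin(str: String): String = {
    "'" + str + "'"
  }

  // Append a tab to a string (used as the field separator in composite keys)
  def addTab(str: String): String = {
    str + "\t"
  }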

  // State update function for the real-time traffic count
  val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
    val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
    val output = (datehour, accuSum)
    state.update(accuSum)
    output
  }

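  // Get a jedis connection from the pool
  def getJedis: Jedis = {
    val jedis = RedisPoolUtil.getPool.getResource
    jedis
  }

  // Release the jedis connection back to the pool
  // (returnResource is deprecated in later Jedis 2.x releases, which recommend jedis.close() instead)
  def closeJedis(jedis: Jedis): Unit = {
    RedisPoolUtil.getPool.returnResource(jedis)
  }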

}
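
insertHelper above assembles each SQL statement by string concatenation and pads the hour by hand. As a rough sketch only (not the original author's code), the same upsert could be written with `?` placeholders and a java.sql.PreparedStatement. The names UpsertSketch, formatDh, upsertSql and addRow are hypothetical, and the sketch assumes MySQL's VALUES(col) form inside "on duplicate key update".

```scala
import java.sql.PreparedStatement

// Hypothetical helper object; none of these names appear in the original post.
object UpsertSketch {

  // Zero-pads the hour: ("2018-07-27", 8) gives "2018-07-27 08:00".
  def formatDh(date: String, hour: Int): String = f"$date $hour%02d:00"

  // Builds an "insert ... on duplicate key update" statement with ? placeholders.
  // Table and column names cannot be bound as parameters, so they are assumed
  // to come from trusted code (like colNames in the post), never from user input.
  def upsertSql(tbName: String, colNames: Seq[String], updateCols: Seq[String]): String = {
    val cols = colNames.mkString(",")
    val marks = colNames.map(_ => "?").mkString(",")
    val updates = updateCols.map(c => s"$c = values($c)").mkString(",")
    s"insert into $tbName($cols) values($marks) on duplicate key update $updates"
  }

  // Binds one row of values (JDBC parameters are 1-based) and adds it to the batch.
  def addRow(ps: PreparedStatement, values: Seq[Any]): Unit = {
    values.zipWithIndex.foreach { case (v, i) => ps.setObject(i + 1, v.asInstanceOf[AnyRef]) }
    ps.addBatch()
  }
}
```

With this shape, one statement would be prepared per partition via conn.prepareStatement(UpsertSketch.upsertSql(...)), addRow called for each record, and executeBatch() followed by commit() at the end, as in the original flow.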

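resetTime in the listing measures the time left until 23:59:59.999 of the current day. If the interval up to the next midnight itself is wanted, one possible sketch with java.time looks like this. ResetTimeSketch and millisUntilNextMidnight are hypothetical names, and passing the current time as a parameter is an assumption made so the function is easy to check.

```scala
import java.time.{Duration, ZoneId, ZonedDateTime}

// Hypothetical alternative to resetTime; not part of the original post.
object ResetTimeSketch {

  // Milliseconds from `now` until the start of the next day in now's time zone.
  def millisUntilNextMidnight(now: ZonedDateTime): Long = {
    val nextMidnight = now.toLocalDate.plusDays(1).atStartOfDay(now.getZone)
    Duration.between(now, nextMidnight).toMillis
  }
}
```

For example, millisUntilNextMidnight(ZonedDateTime.now()) gives the delay for the current moment; at 18:00 UTC the result is six hours, i.e. 21600000 ms.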
----------------------------
Original article: https://blog.csdn.net/ddxygq/article/details/81258643




[This post was last edited by flybird on 2020-03-22 11:01:02]