Log file: https://pan.baidu.com/s/1Eve8GmGi21JLV70fqJjmQw
Extraction code: 3xsp
Tools: IDEA, Maven
Goal: use Spark to clean the web log data and perform a daily user retention analysis.
1. Environment Setup
Configure pom.xml:
<repositories>
    <repository>
        <id>aliyunmaven</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </repository>
</repositories>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.13.8</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.13</artifactId>
        <version>3.2.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
</dependencies>
Install the Scala plugin:
File -> Settings -> Plugins
2. Data Cleaning
Using the DataFrame abstraction in Spark SQL, the cleaned data can be stored in MySQL. The transformation of the log data can be understood as:
RDD[String] -> RDD[Array[String]] -> RDD[Row] -> DataFrame -> write to MySQL
Before cleaning the data, you need to understand the layout of the web log. In this log, fields are separated by "\t" (the Tab character). Below is a typical web log record with its fields labeled:
event_time     = 2018-09-04T20:27:31+08:00
url            = http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451540&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F58.0.3029.110+Safari%2F537.36+SE+2.X+MetaSr+1.0&actionEnd=1536150451668&actionName=startEval&actionTest=0&actionType=3&actionValue=272090&clientType=001_kgc&examType=001&ifEquipment=web&isFromContinue=false&skillIdCount=0&skillLevel=0&testType=jineng&userSID=B842B843AE317425D53D0C567A903EF7.exam-tomcat-node3.exam-tomcat-node3&userUID=272090&userUIP=1.180.18.157
method         = GET
status         = 200
sip            = 192.168.168.64
user_uip       = -
action_prepend = -
action_client  = Apache-HttpClient/4.1.2 (java 1.5)
1) Convert RDD[String] to RDD[Row], keeping only records that split into exactly 8 fields
val linesRDD = sc.textFile("C:/Users/Lenovo/Desktop/Working/Python/data/test.log")
import spark.implicits._

val line1 = linesRDD.map(x => x.split("\t"))
// line1.foreach(println)

val rdd = line1
  .filter(x => x.length == 8)
  .map(x => Row(x(0).trim, x(1).trim, x(2).trim, x(3).trim,
                x(4).trim, x(5).trim, x(6).trim, x(7).trim))
// rdd.foreach(println)
2) Convert RDD[Row] to a DataFrame and set up the initial schema mapping
// Map the RDD onto a table schema
val schema = StructType(Array(
  StructField("event_time", StringType),
  StructField("url", StringType),
  StructField("method", StringType),
  StructField("status", StringType),
  StructField("sip", StringType),
  StructField("user_uip", StringType),
  StructField("action_prepend", StringType),
  StructField("action_client", StringType)
))

val orgDF = spark.createDataFrame(rdd, schema)
// orgDF.show(5)
3) Split the url field on "&" and "="
// De-duplicate, drop records whose status code is not 200, and drop records with an empty event_time.
// distinct compares and de-duplicates on the full content of each row;
// dropDuplicates de-duplicates on the specified columns only.
val ds1 = orgDF.dropDuplicates("event_time", "url")
  .filter(x => x(3) == "200")
  .filter(x => StringUtils.isNotEmpty(x(0).toString))

// Split the url on "&" and "=" to extract userSID, userUIP, actionClient, actionBegin,
// actionEnd, actionType, actionPrepend, actionTest, ifEquipment, actionName, id and progress,
// and collect them into a Map.
val dfDetail = ds1.map(row => {
  val urlArray = row.getAs[String]("url").split("\\?")
  var map = Map("params" -> "null")
  if (urlArray.length == 2) {
    map = urlArray(1).split("&")
      .map(x => x.split("="))
      .filter(_.length == 2)
      .map(x => (x(0), x(1)))
      .toMap
  }
  (
    // row carries the original DataFrame columns, map carries the fields parsed from the url
    row.getAs[String]("event_time"),
    row.getAs[String]("user_uip"),
    row.getAs[String]("method"),
    row.getAs[String]("status"),
    row.getAs[String]("sip"),
    map.getOrElse("actionBegin", ""),
    map.getOrElse("actionEnd", ""),
    map.getOrElse("userUID", ""),
    map.getOrElse("userSID", ""),
    map.getOrElse("userUIP", ""),
    map.getOrElse("actionClient", ""),
    map.getOrElse("actionType", ""),
    map.getOrElse("actionPrepend", ""),
    map.getOrElse("actionTest", ""),
    map.getOrElse("ifEquipment", ""),
    map.getOrElse("actionName", ""),
    map.getOrElse("progress", ""),
    map.getOrElse("id", "")
  )
}).toDF()
// dfDetail.show(5)
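To make the distinct vs. dropDuplicates distinction concrete, here is a minimal throwaway sketch (not part of the original job; the DataFrame name demo is my own, and it assumes the same spark session and import spark.implicits._ are in scope):

val demo = Seq(
  ("2018-09-04T20:27:31+08:00", "http://a", "200"),
  ("2018-09-04T20:27:31+08:00", "http://a", "200"),  // exact duplicate row
  ("2018-09-04T20:27:31+08:00", "http://a", "404")   // same event_time/url, different status
).toDF("event_time", "url", "status")

demo.distinct().count()                           // 2: only fully identical rows are merged
demo.dropDuplicates("event_time", "url").count()  // 1: one row kept per (event_time, url) pair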
4) Rebuild the table header, flatten the parsed fields into columns, and write the result to the database
val detailRDD = dfDetail.rdd
val detailSchema = StructType(Array(
  StructField("event_time", StringType),
  StructField("user_uip", StringType),
  StructField("method", StringType),
  StructField("status", StringType),
  StructField("sip", StringType),
  StructField("actionBegin", StringType),
  StructField("actionEnd", StringType),
  StructField("userUID", StringType),
  StructField("userSID", StringType),
  StructField("userUIP", StringType),
  StructField("actionClient", StringType),
  StructField("actionType", StringType),
  StructField("actionPrepend", StringType),
  StructField("actionTest", StringType),
  StructField("ifEquipment", StringType),
  StructField("actionName", StringType),
  StructField("progress", StringType),
  StructField("id", StringType)
))

val detailDF = spark.createDataFrame(detailRDD, detailSchema)

// write mode: overwrite replaces the table, append adds to it
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "******")
prop.put("driver", "com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/python_db"

println("Start writing to the database")
detailDF.write.mode("overwrite").jdbc(url, "logDetail", prop)
println("Finished writing to the database")
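Note that overwrite drops and recreates the logDetail table on every run. If you later re-run the job on new log files and want to keep the rows already written, a sketch of the append variant (reusing the same url and prop defined above) would be:

// append instead of overwrite, keeping existing rows in logDetail
detailDF.write.mode("append").jdbc(url, "logDetail", prop)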
3. Daily User Retention Analysis
- Compute m, the total number of new users on day n
- Compute n, the number of day-n new users who also sign in on day n+1
- Retention rate = n / m * 100% (a worked example follows this list)
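For example, if 200 users register on day n (m = 200) and 30 of those same users sign in on day n+1 (n = 30), the day-1 retention rate is 30 / 200 * 100% = 15%.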
1) Build the registration and sign-in event tables
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "******")
prop.put("driver", "com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/python_db"
val dataFrame = spark.read.jdbc(url, "logdetail", prop)

// All registration records (userUID, register_time, register action)
val registerDF = dataFrame
  .filter(dataFrame("actionName") === ("Registered"))
  .select("userUID", "event_time", "actionName")
  .withColumnRenamed("event_time", "register_time")
  .withColumnRenamed("userUID", "regUID")
// registerDF.show(5)

// The raw timestamp looks like 2018-09-04T20:27:31+08:00; keep only the first 10 characters (yyyy-MM-dd)
val registDF2 = registerDF
  .select(registerDF("regUID"), registerDF("register_time")
    .substr(1, 10).as("register_date"), registerDF("actionName"))
  .distinct()
// registDF2.show(5)

// All sign-in records (signUID, signing_time, sign-in action)
val signinDF = dataFrame.filter(dataFrame("actionName") === ("Signin"))
  .select("userUID", "event_time", "actionName")
  .withColumnRenamed("event_time", "signing_time")
  .withColumnRenamed("userUID", "signUID")
// signinDF.show(5)

val signiDF2 = signinDF
  .select(signinDF("signUID"), signinDF("signing_time")
    .substr(1, 10).as("signing_date"), signinDF("actionName"))
  .distinct()
// signiDF2.show(5)
2) Compute n, the number of day-n new users who sign in on day n+1, and m, the number of new users on day n
// Inner-join the two DataFrames on the same userUID
val joinDF = registDF2
  .join(signiDF2, signiDF2("signUID") === registDF2("regUID"), joinType = "inner")
// joinDF.show(5)

// Use Spark's built-in datediff to count users who registered on day n and signed in on day n+1 (intersection count n)
val frame = joinDF
  .filter(datediff(joinDF("signing_date"), joinDF("register_date")) === 1)
  .groupBy(joinDF("register_date")).count()
  .withColumnRenamed("count", "signcount")
// frame.show(5)

// Count the new users per registration date (the day-n new-user count m)
val frame1 = registDF2
  .groupBy(registDF2("register_date")).count()
  .withColumnRenamed("count", "regcount")
// frame1.show(5)
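As a quick sanity check on datediff (a throwaway sketch, again assuming the same spark session and import spark.implicits._): the function interprets yyyy-MM-dd strings as dates and returns the number of days from the second argument to the first, so next-day sign-ins give exactly 1.

import org.apache.spark.sql.functions.datediff
Seq(("2018-09-05", "2018-09-04"))
  .toDF("signing_date", "register_date")
  .select(datediff($"signing_date", $"register_date").as("diff"))
  .show()   // diff = 1, so the filter above keeps exactly the next-day sign-ins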
3) Retention rate = n / m * 100%
// Put m and n in one table
val frame2 = frame
  .join(frame1, "register_date")
frame2.show()

// Add a retention-rate column with value n / m, i.e. the day-1 retention rate for each registration date
frame2.withColumn("留存率", frame2("signcount") / frame2("regcount"))
  .show()
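If you would rather show the rate as a percentage instead of a fraction, one possible variant is the sketch below, using Spark's built-in round function; the column name retention_rate_pct is my own choice, not from the original post.

import org.apache.spark.sql.functions.round
frame2
  .withColumn("retention_rate_pct", round(frame2("signcount") / frame2("regcount") * 100, 2))
  .show()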
4. Source Code:
DataClear.scala
package spark

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

import java.util.Properties

object DataClear {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("DataClear").getOrCreate()
    val sc = spark.sparkContext

    val linesRDD = sc.textFile("C:/Users/Lenovo/Desktop/Working/Python/data/test.log")
    import spark.implicits._

    val line1 = linesRDD.map(x => x.split("\t"))
    // line1.foreach(println)

    val rdd = line1
      .filter(x => x.length == 8)
      .map(x => Row(x(0).trim, x(1).trim, x(2).trim, x(3).trim,
                    x(4).trim, x(5).trim, x(6).trim, x(7).trim))
    // rdd.foreach(println)

    // Map the RDD onto a table schema
    val schema = StructType(Array(
      StructField("event_time", StringType),
      StructField("url", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("user_uip", StringType),
      StructField("action_prepend", StringType),
      StructField("action_client", StringType)
    ))
    val orgDF = spark.createDataFrame(rdd, schema)
    // orgDF.show(5)

    // De-duplicate, drop records whose status code is not 200, and drop records with an empty event_time.
    // distinct compares the full content of each row; dropDuplicates de-duplicates on the specified columns.
    val ds1 = orgDF.dropDuplicates("event_time", "url")
      .filter(x => x(3) == "200")
      .filter(x => StringUtils.isNotEmpty(x(0).toString))

    // Split the url on "&" and "=" to extract userUID, userSID, userUIP, actionClient, actionBegin,
    // actionEnd, actionType, actionPrepend, actionTest, ifEquipment, actionName, id and progress
    val dfDetail = ds1.map(row => {
      val urlArray = row.getAs[String]("url").split("\\?")
      var map = Map("params" -> "null")
      if (urlArray.length == 2) {
        map = urlArray(1).split("&")
          .map(x => x.split("="))
          .filter(_.length == 2)
          .map(x => (x(0), x(1)))
          .toMap
      }
      (
        row.getAs[String]("event_time"),
        row.getAs[String]("user_uip"),
        row.getAs[String]("method"),
        row.getAs[String]("status"),
        row.getAs[String]("sip"),
        map.getOrElse("actionBegin", ""),
        map.getOrElse("actionEnd", ""),
        map.getOrElse("userUID", ""),
        map.getOrElse("userSID", ""),
        map.getOrElse("userUIP", ""),
        map.getOrElse("actionClient", ""),
        map.getOrElse("actionType", ""),
        map.getOrElse("actionPrepend", ""),
        map.getOrElse("actionTest", ""),
        map.getOrElse("ifEquipment", ""),
        map.getOrElse("actionName", ""),
        map.getOrElse("progress", ""),
        map.getOrElse("id", "")
      )
    }).toDF()
    // dfDetail.show(5)

    val detailRDD = dfDetail.rdd
    val detailSchema = StructType(Array(
      StructField("event_time", StringType),
      StructField("user_uip", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("actionBegin", StringType),
      StructField("actionEnd", StringType),
      StructField("userUID", StringType),
      StructField("userSID", StringType),
      StructField("userUIP", StringType),
      StructField("actionClient", StringType),
      StructField("actionType", StringType),
      StructField("actionPrepend", StringType),
      StructField("actionTest", StringType),
      StructField("ifEquipment", StringType),
      StructField("actionName", StringType),
      StructField("progress", StringType),
      StructField("id", StringType)
    ))
    val detailDF = spark.createDataFrame(detailRDD, detailSchema)
    detailDF.show(10)

    // write mode: overwrite replaces the table, append adds to it
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "******")
    prop.put("driver", "com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://localhost:3306/python_db"
    println("Start writing to the database")
    detailDF.write.mode("overwrite").jdbc(url, "logDetail", prop)
    println("Finished writing to the database")
  }
}
UserAnalysis.scala
package spark

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{datediff, unix_timestamp}

object UserAnalysis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("userAnalysis").master("local").getOrCreate()
    val sc = spark.sparkContext

    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "******")
    prop.put("driver", "com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://localhost:3306/python_db"
    val dataFrame = spark.read.jdbc(url, "logdetail", prop)
    dataFrame.show(10)

    // All registration records (userUID, register_time, register action)
    val registerDF = dataFrame.filter(dataFrame("actionName") === ("Registered"))
      .select("userUID", "event_time", "actionName")
      .withColumnRenamed("event_time", "register_time")
      .withColumnRenamed("userUID", "regUID")
    // registerDF.show(5)

    // The raw timestamp looks like 2018-09-04T20:27:31+08:00; keep only the first 10 characters (yyyy-MM-dd)
    val registDF2 = registerDF
      .select(registerDF("regUID"), registerDF("register_time")
        .substr(1, 10).as("register_date"), registerDF("actionName"))
      .distinct()
    // registDF2.show(5)

    // All sign-in records (signUID, signing_time, sign-in action)
    val signinDF = dataFrame.filter(dataFrame("actionName") === ("Signin"))
      .select("userUID", "event_time", "actionName")
      .withColumnRenamed("event_time", "signing_time")
      .withColumnRenamed("userUID", "signUID")
    // signinDF.show(5)

    val signiDF2 = signinDF
      .select(signinDF("signUID"), signinDF("signing_time")
        .substr(1, 10).as("signing_date"), signinDF("actionName"))
      .distinct()
    // signiDF2.show(5)

    // Inner-join the two DataFrames on the same userUID
    val joinDF = registDF2
      .join(signiDF2, signiDF2("signUID") === registDF2("regUID"), joinType = "inner")
    // joinDF.show(5)

    // Use Spark's built-in datediff to count users who registered on day n and signed in on day n+1 (intersection count n)
    val frame = joinDF
      .filter(datediff(joinDF("signing_date"), joinDF("register_date")) === 1)
      .groupBy(joinDF("register_date")).count()
      .withColumnRenamed("count", "signcount")
    // frame.show(5)

    // Count the new users per registration date (the day-n new-user count m)
    val frame1 = registDF2
      .groupBy(registDF2("register_date")).count()
      .withColumnRenamed("count", "regcount")
    // frame1.show(5)

    // Put m and n in one table
    val frame2 = frame
      .join(frame1, "register_date")
    // frame2.show()

    // Add a retention-rate column with value n / m: the day-1 retention rate for each registration date
    frame2.withColumn("留存率", frame2("signcount") / frame2("regcount"))
      .show()

    sc.stop()
  }
}