大数据竞赛(任务书一)-离线数据处理任务三:指标计算
任务描述
使用 Scala 编写 Spark 工程代码,完成相关指标的计算任务
在指标计算过程中,请忽略订单信息表中的
order_status字段,所有订单均视为有效订单。涉及订单金额的计算仅使用final_total_amount字段。同时,DWD 层所有维度表均使用最新分区的数据
启动Hive Metastore服务
Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:
hive --servicemetastore
这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:
nohup hive--service metastore &
这个命令将启动Hive Metastore服务,并在后台持续运行
子任务1:省份-地区-月份维度的订单指标统计
任务说明
请根据 DWD 层的数据,计算以下指标并写入 MySQL:
- 统计每个省份、每个地区、每个月的:
- 订单数量(totalorder)
- 订单总金额(totalconsumption)
结果存入 MySQL 数据库 shtd_result 中的 provinceeverymonth 表,表结构如下:
| 字段名 | 类型 | 含义 | 备注 |
|---|---|---|---|
| provinceid | int | 省份主键 | |
| provincename | text | 省份名称 | |
| regionid | int | 地区主键 | |
| regionname | text | 地区名称 | |
| totalconsumption | double | 订单总金额 | 当月汇总 |
| totalorder | int | 订单总数 | 当月汇总 |
| year | int | 年份 | 来源订单时间 |
| month | int | 月份 | 来源订单时间 |
按如下规则降序排序(订单数 → 金额 → 省份 ID),选出前 5 条并截图附在报告中。
子任务1分析
- 从上面的表结构可以看出,数据来自于三张表:dim_region, dim_privnace, fact_order_info。
- 需要对订单表进行预处理,派生出两个新的字段:year列和month列
- 最后结果需要在以上三表join连接的基础上执行聚合统计。
实现思路
- 原始数据来自三张表:
fact_order_info、dim_province和dim_region - 对订单表提取
year和month字段 - 三表 Join 后按指定维度进行分组聚合
- 聚合结果写入 MySQL 指定表中
实现代码
def analysisTask1(spark: SparkSession): Unit = {
// 读取各表数据(维表选取最新分区)
val dwdOrders = spark.table("dwd.fact_order_info")
val dwdProvince = spark.sql("SELECT * FROM dwd.dim_province WHERE etl_date = '20221031'")
val dwdRegion = spark.sql("SELECT * FROM dwd.dim_region WHERE etl_date = '20221031'")
// 提取 year 和 month 字段
val transOrders = dwdOrders
.withColumn("year", year(col("operate_time")))
.withColumn("month", month(col("operate_time")))
.select("id", "final_total_amount", "province_id", "year", "month")
// 选择维表字段
val transProvince = dwdProvince.select("id", "name", "region_id")
val transRegion = dwdRegion.select("id", "region_name")
// 注册临时视图
transOrders.createOrReplaceTempView("orderinfo")
transProvince.createOrReplaceTempView("province")
transRegion.createOrReplaceTempView("region")
// 三表连接
val joinedDf = spark.sql(
"""
|SELECT
| r.id AS regionid,
| r.region_name AS regionname,
| p.id AS provinceid,
| p.name AS provincename,
| o.year,
| o.month,
| o.final_total_amount
|FROM region r
|JOIN province p ON r.id = p.region_id
|JOIN orderinfo o ON p.id = o.province_id
|""".stripMargin)
joinedDf.createOrReplaceTempView("joined_table")
// 聚合统计
val resultDf = spark.sql(
"""
|SELECT
| regionid,
| regionname,
| provinceid,
| provincename,
| year,
| month,
| COUNT(*) AS totalorder,
| SUM(final_total_amount) AS totalconsumption
|FROM joined_table
|GROUP BY regionid, regionname, provinceid, provincename, year, month
|ORDER BY regionid, regionname, provinceid, provincename, year, month
|""".stripMargin)
// 写入 MySQL shtd_result 库的 provinceeverymonth 表
exportToMysql(resultDf, "provinceeverymonth")
}
数据写入 MySQL 函数
/**
* 将 DataFrame 写入 MySQL 数据库
* @param df 分析结果集
* @param tb 目标表名
*/
def exportToMysql(df: Dataset[Row], tb: String): Unit = {
val DB_URL = "jdbc:mysql://localhost:3306/shtd_result?useSSL=false"
val props = new Properties()
props.put("user", "root")
props.put("password", "123456")
props.put("driver", "com.mysql.jdbc.Driver")
df.write.mode("overwrite").jdbc(DB_URL, tb, props)
}
MySQL 查询
SELECT *
FROM provinceeverymonth
ORDER BY totalorder DESC, totalconsumption DESC, provinceid DESC
LIMIT 5;
查询结果如下所示:
子任务2:省份平均订单金额对比分析
任务说明
请根据 DWD 层的表计算出 2020 年 4 月份每个省份的平均订单金额,并与全国所有省份的平均订单金额进行对比,结果写入 MySQL 数据库 shtd_result 中的 provinceavgcmp 表,字段结构如下:
| 字段名 | 类型 | 含义 | 备注 |
|---|---|---|---|
| provinceid | int | 省份主键 | |
| provincename | text | 省份名称 | |
| provinceavgconsumption | double | 该省平均订单金额 | |
| allprovinceavgconsumption | double | 所有省平均订单金额 | 全国人均订单消费额 |
| comparison | text | 对比结果 | 与全国均值的对比结果,可能为:高 / 低 / 相同 |
按“省份主键降序 + 平均金额降序”排序查询前 5 条记录,并将截图附入报告
实现思路
- 源数据来自两张表:
fact_order_info和dim_province - 首先筛选出 2020 年 4 月的数据
- 然后分别计算每个省份与全国的平均订单金额
- 使用 crossJoin 方式合并计算结果并生成对比字段
- 最终写入 MySQL 的目标表
实现代码
def analysisTask2(spark: SparkSession): Unit = {
// 读取订单事实表,筛选 2020 年 4 月数据并提取字段
val factOrdersApril = spark.table("dwd.fact_order_info")
.withColumn("year", year(col("operate_time")))
.withColumn("month", month(col("operate_time")))
.filter(col("year") === 2020 && col("month") === 4)
.select("id", "final_total_amount", "province_id")
// 读取省份维表(使用最新分区)
val dimProvince = spark.sql("SELECT * FROM dwd.dim_province WHERE etl_date = '20221031'")
.select(col("id").as("provinceid"), col("name").as("provincename"))
// 注册临时视图
factOrdersApril.createOrReplaceTempView("fact_orders_4m")
dimProvince.createOrReplaceTempView("dim_province")
// Join 操作:关联省份名称
val joinedDf = spark.sql(
"""
|SELECT
| p.provinceid,
| p.provincename,
| o.final_total_amount
|FROM dim_province p
|JOIN fact_orders_4m o ON p.provinceid = o.province_id
|""".stripMargin)
// 每个省的平均订单金额
val provinceAvgDf = joinedDf
.groupBy("provinceid", "provincename")
.agg(avg("final_total_amount").as("provinceavgconsumption"))
// 所有省的平均订单金额
val allProvinceAvgDf = joinedDf
.agg(avg("final_total_amount").as("allprovinceavgconsumption"))
// CrossJoin 两者以便对比并派生 comparison 字段
val resultDf = provinceAvgDf.crossJoin(allProvinceAvgDf)
.withColumn("comparison",
when(col("provinceavgconsumption") > col("allprovinceavgconsumption"), "高")
.when(col("provinceavgconsumption") === col("allprovinceavgconsumption"), "相同")
.otherwise("低")
)
// 写入 MySQL 数据库 shtd_result 的 provinceavgcmp 表
exportToMysql(resultDf, "provinceavgcmp")
}
MySQL 查询
SELECT provinceid, provinceavgconsumption
FROM provinceavgcmp
ORDER BY provinceid DESC, provinceavgconsumption DESC
LIMIT 5;
结果如下所示:
子任务3
任务说明
从 dwd.fact_order_info 表中,统计在连续两天内下单且下单金额保持增长的用户,并将结果写入 MySQL 数据库 shtd_result 中的 usercontinueorder 表。该表结构如下:
| 字段 | 类型 | 中文含义 | 备注 |
|---|---|---|---|
| userid | int | 客户主键 | |
| username | text | 客户名称 | |
| day | text | 下单日 | 连续下单的两天,格式为 yyyyMMdd_yyyyMMdd,如:20220101_20220102 |
| totalconsumption | double | 订单总金额 | 两天的订单金额之和 |
| totalorder | int | 订单总数 | 两天的订单数量之和 |
根据订单总数、订单总金额、客户主键进行降序排序,选出前 5 条并截图附在报告中
实现思路
数据来源于
fact_order_info表为了判断用户是否连续两天下单,且金额递增,需要对用户按日期排序后进行跨行比较,必须使用**窗口函数
需将每日数据聚合后,再进行前后两日的配对与筛选
实现代码
def analysisTask3(spark: SparkSession): Unit = {
val fact_orders = spark.table("dwd.fact_order_info")
// 提取所需字段并格式化日期
val simple_fact_orders = fact_orders
.withColumn("day", to_date(col("operate_time")))
.select(
col("id").as("orderid"),
col("user_id").as("userid"),
col("consignee").as("username"),
col("day"),
col("final_total_amount")
)
// 聚合每天的订单金额与订单数量
val agg_fact_orders = simple_fact_orders
.groupBy("userid", "username", "day")
.agg(
sum("final_total_amount").as("daytotalconsumption"),
count("orderid").as("daytotalorder")
)
.orderBy("userid", "username", "day")
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("userid", "username").orderBy("day")
// 构建窗口列:是否连续、前一日金额、前一日订单数
val winOrders = agg_fact_orders
.withColumn("haslastday", when(datediff(col("day"), lag("day", 1).over(w)) === 1, 1).otherwise(lit(0)))
.withColumn("lastday_total", when(col("haslastday") === 1, lag("daytotalconsumption", 1).over(w)).otherwise(lit(0.0)))
.withColumn("lastday_count", when(col("haslastday") === 1, lag("daytotalorder", 1).over(w)).otherwise(lit(0)))
// 筛选连续两天且金额递增的记录,组装目标字段
val resultDF = winOrders
.filter(col("haslastday") === 1 && col("daytotalconsumption") > col("lastday_total"))
.withColumn("totalconsumption", col("daytotalconsumption") + col("lastday_total"))
.withColumn("totalorder", col("daytotalorder") + col("lastday_count"))
.withColumn("day", concat(date_format(date_sub(col("day"), 1), "yyyyMMdd"), lit("_"), date_format(col("day"), "yyyyMMdd")))
.select(
col("userid"),
col("username"),
col("day"),
bround(col("totalconsumption"), 2).as("totalconsumption"),
col("totalorder")
)
// 写入 MySQL 数据库 shtd_result.usercontinueorder 表
exportToMysql(resultDF, "usercontinueorder")
}
MySQL 查询
SELECT *
FROM shtd_result.usercontinueorder
ORDER BY totalorder DESC, totalconsumption DESC, userid DESC
LIMIT 5;
结果如下所示:
由以上查询结果可知,由于提供的样例数据问题,基于该样例数据并没有找到连续两日下单的用户(不是何种原因比赛时也依旧没有找到数据)。
补充建议
前两个子任务可直接作为练习,指标计算类任务在正式比赛中题型变化较大,建议好好去看看spark教程和各种函数,题大体相近但是很多地方用的函数会变,题目也会多出一题,一定要多去了解各种函数
欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1701220998@qq.com