大数据应用技术模块b任务三

大数据竞赛(任务书一)-离线数据处理任务三:指标计算

任务描述

使用 Scala 编写 Spark 工程代码,完成相关指标的计算任务

在指标计算过程中,请忽略订单信息表中的 order_status 字段,所有订单均视为有效订单。涉及订单金额的计算仅使用 final_total_amount 字段。同时,DWD 层所有维度表均使用最新分区的数据

启动Hive Metastore服务

Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:

hive --servicemetastore

这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:

nohup hive--service metastore &

这个命令将启动Hive Metastore服务,并在后台持续运行

子任务1:省份-地区-月份维度的订单指标统计

任务说明

请根据 DWD 层的数据,计算以下指标并写入 MySQL:

  • 统计每个省份、每个地区、每个月的:
    • 订单数量(totalorder)
    • 订单总金额(totalconsumption)

结果存入 MySQL 数据库 shtd_result 中的 provinceeverymonth 表,表结构如下:

字段名 类型 含义 备注
provinceid int 省份主键
provincename text 省份名称
regionid int 地区主键
regionname text 地区名称
totalconsumption double 订单总金额 当月汇总
totalorder int 订单总数 当月汇总
year int 年份 来源订单时间
month int 月份 来源订单时间

按如下规则降序排序(订单数 → 金额 → 省份 ID),选出前 5 条并截图附在报告中。

子任务1分析

  • 从上面的表结构可以看出,数据来自于三张表:dim_region, dim_privnace, fact_order_info。
  • 需要对订单表进行预处理,派生出两个新的字段:year列和month列
  • 最后结果需要在以上三表join连接的基础上执行聚合统计。

实现思路

  • 原始数据来自三张表:fact_order_infodim_provincedim_region
  • 对订单表提取 yearmonth 字段
  • 三表 Join 后按指定维度进行分组聚合
  • 聚合结果写入 MySQL 指定表中

实现代码

def analysisTask1(spark: SparkSession): Unit = {
  // 读取各表数据(维表选取最新分区)
  val dwdOrders = spark.table("dwd.fact_order_info")
  val dwdProvince = spark.sql("SELECT * FROM dwd.dim_province WHERE etl_date = '20221031'")
  val dwdRegion = spark.sql("SELECT * FROM dwd.dim_region WHERE etl_date = '20221031'")

  // 提取 year 和 month 字段
  val transOrders = dwdOrders
    .withColumn("year", year(col("operate_time")))
    .withColumn("month", month(col("operate_time")))
    .select("id", "final_total_amount", "province_id", "year", "month")

  // 选择维表字段
  val transProvince = dwdProvince.select("id", "name", "region_id")
  val transRegion = dwdRegion.select("id", "region_name")

  // 注册临时视图
  transOrders.createOrReplaceTempView("orderinfo")
  transProvince.createOrReplaceTempView("province")
  transRegion.createOrReplaceTempView("region")

  // 三表连接
  val joinedDf = spark.sql(
    """
      |SELECT
      |  r.id AS regionid,
      |  r.region_name AS regionname,
      |  p.id AS provinceid,
      |  p.name AS provincename,
      |  o.year,
      |  o.month,
      |  o.final_total_amount
      |FROM region r
      |JOIN province p ON r.id = p.region_id
      |JOIN orderinfo o ON p.id = o.province_id
      |""".stripMargin)

  joinedDf.createOrReplaceTempView("joined_table")

  // 聚合统计
  val resultDf = spark.sql(
    """
      |SELECT
      |  regionid,
      |  regionname,
      |  provinceid,
      |  provincename,
      |  year,
      |  month,
      |  COUNT(*) AS totalorder,
      |  SUM(final_total_amount) AS totalconsumption
      |FROM joined_table
      |GROUP BY regionid, regionname, provinceid, provincename, year, month
      |ORDER BY regionid, regionname, provinceid, provincename, year, month
      |""".stripMargin)

  // 写入 MySQL shtd_result 库的 provinceeverymonth 表
  exportToMysql(resultDf, "provinceeverymonth")
}

数据写入 MySQL 函数

/**
 * 将 DataFrame 写入 MySQL 数据库
 * @param df 分析结果集
 * @param tb 目标表名
 */
def exportToMysql(df: Dataset[Row], tb: String): Unit = {
  val DB_URL = "jdbc:mysql://localhost:3306/shtd_result?useSSL=false"

  val props = new Properties()
  props.put("user", "root")
  props.put("password", "123456")
  props.put("driver", "com.mysql.jdbc.Driver")

  df.write.mode("overwrite").jdbc(DB_URL, tb, props)
}

MySQL 查询

SELECT *
FROM provinceeverymonth
ORDER BY totalorder DESC, totalconsumption DESC, provinceid DESC
LIMIT 5;

查询结果如下所示:

子任务2:省份平均订单金额对比分析

任务说明

请根据 DWD 层的表计算出 2020 年 4 月份每个省份的平均订单金额,并与全国所有省份的平均订单金额进行对比,结果写入 MySQL 数据库 shtd_result 中的 provinceavgcmp 表,字段结构如下:

字段名 类型 含义 备注
provinceid int 省份主键
provincename text 省份名称
provinceavgconsumption double 该省平均订单金额
allprovinceavgconsumption double 所有省平均订单金额 全国人均订单消费额
comparison text 对比结果 与全国均值的对比结果,可能为:高 / 低 / 相同

按“省份主键降序 + 平均金额降序”排序查询前 5 条记录,并将截图附入报告

实现思路

  • 源数据来自两张表:fact_order_infodim_province
  • 首先筛选出 2020 年 4 月的数据
  • 然后分别计算每个省份与全国的平均订单金额
  • 使用 crossJoin 方式合并计算结果并生成对比字段
  • 最终写入 MySQL 的目标表

实现代码

def analysisTask2(spark: SparkSession): Unit = {
  // 读取订单事实表,筛选 2020 年 4 月数据并提取字段
  val factOrdersApril = spark.table("dwd.fact_order_info")
    .withColumn("year", year(col("operate_time")))
    .withColumn("month", month(col("operate_time")))
    .filter(col("year") === 2020 && col("month") === 4)
    .select("id", "final_total_amount", "province_id")

  // 读取省份维表(使用最新分区)
  val dimProvince = spark.sql("SELECT * FROM dwd.dim_province WHERE etl_date = '20221031'")
    .select(col("id").as("provinceid"), col("name").as("provincename"))

  // 注册临时视图
  factOrdersApril.createOrReplaceTempView("fact_orders_4m")
  dimProvince.createOrReplaceTempView("dim_province")

  // Join 操作:关联省份名称
  val joinedDf = spark.sql(
    """
      |SELECT
      |  p.provinceid,
      |  p.provincename,
      |  o.final_total_amount
      |FROM dim_province p
      |JOIN fact_orders_4m o ON p.provinceid = o.province_id
      |""".stripMargin)

  // 每个省的平均订单金额
  val provinceAvgDf = joinedDf
    .groupBy("provinceid", "provincename")
    .agg(avg("final_total_amount").as("provinceavgconsumption"))

  // 所有省的平均订单金额
  val allProvinceAvgDf = joinedDf
    .agg(avg("final_total_amount").as("allprovinceavgconsumption"))

  // CrossJoin 两者以便对比并派生 comparison 字段
  val resultDf = provinceAvgDf.crossJoin(allProvinceAvgDf)
    .withColumn("comparison",
      when(col("provinceavgconsumption") > col("allprovinceavgconsumption"), "高")
        .when(col("provinceavgconsumption") === col("allprovinceavgconsumption"), "相同")
        .otherwise("低")
    )

  // 写入 MySQL 数据库 shtd_result 的 provinceavgcmp 表
  exportToMysql(resultDf, "provinceavgcmp")
}

MySQL 查询

SELECT provinceid, provinceavgconsumption
FROM provinceavgcmp
ORDER BY provinceid DESC, provinceavgconsumption DESC
LIMIT 5;

结果如下所示:

子任务3

任务说明

dwd.fact_order_info 表中,统计在连续两天内下单且下单金额保持增长的用户,并将结果写入 MySQL 数据库 shtd_result 中的 usercontinueorder 表。该表结构如下:

字段 类型 中文含义 备注
userid int 客户主键
username text 客户名称
day text 下单日 连续下单的两天,格式为 yyyyMMdd_yyyyMMdd,如:20220101_20220102
totalconsumption double 订单总金额 两天的订单金额之和
totalorder int 订单总数 两天的订单数量之和

根据订单总数、订单总金额、客户主键进行降序排序,选出前 5 条并截图附在报告中

实现思路

  • 数据来源于 fact_order_info

  • 为了判断用户是否连续两天下单,且金额递增,需要对用户按日期排序后进行跨行比较,必须使用**窗口函数

  • 需将每日数据聚合后,再进行前后两日的配对与筛选

实现代码

def analysisTask3(spark: SparkSession): Unit = {
  val fact_orders = spark.table("dwd.fact_order_info")

  // 提取所需字段并格式化日期
  val simple_fact_orders = fact_orders
    .withColumn("day", to_date(col("operate_time")))
    .select(
      col("id").as("orderid"),
      col("user_id").as("userid"),
      col("consignee").as("username"),
      col("day"),
      col("final_total_amount")
    )

  // 聚合每天的订单金额与订单数量
  val agg_fact_orders = simple_fact_orders
    .groupBy("userid", "username", "day")
    .agg(
      sum("final_total_amount").as("daytotalconsumption"),
      count("orderid").as("daytotalorder")
    )
    .orderBy("userid", "username", "day")

  import org.apache.spark.sql.expressions.Window
  val w = Window.partitionBy("userid", "username").orderBy("day")

  // 构建窗口列:是否连续、前一日金额、前一日订单数
  val winOrders = agg_fact_orders
    .withColumn("haslastday", when(datediff(col("day"), lag("day", 1).over(w)) === 1, 1).otherwise(lit(0)))
    .withColumn("lastday_total", when(col("haslastday") === 1, lag("daytotalconsumption", 1).over(w)).otherwise(lit(0.0)))
    .withColumn("lastday_count", when(col("haslastday") === 1, lag("daytotalorder", 1).over(w)).otherwise(lit(0)))

  // 筛选连续两天且金额递增的记录,组装目标字段
  val resultDF = winOrders
    .filter(col("haslastday") === 1 && col("daytotalconsumption") > col("lastday_total"))
    .withColumn("totalconsumption", col("daytotalconsumption") + col("lastday_total"))
    .withColumn("totalorder", col("daytotalorder") + col("lastday_count"))
    .withColumn("day", concat(date_format(date_sub(col("day"), 1), "yyyyMMdd"), lit("_"), date_format(col("day"), "yyyyMMdd")))
    .select(
      col("userid"),
      col("username"),
      col("day"),
      bround(col("totalconsumption"), 2).as("totalconsumption"),
      col("totalorder")
    )

  // 写入 MySQL 数据库 shtd_result.usercontinueorder 表
  exportToMysql(resultDF, "usercontinueorder")
}

MySQL 查询

SELECT *
FROM shtd_result.usercontinueorder
ORDER BY totalorder DESC, totalconsumption DESC, userid DESC
LIMIT 5;

结果如下所示:

由以上查询结果可知,由于提供的样例数据问题,基于该样例数据并没有找到连续两日下单的用户(不是何种原因比赛时也依旧没有找到数据)。

补充建议

前两个子任务可直接作为练习,指标计算类任务在正式比赛中题型变化较大,建议好好去看看spark教程和各种函数,题大体相近但是很多地方用的函数会变,题目也会多出一题,一定要多去了解各种函数


欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1701220998@qq.com
导航页 GitHub