大数据应用技术模块b任务二

大数据竞赛(任务书一)-离线数据处理任务二:数据清洗

注:!!!合并集的两个表的变量名是错的自行修改

任务描述

使用Scala编写spark工程代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss

启动Hive Metastore服务

Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:

hive --servicemetastore

这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:

nohup hive--service metastore &

这个命令将启动Hive Metastore服务,并在后台持续运行

子任务1:清洗 user_info 数据并合并至 DWD 层

任务说明

请使用 Scala 编写 Spark 程序,实现以下目标:

  • 从 Hive 中 ods.user_info 表抽取前一日的分区数据(由任务一生成),并与 dwd.dim_user_info 表中最新分区的数据进行整合
  • 整合逻辑为按 id 字段进行合并,取 operate_time 降序排序后最新的一条记录;若 operate_time 为空,则使用 create_time 替代
  • 合并后需添加以下四列:
    • dwd_insert_user, dwd_modify_user 均固定为 "user1"
    • dwd_insert_time, dwd_modify_time 为当前任务执行时间
  • 若为新增数据(首次进入 dwd 层),插入上述四列;若为已有数据的更新,保留原 dwd_insert_time,仅修改其他字段与 dwd_modify_time
  • 数据写入目标为 dwd.dim_user_info 的分区表,分区字段为 etl_date,其值与 ods.user_info 中保持一致

最后,需使用 Hive CLI 执行以下命令,并将结果截图:

show partitions dwd.dim_user_info;

实现思路

该任务目标为“维表 upsert 操作”:

  1. 数据准备阶段
    • 抽取 ODS 分区数据(昨日),并填补 operate_time
    • 获取当前 DWD 表中对应分区的历史数据
  2. 数据整合
    • 合并历史与新数据
    • 利用窗口函数按 id 分组、operate_time 排序,保留每组中最新一条记录
  3. 标记插入/更新时间
    • 使用窗口函数:对同一个 id 的数据,统一设置最早的 insert_time,最新的 modify_time
    • 保证更新数据不覆盖原始插入时间
  4. 数据写入
    • 将合并结果写入 DWD 分区表

实现代码

def cleanUserInfo(spark: SparkSession): Unit = {
  import spark.implicits._
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.expressions.Window

  val etlDate = "20221031"

  // 1. 抽取 ODS 中的昨日分区数据
  val odsDF = spark.sql(
    s"""
       |SELECT *, 
       |       COALESCE(operate_time, create_time) AS final_operate_time
       |FROM ods.user_info 
       |WHERE etl_date = '$etlDate'
     """.stripMargin)

  // 2. 补充插入/修改标记字段
  val enrichedOdsDF = odsDF
    .withColumn("dwd_insert_user", lit("user1"))
    .withColumn("dwd_modify_user", lit("user1"))
    .withColumn("dwd_insert_time", current_timestamp())
    .withColumn("dwd_modify_time", current_timestamp())
    .withColumn("etl_date", lit(etlDate)) // 确保分区字段存在

  // 3. 获取 dwd 中同分区已有数据
  val dwdDF = spark.sql(s"SELECT * FROM dwd.dim_user_info WHERE etl_date = '$etlDate'")

  // 4. 字段对齐(为避免命名冲突手动选择字段)
  val unifiedCols = Seq("id", "login_name", "nick_name", "passwd", "name", "phone_num",
    "email", "head_img", "user_level", "birthday", "gender",
    "create_time", "final_operate_time",
    "dwd_insert_user", "dwd_insert_time", "dwd_modify_user", "dwd_modify_time", "etl_date")

  val unionDF = enrichedOdsDF.select(unifiedCols.map(col): _*)
    .unionByName(dwdDF.select(unifiedCols.map(col): _*))

  // 5. 定义窗口并筛选最新记录
  val windowById = Window.partitionBy("id").orderBy($"final_operate_time".desc)
  val windowStatic = Window.partitionBy("id")

  val mergedDF = unionDF
    .withColumn("_row_number", row_number().over(windowById))
    .withColumn("dwd_insert_time", min("dwd_insert_time").over(windowStatic))
    .withColumn("dwd_modify_time", max("dwd_modify_time").over(windowStatic))
    .where($"_row_number" === 1)
    .drop("_row_number")

  // 6. 写入 DWD 表,按 etl_date 分区
  mergedDF.write
    .mode("overwrite")
    .insertInto("dwd.dim_user_info")

  spark.sql("SELECT * FROM dwd.dim_user_info").show()
}

Hive CLI 查询

使用以下命令,截图结果并粘贴至报告中

-- 分区信息查询
show partitions dwd.dim_user_info;

子任务2:商品维表合并更新(sku_info

任务说明

请使用 Scala 编写 Spark 程序,对 ods.sku_info 表中前一日(任务一生成)分区数据进行清洗与处理,目标为:

  • 将新数据与 dwd.dim_sku_info 表当前已有数据进行整合
  • 按照 id 字段合并记录,依据 create_time 字段降序,选取每组中最新一条记录
  • 对于合并结果,添加以下字段:
    • dwd_insert_userdwd_modify_user 均为 "user1"
    • dwd_insert_timedwd_modify_time 为当前执行时间(首次插入时),若为更新则保留旧 insert_time
  • 写入至目标表 dwd.dim_sku_info,按 etl_date 分区,分区值与源表一致
  • 数据类型如有不匹配需完成转换
  • 最后通过 Hive CLI 查询字段 id、sku_desc、dwd_insert_user、dwd_modify_time、etl_date,条件为最新分区、id[15, 20] 之间,按 id 升序排序,截图结果供提交

实现思路

该任务核心仍为维度表的 Upsert 操作

  1. 数据抽取与清洗
    • 从 ODS 表读取指定分区数据
    • 添加 DWD 层要求的四个新增字段
    • 时间字段转为标准格式
  2. 加载 DWD 中对应分区的历史数据
  3. 使用窗口函数进行数据合并
    • 根据 id 分组
    • create_time 排序取最新记录
    • 处理插入时间保留、修改时间更新的逻辑
  4. 写入 DWD 分区表
  5. 执行 Hive CLI 查询

实现代码

def cleanSkuInfoTask(spark: SparkSession): Unit = {
  import spark.implicits._
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.expressions.Window

  val etlDate = "20221031"

  // 1. 读取昨日分区的 ODS 数据
  val odsSkuDF = spark.sql(s"SELECT * FROM ods.sku_info WHERE etl_date = '$etlDate'")

  // 2. 添加 DWD 所需字段(插入人、修改人、时间戳)
  val currentTime = current_timestamp()
  val enrichedSkuDF = odsSkuDF
    .withColumn("dwd_insert_user", lit("user1"))
    .withColumn("dwd_modify_user", lit("user1"))
    .withColumn("dwd_insert_time", currentTime)
    .withColumn("dwd_modify_time", currentTime)
    .withColumn("etl_date", lit(etlDate)) // 添加分区字段

  // 3. 加载 DWD 表中已有相同分区数据
  val existingDwdDF = spark.sql(s"SELECT * FROM dwd.dim_sku_info WHERE etl_date = '$etlDate'")

  // 4. 字段统一,准备合并
  val columns = Seq("id", "spu_id", "price", "sku_name", "sku_desc", "weight",
    "tm_id", "category3_id", "sku_default_img", "create_time",
    "dwd_insert_user", "dwd_insert_time", "dwd_modify_user", "dwd_modify_time", "etl_date")

  val combinedDF = enrichedSkuDF.select(columns.map(col): _*)
    .unionByName(existingDwdDF.select(columns.map(col): _*))

  // 5. 使用窗口函数保留每组 ID 下 create_time 最新的一条
  val winById = Window.partitionBy("id").orderBy(col("create_time").desc)
  val staticWin = Window.partitionBy("id")

  val finalDF = combinedDF
    .withColumn("_rn", row_number().over(winById))
    .withColumn("dwd_insert_time", min("dwd_insert_time").over(staticWin))
    .withColumn("dwd_modify_time", max("dwd_modify_time").over(staticWin))
    .filter($"_rn" === 1)
    .drop("_rn")

  // 6. 写入 DWD 表(按 etl_date 分区)
  finalDF.write
    .mode("overwrite")
    .insertInto("dwd.dim_sku_info")

  spark.table("dwd.dim_sku_info").show()
}

Hive CLI 查询

请在 Hive 命令行工具中执行以上 SQL ,截图结果并粘贴至报告中

SELECT id, sku_desc, dwd_insert_user, dwd_modify_time, etl_date
FROM dwd.dim_sku_info
WHERE etl_date = '20221031'
  AND id BETWEEN 15 AND 20
ORDER BY id;

子任务3:省份维度表合并更新(base_province

任务说明

请使用 Spark + Scala 对 ods.base_province 表中前一日分区(任务一生成)数据进行处理,并与 dwd.dim_province 表中已有分区数据进行合并更新

  • 按照 id 为主键对记录进行合并
  • 每组 id 下依据 create_time 字段降序排序,保留最新一条
  • 目标表为 dwd.dim_province,按 etl_date 分区,分区值与 ODS 表保持一致
  • 为结果数据添加以下四个字段:
    • dwd_insert_userdwd_modify_user 均为 "user1"
    • 若记录为首次写入,则 dwd_insert_timedwd_modify_time 均为当前操作时间
    • 若发生更新,保留原始插入时间,仅更新 dwd_modify_time
  • 最后在 Hive CLI 中查询 dwd.dim_province 最新分区的记录总数,并将查询截图粘贴至报告中

实现思路

本任务与子任务1、2处理方式基本一致:

  1. 抽取前一日分区数据
  2. 添加 DWD 层新增字段并统一数据格式
  3. 合并历史 DWD 数据,使用窗口函数根据 id 分组、create_time 排序
  4. 写入分区表
  5. 执行 Hive CLI 查询记录数

实现代码

def cleanProvinceTask(spark: SparkSession): Unit = {
  import spark.implicits._
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.expressions.Window

  val etlDate = "20221031"
  val currentTime = current_timestamp()

  // 1. 提取 ODS 数据
  val odsProvinceDF = spark.sql(
    s"SELECT * FROM ods.base_province WHERE etl_date = '$etlDate'"
  )

  // 2. 增加 DWD 插入/修改字段
  val enrichedDF = odsProvinceDF
    .withColumn("dwd_insert_user", lit("user1"))
    .withColumn("dwd_insert_time", currentTime)
    .withColumn("dwd_modify_user", lit("user1"))
    .withColumn("dwd_modify_time", currentTime)
    .withColumn("etl_date", lit(etlDate))

  // 3. 读取 DWD 当前分区的历史数据
  val existingDF = spark.sql(
    s"SELECT * FROM dwd.dim_province WHERE etl_date = '$etlDate'"
  )

  // 4. 字段对齐并合并数据集
  val cols = Seq("id", "name", "region_id", "area_code", "iso_code", "create_time",
    "dwd_insert_user", "dwd_insert_time", "dwd_modify_user", "dwd_modify_time", "etl_date")

  val mergedDF = enrichedDF.select(cols.map(col): _*)
    .unionByName(existingDF.select(cols.map(col): _*))

  // 5. 执行 Upsert:每个 ID 保留最新记录
  val wById = Window.partitionBy("id").orderBy(col("create_time").desc)
  val wStatic = Window.partitionBy("id")

  val resultDF = mergedDF
    .withColumn("_rn", row_number().over(wById))
    .withColumn("dwd_insert_time", min("dwd_insert_time").over(wStatic))
    .withColumn("dwd_modify_time", max("dwd_modify_time").over(wStatic))
    .filter($"_rn" === 1)
    .drop("_rn")

  // 6. 写入 DWD 分区表
  resultDF.write
    .mode("overwrite")
    .insertInto("dwd.dim_province")

  // 可选:展示部分结果
  spark.table("dwd.dim_province").show()
}

Hive CLI 查询

使用 Hive 命令行工具查询最新分区的记录条数,截图结果并粘贴至报告中:

SELECT COUNT(*) 
FROM dwd.dim_province 
WHERE etl_date = '20221031';

子任务4:区域维度表增量合并(base_region

任务说明

ods.base_region 表中 前一日分区数据 合并入 DWD 层 dwd.dim_region 表:

  • 根据 id 作为主键进行合并操作;
  • 每组 id 记录以 create_time 字段降序排序,保留最新一条;
  • 目标表按 etl_date 分区,分区值与源表保持一致;
  • 补充以下四列信息:
    • dwd_insert_userdwd_modify_user 赋值为 "user1"
    • 首次入仓记录设置 dwd_insert_timedwd_modify_time 为当前时间;
    • 若发生更新,仅变更 dwd_modify_time
  • 最后在 Hive CLI 中统计分区数据条数,并截图保存。

实现思路

子任务4本质逻辑与前三个任务一致,不再赘述

实现代码

def cleanRegionTask(spark: SparkSession): Unit = {
  import spark.implicits._
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.expressions.Window

  val etlDate = "20221031"
  val currentTime = current_timestamp()

  // 1. 读取 ODS 数据
  val odsRegionDF = spark.sql(
    s"SELECT * FROM ods.base_region WHERE etl_date = '$etlDate'"
  )

  // 2. 增加 DWD 所需的审计字段
  val withMetaCols = odsRegionDF
    .withColumn("dwd_insert_user", lit("user1"))
    .withColumn("dwd_insert_time", currentTime)
    .withColumn("dwd_modify_user", lit("user1"))
    .withColumn("dwd_modify_time", currentTime)
    .withColumn("etl_date", lit(etlDate))

  // 3. 获取目标表中的旧数据
  val existingDF = spark.sql(
    s"SELECT * FROM dwd.dim_region WHERE etl_date = '$etlDate'"
  )

  // 4. 字段统一并合并新旧数据
  val cols = Seq("id", "region_name", "create_time",
    "dwd_insert_user", "dwd_insert_time", "dwd_modify_user", "dwd_modify_time", "etl_date")

  val unifiedDF = withMetaCols.select(cols.map(col): _*)
    .unionByName(existingDF.select(cols.map(col): _*))

  // 5. 利用窗口函数保留每组 ID 下 create_time 最新的记录
  val wSort = Window.partitionBy("id").orderBy($"create_time".desc)
  val wStatic = Window.partitionBy("id")

  val finalDF = unifiedDF
    .withColumn("_rn", row_number().over(wSort))
    .withColumn("dwd_insert_time", min("dwd_insert_time").over(wStatic))
    .withColumn("dwd_modify_time", max("dwd_modify_time").over(wStatic))
    .filter($"_rn" === 1)
    .drop("_rn")

  // 6. 写入到 DWD 分区表
  finalDF.write
    .mode("overwrite")
    .insertInto("dwd.dim_region")

  // 写入后的数据
  spark.table("dwd.dim_region").show()
}

Hive CLI 查询

截图结果并粘贴至报告中

SELECT COUNT(*) 
FROM dwd.dim_region 
WHERE etl_date = '20221031';

子任务5:事实表数据抽取与清洗

任务说明

将 ODS 层的 order_info 表中“昨天”分区(由任务一生成)中的数据抽取到 DWD 层的 fact_order_info 表中。该目标表为动态分区表,分区字段为 etl_date,类型为 String,其值来自 create_time 字段并转换为 yyyyMMdd 格式。

在抽取过程中,需要处理以下字段:

  • operate_time 为空,则用 create_time 替代
  • 添加四个字段:
    • dwd_insert_userdwd_modify_user 均设为 "user1"
    • dwd_insert_timedwd_modify_time 设置为当前时间
  • 所有字段应根据需求进行数据类型转换
  • 最终结果写入 dwd.fact_order_info 表中

完成后,使用 Hive CLI 执行以下命令,截图结果并粘贴入报告:

show partitions dwd.fact_order_info;

实现思路

本任务为 DWD 层的事实表构建任务,与前四个以维表为主的子任务不同。目标是实现对原始订单数据的加工处理与入仓,确保数据标准化

  • 分区字段的提取与格式化
  • 空值处理与字段新增
  • 使用 Spark 动态分区写入 Hive 表

实现代码

def cleanTask5(spark: SparkSession): Unit = {
  import spark.implicits._
  import org.apache.spark.sql.functions._

  val today = java.time.LocalDate.now()
  val yesterday = today.minusDays(1).format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMdd"))

  val odsDF = spark.table("ods.order_info")
    .filter(to_date($"create_time") === expr(s"date_sub(current_date(), 1)"))
    .withColumn("etl_date", date_format($"create_time", "yyyyMMdd"))
    .withColumn("operate_time", coalesce($"operate_time", $"create_time"))
    .withColumn("dwd_insert_user", lit("user1"))
    .withColumn("dwd_insert_time", current_timestamp())
    .withColumn("dwd_modify_user", lit("user1"))
    .withColumn("dwd_modify_time", current_timestamp())

  odsDF
    .write
    .mode("overwrite")
    .format("hive")
    .partitionBy("etl_date")
    .insertInto("dwd.fact_order_info")

  println("任务完成,执行 Hive CLI 查看分区:")
  println("show partitions dwd.fact_order_info;")
}

如果你的集群配置支持动态分区,请确保执行前设置以下参数:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

Hive CLI 查询

截图结果并粘贴至报告中

show partitions dwd.fact_order_info;

子任务6:事实表数据抽取与加工

任务描述

将 ODS 层 order_detail 表中昨天分区的数据抽取到 DWD 层 fact_order_detail 表中。该目标表为动态分区表,分区字段为 etl_date(类型为 String),取 create_time 的日期并格式化为 yyyyMMdd

同时,为数据增加以下字段:

  • dwd_insert_userdwd_modify_user 均为固定值 "user1"
  • dwd_insert_timedwd_modify_time 为当前系统时间

完成后,使用 Hive CLI 执行以下命令并将结果截图粘贴至任务报告中:

show partitions dwd.fact_order_detail;

实现思路

该子任务主要涉及从 ODS 层向 DWD 层事实表进行数据抽取与加工操作。核心操作是使用 Spark SQL 实现动态分区插入(insertInto),并在写入过程中添加字段与格式转换

  • 依据 create_time 派生分区字段 etl_date
  • 使用 Spark 动态分区写入 Hive 表
  • 数据写入模式使用 overwrite 保证分区可重复执行
  • 插入字段中涉及当前时间获取与静态值填充

实现代码

def cleanTask6(spark: SparkSession): Unit = {
  spark.table("ods.order_detail")
    .withColumn("etl_date", date_format(col("create_time"), "yyyyMMdd")) // 生成分区字段
    .withColumn("dwd_insert_user", lit("user1"))
    .withColumn("dwd_insert_time", current_timestamp())
    .withColumn("dwd_modify_user", lit("user1"))
    .withColumn("dwd_modify_time", current_timestamp())
    .write
    .mode("overwrite")                       // 支持覆盖相同分区
    .partitionBy("etl_date")                 // 动态分区字段
    .insertInto("dwd.fact_order_detail")     // 插入DWD事实表
}

Hive CLI查询

查询分区,截图结果并粘贴至报告中

show partitions dwd.fact_order_detail;

主方法入口

整体框架如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object CleaningJob {
  def main(args: Array[String]): Unit = {
      // 创建SparkSession实例
      val spark = SparkSession.builder()
          .master("local[*]")
          .appName("Data Clean")
          // 打开Hive动态分区的标志
          .config("hive.exec.dynamic.partition", "true")
          .config("hive.exec.dynamic.partition.mode","nonstrict")
          // 需要根据分区值,覆盖原来的分区时,需要配置的参数
          .config("spark.sql.sources.partitionOverwriteMode","dynamic")
          .config("spark.sql.parquet.writeLegacyFormat","true")     //开启遗留格式,否则可能会因为parquet格式与原格式不兼容
          .enableHiveSupport()
          .getOrCreate()
    // 子任务调用
    cleaningTask1(spark)
    cleanTask2(spark)
    cleanTask3(spark)
    cleanTask4(spark)
    cleanTask5(spark)
    cleanTask6(spark)
  }
}

欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1701220998@qq.com
导航页 GitHub