大数据竞赛(任务书一)-离线数据处理任务二:数据清洗
注:!!!合并集的两个表的变量名是错的自行修改
任务描述
使用Scala编写spark工程代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss
启动Hive Metastore服务
Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:
hive --servicemetastore
这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:
nohup hive--service metastore &
这个命令将启动Hive Metastore服务,并在后台持续运行
子任务1:清洗 user_info 数据并合并至 DWD 层
任务说明
请使用 Scala 编写 Spark 程序,实现以下目标:
- 从 Hive 中
ods.user_info表抽取前一日的分区数据(由任务一生成),并与dwd.dim_user_info表中最新分区的数据进行整合 - 整合逻辑为按
id字段进行合并,取operate_time降序排序后最新的一条记录;若operate_time为空,则使用create_time替代 - 合并后需添加以下四列:
dwd_insert_user,dwd_modify_user均固定为"user1"dwd_insert_time,dwd_modify_time为当前任务执行时间
- 若为新增数据(首次进入 dwd 层),插入上述四列;若为已有数据的更新,保留原
dwd_insert_time,仅修改其他字段与dwd_modify_time - 数据写入目标为
dwd.dim_user_info的分区表,分区字段为etl_date,其值与ods.user_info中保持一致
最后,需使用 Hive CLI 执行以下命令,并将结果截图:
show partitions dwd.dim_user_info;
实现思路
该任务目标为“维表 upsert 操作”:
- 数据准备阶段
- 抽取 ODS 分区数据(昨日),并填补
operate_time - 获取当前 DWD 表中对应分区的历史数据
- 抽取 ODS 分区数据(昨日),并填补
- 数据整合
- 合并历史与新数据
- 利用窗口函数按
id分组、operate_time排序,保留每组中最新一条记录
- 标记插入/更新时间
- 使用窗口函数:对同一个
id的数据,统一设置最早的insert_time,最新的modify_time - 保证更新数据不覆盖原始插入时间
- 使用窗口函数:对同一个
- 数据写入
- 将合并结果写入 DWD 分区表
实现代码
def cleanUserInfo(spark: SparkSession): Unit = {
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val etlDate = "20221031"
// 1. 抽取 ODS 中的昨日分区数据
val odsDF = spark.sql(
s"""
|SELECT *,
| COALESCE(operate_time, create_time) AS final_operate_time
|FROM ods.user_info
|WHERE etl_date = '$etlDate'
""".stripMargin)
// 2. 补充插入/修改标记字段
val enrichedOdsDF = odsDF
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_insert_time", current_timestamp())
.withColumn("dwd_modify_time", current_timestamp())
.withColumn("etl_date", lit(etlDate)) // 确保分区字段存在
// 3. 获取 dwd 中同分区已有数据
val dwdDF = spark.sql(s"SELECT * FROM dwd.dim_user_info WHERE etl_date = '$etlDate'")
// 4. 字段对齐(为避免命名冲突手动选择字段)
val unifiedCols = Seq("id", "login_name", "nick_name", "passwd", "name", "phone_num",
"email", "head_img", "user_level", "birthday", "gender",
"create_time", "final_operate_time",
"dwd_insert_user", "dwd_insert_time", "dwd_modify_user", "dwd_modify_time", "etl_date")
val unionDF = enrichedOdsDF.select(unifiedCols.map(col): _*)
.unionByName(dwdDF.select(unifiedCols.map(col): _*))
// 5. 定义窗口并筛选最新记录
val windowById = Window.partitionBy("id").orderBy($"final_operate_time".desc)
val windowStatic = Window.partitionBy("id")
val mergedDF = unionDF
.withColumn("_row_number", row_number().over(windowById))
.withColumn("dwd_insert_time", min("dwd_insert_time").over(windowStatic))
.withColumn("dwd_modify_time", max("dwd_modify_time").over(windowStatic))
.where($"_row_number" === 1)
.drop("_row_number")
// 6. 写入 DWD 表,按 etl_date 分区
mergedDF.write
.mode("overwrite")
.insertInto("dwd.dim_user_info")
spark.sql("SELECT * FROM dwd.dim_user_info").show()
}
Hive CLI 查询
使用以下命令,截图结果并粘贴至报告中
-- 分区信息查询
show partitions dwd.dim_user_info;
子任务2:商品维表合并更新(sku_info)
任务说明
请使用 Scala 编写 Spark 程序,对 ods.sku_info 表中前一日(任务一生成)分区数据进行清洗与处理,目标为:
- 将新数据与
dwd.dim_sku_info表当前已有数据进行整合 - 按照
id字段合并记录,依据create_time字段降序,选取每组中最新一条记录 - 对于合并结果,添加以下字段:
dwd_insert_user与dwd_modify_user均为"user1"dwd_insert_time和dwd_modify_time为当前执行时间(首次插入时),若为更新则保留旧insert_time
- 写入至目标表
dwd.dim_sku_info,按etl_date分区,分区值与源表一致 - 数据类型如有不匹配需完成转换
- 最后通过 Hive CLI 查询字段
id、sku_desc、dwd_insert_user、dwd_modify_time、etl_date,条件为最新分区、id在[15, 20]之间,按id升序排序,截图结果供提交
实现思路
该任务核心仍为维度表的 Upsert 操作:
- 数据抽取与清洗:
- 从 ODS 表读取指定分区数据
- 添加 DWD 层要求的四个新增字段
- 时间字段转为标准格式
- 加载 DWD 中对应分区的历史数据
- 使用窗口函数进行数据合并
- 根据
id分组 - 以
create_time排序取最新记录 - 处理插入时间保留、修改时间更新的逻辑
- 根据
- 写入 DWD 分区表
- 执行 Hive CLI 查询
实现代码
def cleanSkuInfoTask(spark: SparkSession): Unit = {
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val etlDate = "20221031"
// 1. 读取昨日分区的 ODS 数据
val odsSkuDF = spark.sql(s"SELECT * FROM ods.sku_info WHERE etl_date = '$etlDate'")
// 2. 添加 DWD 所需字段(插入人、修改人、时间戳)
val currentTime = current_timestamp()
val enrichedSkuDF = odsSkuDF
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_insert_time", currentTime)
.withColumn("dwd_modify_time", currentTime)
.withColumn("etl_date", lit(etlDate)) // 添加分区字段
// 3. 加载 DWD 表中已有相同分区数据
val existingDwdDF = spark.sql(s"SELECT * FROM dwd.dim_sku_info WHERE etl_date = '$etlDate'")
// 4. 字段统一,准备合并
val columns = Seq("id", "spu_id", "price", "sku_name", "sku_desc", "weight",
"tm_id", "category3_id", "sku_default_img", "create_time",
"dwd_insert_user", "dwd_insert_time", "dwd_modify_user", "dwd_modify_time", "etl_date")
val combinedDF = enrichedSkuDF.select(columns.map(col): _*)
.unionByName(existingDwdDF.select(columns.map(col): _*))
// 5. 使用窗口函数保留每组 ID 下 create_time 最新的一条
val winById = Window.partitionBy("id").orderBy(col("create_time").desc)
val staticWin = Window.partitionBy("id")
val finalDF = combinedDF
.withColumn("_rn", row_number().over(winById))
.withColumn("dwd_insert_time", min("dwd_insert_time").over(staticWin))
.withColumn("dwd_modify_time", max("dwd_modify_time").over(staticWin))
.filter($"_rn" === 1)
.drop("_rn")
// 6. 写入 DWD 表(按 etl_date 分区)
finalDF.write
.mode("overwrite")
.insertInto("dwd.dim_sku_info")
spark.table("dwd.dim_sku_info").show()
}
Hive CLI 查询
请在 Hive 命令行工具中执行以上 SQL ,截图结果并粘贴至报告中
SELECT id, sku_desc, dwd_insert_user, dwd_modify_time, etl_date
FROM dwd.dim_sku_info
WHERE etl_date = '20221031'
AND id BETWEEN 15 AND 20
ORDER BY id;
子任务3:省份维度表合并更新(base_province)
任务说明
请使用 Spark + Scala 对 ods.base_province 表中前一日分区(任务一生成)数据进行处理,并与 dwd.dim_province 表中已有分区数据进行合并更新
- 按照
id为主键对记录进行合并 - 每组
id下依据create_time字段降序排序,保留最新一条 - 目标表为
dwd.dim_province,按etl_date分区,分区值与 ODS 表保持一致 - 为结果数据添加以下四个字段:
dwd_insert_user和dwd_modify_user均为"user1"- 若记录为首次写入,则
dwd_insert_time和dwd_modify_time均为当前操作时间 - 若发生更新,保留原始插入时间,仅更新
dwd_modify_time
- 最后在 Hive CLI 中查询
dwd.dim_province最新分区的记录总数,并将查询截图粘贴至报告中
实现思路
本任务与子任务1、2处理方式基本一致:
- 抽取前一日分区数据
- 添加 DWD 层新增字段并统一数据格式
- 合并历史 DWD 数据,使用窗口函数根据
id分组、create_time排序 - 写入分区表
- 执行 Hive CLI 查询记录数
实现代码
def cleanProvinceTask(spark: SparkSession): Unit = {
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val etlDate = "20221031"
val currentTime = current_timestamp()
// 1. 提取 ODS 数据
val odsProvinceDF = spark.sql(
s"SELECT * FROM ods.base_province WHERE etl_date = '$etlDate'"
)
// 2. 增加 DWD 插入/修改字段
val enrichedDF = odsProvinceDF
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", currentTime)
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", currentTime)
.withColumn("etl_date", lit(etlDate))
// 3. 读取 DWD 当前分区的历史数据
val existingDF = spark.sql(
s"SELECT * FROM dwd.dim_province WHERE etl_date = '$etlDate'"
)
// 4. 字段对齐并合并数据集
val cols = Seq("id", "name", "region_id", "area_code", "iso_code", "create_time",
"dwd_insert_user", "dwd_insert_time", "dwd_modify_user", "dwd_modify_time", "etl_date")
val mergedDF = enrichedDF.select(cols.map(col): _*)
.unionByName(existingDF.select(cols.map(col): _*))
// 5. 执行 Upsert:每个 ID 保留最新记录
val wById = Window.partitionBy("id").orderBy(col("create_time").desc)
val wStatic = Window.partitionBy("id")
val resultDF = mergedDF
.withColumn("_rn", row_number().over(wById))
.withColumn("dwd_insert_time", min("dwd_insert_time").over(wStatic))
.withColumn("dwd_modify_time", max("dwd_modify_time").over(wStatic))
.filter($"_rn" === 1)
.drop("_rn")
// 6. 写入 DWD 分区表
resultDF.write
.mode("overwrite")
.insertInto("dwd.dim_province")
// 可选:展示部分结果
spark.table("dwd.dim_province").show()
}
Hive CLI 查询
使用 Hive 命令行工具查询最新分区的记录条数,截图结果并粘贴至报告中:
SELECT COUNT(*)
FROM dwd.dim_province
WHERE etl_date = '20221031';
子任务4:区域维度表增量合并(base_region)
任务说明
将 ods.base_region 表中 前一日分区数据 合并入 DWD 层 dwd.dim_region 表:
- 根据
id作为主键进行合并操作; - 每组
id记录以create_time字段降序排序,保留最新一条; - 目标表按
etl_date分区,分区值与源表保持一致; - 补充以下四列信息:
dwd_insert_user与dwd_modify_user赋值为"user1";- 首次入仓记录设置
dwd_insert_time与dwd_modify_time为当前时间; - 若发生更新,仅变更
dwd_modify_time;
- 最后在 Hive CLI 中统计分区数据条数,并截图保存。
实现思路
子任务4本质逻辑与前三个任务一致,不再赘述
实现代码
def cleanRegionTask(spark: SparkSession): Unit = {
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val etlDate = "20221031"
val currentTime = current_timestamp()
// 1. 读取 ODS 数据
val odsRegionDF = spark.sql(
s"SELECT * FROM ods.base_region WHERE etl_date = '$etlDate'"
)
// 2. 增加 DWD 所需的审计字段
val withMetaCols = odsRegionDF
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", currentTime)
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", currentTime)
.withColumn("etl_date", lit(etlDate))
// 3. 获取目标表中的旧数据
val existingDF = spark.sql(
s"SELECT * FROM dwd.dim_region WHERE etl_date = '$etlDate'"
)
// 4. 字段统一并合并新旧数据
val cols = Seq("id", "region_name", "create_time",
"dwd_insert_user", "dwd_insert_time", "dwd_modify_user", "dwd_modify_time", "etl_date")
val unifiedDF = withMetaCols.select(cols.map(col): _*)
.unionByName(existingDF.select(cols.map(col): _*))
// 5. 利用窗口函数保留每组 ID 下 create_time 最新的记录
val wSort = Window.partitionBy("id").orderBy($"create_time".desc)
val wStatic = Window.partitionBy("id")
val finalDF = unifiedDF
.withColumn("_rn", row_number().over(wSort))
.withColumn("dwd_insert_time", min("dwd_insert_time").over(wStatic))
.withColumn("dwd_modify_time", max("dwd_modify_time").over(wStatic))
.filter($"_rn" === 1)
.drop("_rn")
// 6. 写入到 DWD 分区表
finalDF.write
.mode("overwrite")
.insertInto("dwd.dim_region")
// 写入后的数据
spark.table("dwd.dim_region").show()
}
Hive CLI 查询
截图结果并粘贴至报告中
SELECT COUNT(*)
FROM dwd.dim_region
WHERE etl_date = '20221031';
子任务5:事实表数据抽取与清洗
任务说明
将 ODS 层的 order_info 表中“昨天”分区(由任务一生成)中的数据抽取到 DWD 层的 fact_order_info 表中。该目标表为动态分区表,分区字段为 etl_date,类型为 String,其值来自 create_time 字段并转换为 yyyyMMdd 格式。
在抽取过程中,需要处理以下字段:
- 若
operate_time为空,则用create_time替代 - 添加四个字段:
dwd_insert_user、dwd_modify_user均设为"user1"dwd_insert_time、dwd_modify_time设置为当前时间
- 所有字段应根据需求进行数据类型转换
- 最终结果写入
dwd.fact_order_info表中
完成后,使用 Hive CLI 执行以下命令,截图结果并粘贴入报告:
show partitions dwd.fact_order_info;
实现思路
本任务为 DWD 层的事实表构建任务,与前四个以维表为主的子任务不同。目标是实现对原始订单数据的加工处理与入仓,确保数据标准化
- 分区字段的提取与格式化
- 空值处理与字段新增
- 使用 Spark 动态分区写入 Hive 表
实现代码
def cleanTask5(spark: SparkSession): Unit = {
import spark.implicits._
import org.apache.spark.sql.functions._
val today = java.time.LocalDate.now()
val yesterday = today.minusDays(1).format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMdd"))
val odsDF = spark.table("ods.order_info")
.filter(to_date($"create_time") === expr(s"date_sub(current_date(), 1)"))
.withColumn("etl_date", date_format($"create_time", "yyyyMMdd"))
.withColumn("operate_time", coalesce($"operate_time", $"create_time"))
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", current_timestamp())
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", current_timestamp())
odsDF
.write
.mode("overwrite")
.format("hive")
.partitionBy("etl_date")
.insertInto("dwd.fact_order_info")
println("任务完成,执行 Hive CLI 查看分区:")
println("show partitions dwd.fact_order_info;")
}
如果你的集群配置支持动态分区,请确保执行前设置以下参数:
set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict;
Hive CLI 查询
截图结果并粘贴至报告中
show partitions dwd.fact_order_info;
子任务6:事实表数据抽取与加工
任务描述
将 ODS 层 order_detail 表中昨天分区的数据抽取到 DWD 层 fact_order_detail 表中。该目标表为动态分区表,分区字段为 etl_date(类型为 String),取 create_time 的日期并格式化为 yyyyMMdd
同时,为数据增加以下字段:
dwd_insert_user、dwd_modify_user均为固定值"user1"dwd_insert_time、dwd_modify_time为当前系统时间
完成后,使用 Hive CLI 执行以下命令并将结果截图粘贴至任务报告中:
show partitions dwd.fact_order_detail;
实现思路
该子任务主要涉及从 ODS 层向 DWD 层事实表进行数据抽取与加工操作。核心操作是使用 Spark SQL 实现动态分区插入(insertInto),并在写入过程中添加字段与格式转换
- 依据
create_time派生分区字段etl_date - 使用 Spark 动态分区写入 Hive 表
- 数据写入模式使用
overwrite保证分区可重复执行 - 插入字段中涉及当前时间获取与静态值填充
实现代码
def cleanTask6(spark: SparkSession): Unit = {
spark.table("ods.order_detail")
.withColumn("etl_date", date_format(col("create_time"), "yyyyMMdd")) // 生成分区字段
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", current_timestamp())
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", current_timestamp())
.write
.mode("overwrite") // 支持覆盖相同分区
.partitionBy("etl_date") // 动态分区字段
.insertInto("dwd.fact_order_detail") // 插入DWD事实表
}
Hive CLI查询
查询分区,截图结果并粘贴至报告中
show partitions dwd.fact_order_detail;
主方法入口
整体框架如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object CleaningJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("Data Clean")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode","nonstrict")
// 需要根据分区值,覆盖原来的分区时,需要配置的参数
.config("spark.sql.sources.partitionOverwriteMode","dynamic")
.config("spark.sql.parquet.writeLegacyFormat","true") //开启遗留格式,否则可能会因为parquet格式与原格式不兼容
.enableHiveSupport()
.getOrCreate()
// 子任务调用
cleaningTask1(spark)
cleanTask2(spark)
cleanTask3(spark)
cleanTask4(spark)
cleanTask5(spark)
cleanTask6(spark)
}
}
欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1701220998@qq.com