大数据竞赛(任务书一)-离线数据处理任务一:数据抽取
任务描述
使用 Scala 编写 Spark 程序,实现 MySQL 中 shtd_store 库下的表:
user_infosku_infobase_provincebase_regionorder_infoorder_detail
的增量数据提取,并将其写入 Hive 中的 ods 层对应分区表
启动Hive Metastore服务
Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:
hive --servicemetastore
这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:
nohup hive--service metastore &
这个命令将启动Hive Metastore服务,并在后台持续运行
子任务1:从 MySQL 增量同步 user_info 数据至 Hive ODS 层
任务说明
请从 shtd_store 数据库中的 user_info 表中提取新增记录,写入 Hive 中的 ods.user_info 表(已存在)。增量判断依据如下:
- 增量字段计算:对于每一行记录,取
operate_time和create_time中较晚的时间,作为当前记录的“变更时间” - 增量抽取逻辑:仅抽取变更时间晚于 Hive 中现有
user_info表的最大变更时间的记录(即:只抽取新增或更新的数据) - 字段结构保持与原表一致
- 需添加一个 静态分区字段
etl_date,类型为String,格式为yyyyMMdd,其值设定为比赛日的前一天 - 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.user_info;
实现思路
该任务为标准的 增量抽取 + 分区加载 类型离线 ETL
- 从 MySQL 抽取数据
- 构造分区字段
- 判断增量记录
- 使用
.insertInto()实现分区追加 - 使用 Hive CLI 查看分区信息
这个子任务需要了解的技术:
- Spark教程:”构造DataFrame”中“使用JDBC从数据库创建DataFrame”部分。
- Spark教程:“读写Hive表”部分
- 首先定义一个ETL工具类,封装ETL方法实现
import org.apache.spark.sql._
object EtlUtil {
/** 从 MySQL 数据库加载数据,返回 DataFrame */
def extractFromJDBC(spark: SparkSession, jdbcOptions: Map[String, String]): Dataset[Row] = {
spark.read
.format("jdbc")
.options(jdbcOptions)
.load()
}
/** 将 DataFrame 写入 Hive 表中,可指定静态分区 */
def appendToHive(spark: SparkSession, df: Dataset[Row], hiveOptions: Map[String, String]): Unit = {
val db = hiveOptions("db")
val table = hiveOptions("tb")
val partitionColumn = hiveOptions.get("partitionColumn")
spark.sql(s"USE $db")
partitionColumn match {
case Some(partCol) =>
df.write
.mode("append")
.format("parquet")
.partitionBy(partCol)
.insertInto(table)
case None =>
df.write
.mode("append")
.format("parquet")
.insertInto(table)
}
}
}
实现代码
- 模拟存量数据加载(etlTask11):
- 因为任务需求说明要求实现增量抽取,但官方并未给出存量数据说明。所以这里自行先抽取部分数据写入Hive分区表用来模拟存量数据:
def etlTask11(spark: SparkSession): Unit = {
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val stockSql =
"""
|(SELECT * FROM user_info
| WHERE create_time = '2020-04-26 18:57:55'
| AND (operate_time <= '2020-04-26 18:57:55' OR operate_time IS NULL)) AS tmp
""".stripMargin
val jdbcOptions = Map(
"url" -> jdbcUrl,
"dbtable" -> stockSql,
"user" -> "root",
"password" -> "admin"
)
val rawDF = EtlUtil.extractFromJDBC(spark, jdbcOptions)
val partitionedDF = rawDF.withColumn("etl_date", lit("20221031"))
val hiveOptions = Map("db" -> "ods", "tb" -> "user_info", "partitionColumn" -> "etl_date")
EtlUtil.appendToHive(spark, partitionedDF, hiveOptions)
spark.table("ods.user_info").show()
}
- 增量数据同步(etlTask12):
def etlTask12(spark: SparkSession): Unit = {
// 1. 获取 ODS 表中最大时间
val maxTimeDF = spark.sql(
"SELECT greatest(max(create_time), max(operate_time)) AS max_time FROM ods.user_info"
)
val maxTime = maxTimeDF.first.getAs[java.sql.Timestamp]("max_time")
// 2. 构建增量查询 SQL
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val incSql =
s"(SELECT * FROM user_info WHERE create_time > '$maxTime' OR operate_time > '$maxTime') AS tmp"
val jdbcOptions = Map(
"url" -> jdbcUrl,
"dbtable" -> incSql,
"user" -> "root",
"password" -> "admin"
)
val incDF = EtlUtil.extractFromJDBC(spark, jdbcOptions)
val dfWithPartition = incDF.withColumn("etl_date", lit("20221031"))
val hiveOptions = Map("db" -> "ods", "tb" -> "user_info", "partitionColumn" -> "etl_date")
EtlUtil.appendToHive(spark, dfWithPartition, hiveOptions)
// 打印加载后数据总量
println(s"Total records in ods.user_info: ${spark.table("ods.user_info").count()}")
}
主程序入口
object EtlJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("UserInfo ETL Job")
.master("local[*]")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.getOrCreate()
// 运行子任务1
etlTask11(spark) // 模拟存量数据
etlTask12(spark) // 实现增量同步
}
}
Hive CLI 查询
-- 查看 user_info 表的分区情况
show partitions ods.user_info;
子任务2:sku_info表增量数据抽取与静态分区加载
任务说明
本任务要求从 MySQL 数据库 shtd_store 中提取 sku_info 表的增量数据,并加载至 Hive 的 ods.sku_info 分区表中。
以该表字段
create_time作为增量判断依据,仅抽取新增记录加载数据时需设置静态分区字段
etl_date(格式为yyyyMMdd,为比赛日前一日)字段结构保持不变
数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.sku_info;
实现思路
核心为时间字段的增量控制与静态分区管理,可以拆分为以下几个步骤:
- 构建初始模拟数据:插入一条历史记录到 Hive,模拟已有存量
- 计算最大时间点:从 Hive 查询当前
create_time最大值 - 从 MySQL 增量抽取数据:仅加载
create_time大于该时间的记录 - 添加静态分区列并写入 Hive
- 验证写入是否成功:通过查询 Hive 分区表展示结果
实现代码
- 构造存量数据并写入 Hive(模拟初始数据写入)
def etlTask21(spark: SparkSession): Unit = {
import spark.implicits._
import org.apache.spark.sql.types._
val baseData = Seq(
(0L, 0L, 1000.00, "测试商品", "测试商品描述", 0.06, 0L, 0L, "http://test", "2021-01-01 12:00:00")
)
val columnNames = Seq("id", "spu_id", "price", "sku_name", "sku_desc", "weight", "tm_id", "category3_id", "sku_default_img", "create_time")
val df = baseData.toDF(columnNames: _*)
.withColumn("price", $"price".cast(DecimalType(10, 2)))
.withColumn("weight", $"weight".cast(DecimalType(10, 2)))
.withColumn("create_time", $"create_time".cast(TimestampType))
.withColumn("etl_date", lit("20221031")) // 静态分区值
val hiveConf = Map("db" -> "ods", "tb" -> "sku_info", "partitionColumn" -> "etl_date")
EtlUtil.appendToHive(spark, df, hiveConf)
spark.table("ods.sku_info").show(false)
}
- 抽取增量数据并加载 Hive 分区表
def etlTask22(spark: SparkSession): Unit = {
// 获取现有数据中的最大 create_time
val maxCreateTimeQuery = "SELECT MAX(create_time) FROM ods.sku_info"
val maxTime = spark.sql(maxCreateTimeQuery).first().getAs
// 构造 MySQL JDBC 查询,仅加载增量数据
val mysqlURL = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val query =
s"""
|(SELECT * FROM sku_info
| WHERE create_time > '${maxTime}') tmp
""".stripMargin
val jdbcOptions = Map(
"url" -> mysqlURL,
"dbtable" -> query,
"user" -> "root",
"password" -> "admin"
)
val incrementDF = EtlUtil.extractFromJDBC(spark, jdbcOptions)
.withColumn("etl_date", lit("20221031"))
val hiveConf = Map("db" -> "ods", "tb" -> "sku_info", "partitionColumn" -> "etl_date")
EtlUtil.appendToHive(spark, incrementDF, hiveConf)
spark.table("ods.sku_info").show(false)
}
主程序入口
object EtlJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("ETL Job")
.master("local[*]")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.getOrCreate()
// 子任务1:user_info 表
etlTask11(spark) // 构造模拟数据
etlTask12(spark) // 增量加载
// 子任务2:sku_info 表
etlTask21(spark) // 初始存量插入
etlTask22(spark) // 增量抽取并分区写入
}
}
Hive CLI 查询
show partitions ods.sku_info;
子任务3:base_province 增量数据同步与静态分区写入
任务说明
本任务要求从 shtd_store 数据库中的 base_province 表抽取新增数据,加载至 Hive 的 ods.base_province 分区表
- 使用
id字段作为增量判断条件,仅插入id大于 Hive 当前最大值的记录 - 加入一个新字段
create_time,值为任务执行时的当前系统时间 - 加载数据时需设置静态分区字段
etl_date(格式为yyyyMMdd,为比赛日前一日) - 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.base_province;
实现思路
该任务与前两个子任务在流程上基本一致,仅增量判断字段改为 id。操作步骤如下:
- 构造模拟存量数据写入 Hive 表;
- 查询 Hive 中已存在的最大
id; - 从 MySQL 读取新增记录(
id > max_id); - 为数据添加
create_time和静态分区列; - 将数据以追加方式写入 Hive 对应分区。
实现代码
- 构造存量数据并写入 Hive(静态分区)
def etlTask31(spark: SparkSession): Unit = {
import spark.implicits._
val sampleData = Seq(
(0L, "test", "0", "000000", "CN-00")
)
val df = sampleData.toDF("id", "name", "region_id", "area_code", "iso_code")
.withColumn("create_time", current_timestamp())
.withColumn("etl_date", lit("20221031")) // 比赛日前一天作为分区字段
val hiveOptions = Map(
"db" -> "ods",
"tb" -> "base_province",
"partitionColumn" -> "etl_date"
)
EtlUtil.loadToHive(spark, df, hiveOptions)
spark.table("ods.base_province").show(false)
}
- 从 MySQL 抽取新增数据并写入 Hive
def etlTask32(spark: SparkSession): Unit = {
// 查询 Hive 表中最大的 id
val maxIdQuery = "SELECT MAX(id) FROM ods.base_province"
val maxId = spark.sql(maxIdQuery).first().getLong(0)
// 构造 JDBC 查询,仅读取 id 大于当前最大值的记录
val mysqlURL = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val sql =
s"""
|(SELECT * FROM base_province
| WHERE id > $maxId) tmp
""".stripMargin
val jdbcOptions = Map(
"url" -> mysqlURL,
"dbtable" -> sql,
"user" -> "root",
"password" -> "admin"
)
val df = EtlUtil.extractFromJDBC(spark, jdbcOptions)
.withColumn("create_time", current_timestamp())
.withColumn("etl_date", lit("20221031"))
val hiveOptions = Map(
"db" -> "ods",
"tb" -> "base_province",
"partitionColumn" -> "etl_date"
)
EtlUtil.appendToHive(spark, df, hiveOptions)
spark.table("ods.base_province").show(50, truncate = false)
}
主程序入口
object EtlJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("ETL Job")
.master("local[*]")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.getOrCreate()
// 用户信息数据同步
etlTask11(spark) // 初始化 user_info 存量数据
etlTask12(spark) // 抽取 user_info 增量数据
// 商品数据同步
etlTask21(spark)
etlTask22(spark)
// 省份信息同步
etlTask31(spark)
etlTask32(spark)
}
}
Hive CLI 查询
show partitions ods.base_province;
子任务4:base_region 增量抽取与分区同步
任务说明
本任务要求将 MySQL 中 base_region 表的新增数据抽取至 Hive 的 ods.base_region 表。实现过程中需要满足以下要求:
- 使用
id字段作为增量判断条件,仅插入id大于 Hive 当前最大值的记录 - 加入一个新字段
create_time,值为任务执行时的当前系统时间 - 加载数据时需设置静态分区字段
etl_date(格式为yyyyMMdd,为比赛日前一日) - 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.base_region;
实现思路
该过程与子任务3基本一致,仅表名、字段数及类型略有不同,这里不再赘述
实现代码
- 构造存量数据并写入 Hive
def etlTask41(spark: SparkSession): Unit = {
import spark.implicits._
val sampleData = Seq(("0", "test"))
val df = sampleData.toDF("id", "region_name")
.withColumn("create_time", current_timestamp())
.withColumn("etl_date", lit("20221031")) // 比赛日前一日
val hiveOptions = Map(
"db" -> "ods",
"tb" -> "base_region",
"partitionColumn" -> "etl_date"
)
EtlUtil.loadToHive(spark, df, hiveOptions)
spark.table("ods.base_region").show()
}
- 从 MySQL 抽取新增数据并写入 Hive
def etlTask42(spark: SparkSession): Unit = {
// 获取当前 Hive 表中最大 id
val maxIdDf = spark.sql("SELECT MAX(id) FROM ods.base_region")
val maxIdStr = maxIdDf.first().getString(0)
val maxId = if (maxIdStr != null) maxIdStr.toInt else 0
val mysqlUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val sql =
s"""
|(SELECT * FROM base_region
| WHERE id > $maxId) tmp
""".stripMargin
val jdbcOptions = Map(
"url" -> mysqlUrl,
"dbtable" -> sql,
"user" -> "root",
"password" -> "admin"
)
val df = EtlUtil.extractFromJDBC(spark, jdbcOptions)
.withColumn("create_time", current_timestamp())
.withColumn("etl_date", lit("20221031"))
val hiveOptions = Map(
"db" -> "ods",
"tb" -> "base_region",
"partitionColumn" -> "etl_date"
)
EtlUtil.appendToHive(spark, df, hiveOptions)
spark.table("ods.base_region").show()
}
主程序入库
object EtlJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("ETL Job")
.master("local[*]")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.enableHiveSupport()
.getOrCreate()
// 子任务1:用户信息
etlTask11(spark)
etlTask12(spark)
// 子任务2:SKU信息
etlTask21(spark)
etlTask22(spark)
// 子任务3:省份信息
etlTask31(spark)
etlTask32(spark)
// 子任务4:区域信息
etlTask41(spark)
etlTask42(spark)
}
}
Hive 查询
show partitions ods.base_region;
子任务5:抽取 order_info 增量数据并分区入库
任务说明
本任务要求从 MySQL 的 shtd_store.order_info 表中提取新增数据,并加载至 Hive 中的 ods.order_info 表。要求如下:
- 使用
create_time和operate_time两字段中的最大值作为时间戳判断依据 - 仅同步比 Hive 当前数据更新的记录,实现增量抽取
- 保持字段名与数据类型不变
- 加载数据时需设置静态分区字段
etl_date(格式为yyyyMMdd,为比赛日前一日) - 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.order_info;
实现思路
- 增量判断:使用
GREATEST(create_time, operate_time)判断最大时间 - 增量抽取:从 MySQL 中获取时间更大的新数据行;只取新记录
- 数据转换:添加分区字段
etl_date,并准备写入格式 - Hive装载:将数据写入 Hive 分区表
ods.order_info中
实现代码
- 构造存量数据并写入 Hive
def etlTask51(spark: SparkSession): Unit = {
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val baseSql =
"""
|SELECT *
|FROM order_info
|WHERE (create_time <= '2020-04-26 18:00:00' OR create_time IS NULL)
| AND (operate_time <= '2020-04-26 18:00:00' OR operate_time IS NULL)
""".stripMargin
val jdbcOptions = Map(
"url" -> jdbcUrl,
"dbtable" -> baseSql,
"user" -> "root",
"password" -> "admin"
)
val df = EtlUtil.extractFromJDBC(spark, jdbcOptions)
.withColumn("etl_date", lit("20221031")) // 比赛日前一天
val hiveOptions = Map(
"db" -> "ods",
"tb" -> "order_info",
"partitionColumn" -> "etl_date"
)
EtlUtil.loadToHive(spark, df, hiveOptions)
spark.table("ods.order_info").show()
}
- 实现增量数据提取并追加写入 Hive
def etlTask52(spark: SparkSession): Unit = {
// 查询现有数据中最大时间(create_time 与 operate_time 的最大值)
val maxTimeSQL =
"SELECT GREATEST(MAX(create_time), MAX(operate_time)) FROM ods.order_info"
val maxTime = spark.sql(maxTimeSQL)
.first()
.getAs
val mysqlUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val incrSql =
s"""
|(SELECT *
| FROM order_info
| WHERE create_time > '$maxTime' OR operate_time > '$maxTime') tmp
""".stripMargin
val jdbcOptions = Map(
"url" -> mysqlUrl,
"dbtable" -> incrSql,
"user" -> "root",
"password" -> "admin"
)
val dfIncr = EtlUtil.extractFromJDBC(spark, jdbcOptions)
.withColumn("etl_date", lit("20221031"))
val hiveOptions = Map(
"db" -> "ods",
"tb" -> "order_info",
"partitionColumn" -> "etl_date"
)
EtlUtil.appendToHive(spark, dfIncr, hiveOptions)
spark.table("ods.order_info").show()
}
主程序入口
object EtlJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("ETL Job")
.master("local[*]")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.enableHiveSupport()
.getOrCreate()
// 子任务5:订单信息
etlTask51(spark) // 构造存量数据
etlTask52(spark) // 执行增量抽取与加载
}
}
Hive CLI 查询
show partitions ods.order_info;
子任务6:抽取 order_detail 表增量数据并静态分区写入 Hive
任务说明
本任务要求从 MySQL 数据库 shtd_store 中抽取 order_detail 表的数据,仅同步新增记录,写入 Hive 中的 ods.order_detail 表。核心要求包括:
- 以
create_time字段为增量标识 - 保持字段名与数据类型不变
- 加载数据时需设置静态分区字段
etl_date(格式为yyyyMMdd,为比赛日前一日) - 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.order_detail
实现思路
- 增量抽取:查询 ODS 表中
create_time的最大值,再从 MySQL 中同步更新记录 - 数据转换:在增量数据中添加静态分区字段
etl_date - Hive装载:将数据写入 Hive 分区表中,使用静态分区方式,确保历史数据不被覆盖
实现代码
- 构建存量数据并入库 Hive
def etlTask61(spark: SparkSession): Unit = {
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val stockSql =
"""
|SELECT *
|FROM order_detail
|WHERE create_time <= '2020-04-26 18:48:00'
""".stripMargin
val jdbcOptions = Map(
"url" -> jdbcUrl,
"dbtable" -> stockSql,
"user" -> "root",
"password" -> "admin"
)
val df = EtlUtil.extractFromJDBC(spark, jdbcOptions)
.withColumn("etl_date", lit("20221031"))
val hiveOptions = Map(
"db" -> "ods",
"tb" -> "order_detail",
"partitionColumn" -> "etl_date"
)
EtlUtil.loadToHive(spark, df, hiveOptions)
spark.table("ods.order_detail").show()
}
- 增量数据提取并追加写入 Hive
def etlTask62(spark: SparkSession): Unit = {
// 获取 ODS 表中最大 create_time 值
val maxTimeSql = "SELECT MAX(create_time) FROM ods.order_detail"
val latestTime = spark.sql(maxTimeSql)
.first()
.getAs
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
val incrSql =
s"""
|(SELECT *
| FROM order_detail
| WHERE create_time > '$latestTime') tmp
""".stripMargin
val jdbcOptions = Map(
"url" -> jdbcUrl,
"dbtable" -> incrSql,
"user" -> "root",
"password" -> "admin"
)
val dfIncr = EtlUtil.extractFromJDBC(spark, jdbcOptions)
.withColumn("etl_date", lit("20221031"))
val hiveOptions = Map(
"db" -> "ods",
"tb" -> "order_detail",
"partitionColumn" -> "etl_date"
)
EtlUtil.appendToHive(spark, dfIncr, hiveOptions)
spark.table("ods.order_detail").show()
}
主程序入口
object EtlJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("ETL Job")
.master("local[*]")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.sql.parquet.writeLegacyFormat", "true")
.enableHiveSupport()
.getOrCreate()
// 子任务6:订单明细表
etlTask61(spark) // 加载存量数据
etlTask62(spark) // 加载增量数据
}
}
Hive CLI 查询
show partitions ods.order_detail;
欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1701220998@qq.com