大数据应用技术模块b任务一

大数据竞赛(任务书一)-离线数据处理任务一:数据抽取

任务描述

使用 Scala 编写 Spark 程序,实现 MySQL 中 shtd_store 库下的表:

  • user_info
  • sku_info
  • base_province
  • base_region
  • order_info
  • order_detail

增量数据提取,并将其写入 Hive 中的 ods 层对应分区表

启动Hive Metastore服务

Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:

hive --servicemetastore

这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:

nohup hive--service metastore &

这个命令将启动Hive Metastore服务,并在后台持续运行

子任务1:从 MySQL 增量同步 user_info 数据至 Hive ODS 层

任务说明

请从 shtd_store 数据库中的 user_info 表中提取新增记录,写入 Hive 中的 ods.user_info 表(已存在)。增量判断依据如下:

  • 增量字段计算:对于每一行记录,取 operate_timecreate_time 中较晚的时间,作为当前记录的“变更时间”
  • 增量抽取逻辑:仅抽取变更时间晚于 Hive 中现有 user_info 表的最大变更时间的记录(即:只抽取新增或更新的数据)
  • 字段结构保持与原表一致
  • 需添加一个 静态分区字段 etl_date,类型为 String,格式为 yyyyMMdd,其值设定为比赛日的前一天
  • 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.user_info;

实现思路

该任务为标准的 增量抽取 + 分区加载 类型离线 ETL

  1. 从 MySQL 抽取数据
  2. 构造分区字段
  3. 判断增量记录
  4. 使用 .insertInto() 实现分区追加
  5. 使用 Hive CLI 查看分区信息

这个子任务需要了解的技术:

  • Spark教程:”构造DataFrame”中“使用JDBC从数据库创建DataFrame”部分。
  • Spark教程:“读写Hive表”部分
  1. 首先定义一个ETL工具类,封装ETL方法实现
import org.apache.spark.sql._

object EtlUtil {

  /** 从 MySQL 数据库加载数据,返回 DataFrame */
  def extractFromJDBC(spark: SparkSession, jdbcOptions: Map[String, String]): Dataset[Row] = {
    spark.read
      .format("jdbc")
      .options(jdbcOptions)
      .load()
  }

  /** 将 DataFrame 写入 Hive 表中,可指定静态分区 */
  def appendToHive(spark: SparkSession, df: Dataset[Row], hiveOptions: Map[String, String]): Unit = {
    val db = hiveOptions("db")
    val table = hiveOptions("tb")
    val partitionColumn = hiveOptions.get("partitionColumn")

    spark.sql(s"USE $db")

    partitionColumn match {
      case Some(partCol) =>
        df.write
          .mode("append")
          .format("parquet")
          .partitionBy(partCol)
          .insertInto(table)

      case None =>
        df.write
          .mode("append")
          .format("parquet")
          .insertInto(table)
    }
  }
}

实现代码

  • 模拟存量数据加载(etlTask11):
    • 因为任务需求说明要求实现增量抽取,但官方并未给出存量数据说明。所以这里自行先抽取部分数据写入Hive分区表用来模拟存量数据:
def etlTask11(spark: SparkSession): Unit = {
  val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"

  val stockSql =
    """
      |(SELECT * FROM user_info
      | WHERE create_time = '2020-04-26 18:57:55'
      |   AND (operate_time <= '2020-04-26 18:57:55' OR operate_time IS NULL)) AS tmp
    """.stripMargin

  val jdbcOptions = Map(
    "url" -> jdbcUrl,
    "dbtable" -> stockSql,
    "user" -> "root",
    "password" -> "admin"
  )

  val rawDF = EtlUtil.extractFromJDBC(spark, jdbcOptions)

  val partitionedDF = rawDF.withColumn("etl_date", lit("20221031"))

  val hiveOptions = Map("db" -> "ods", "tb" -> "user_info", "partitionColumn" -> "etl_date")

  EtlUtil.appendToHive(spark, partitionedDF, hiveOptions)

  spark.table("ods.user_info").show()
}
  • 增量数据同步(etlTask12):
def etlTask12(spark: SparkSession): Unit = {
  // 1. 获取 ODS 表中最大时间
  val maxTimeDF = spark.sql(
    "SELECT greatest(max(create_time), max(operate_time)) AS max_time FROM ods.user_info"
  )
  val maxTime = maxTimeDF.first.getAs[java.sql.Timestamp]("max_time")

  // 2. 构建增量查询 SQL
  val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
  val incSql =
    s"(SELECT * FROM user_info WHERE create_time > '$maxTime' OR operate_time > '$maxTime') AS tmp"

  val jdbcOptions = Map(
    "url" -> jdbcUrl,
    "dbtable" -> incSql,
    "user" -> "root",
    "password" -> "admin"
  )

  val incDF = EtlUtil.extractFromJDBC(spark, jdbcOptions)

  val dfWithPartition = incDF.withColumn("etl_date", lit("20221031"))

  val hiveOptions = Map("db" -> "ods", "tb" -> "user_info", "partitionColumn" -> "etl_date")

  EtlUtil.appendToHive(spark, dfWithPartition, hiveOptions)

  // 打印加载后数据总量
  println(s"Total records in ods.user_info: ${spark.table("ods.user_info").count()}")
}

主程序入口

object EtlJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UserInfo ETL Job")
      .master("local[*]")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .getOrCreate()

    // 运行子任务1
    etlTask11(spark) // 模拟存量数据
    etlTask12(spark) // 实现增量同步
  }
}

Hive CLI 查询

-- 查看 user_info 表的分区情况
show partitions ods.user_info;

子任务2:sku_info表增量数据抽取与静态分区加载

任务说明

本任务要求从 MySQL 数据库 shtd_store 中提取 sku_info 表的增量数据,并加载至 Hive 的 ods.sku_info 分区表中。

  • 以该表字段 create_time 作为增量判断依据,仅抽取新增记录

  • 加载数据时需设置静态分区字段 etl_date(格式为 yyyyMMdd,为比赛日前一日)

  • 字段结构保持不变

  • 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:

show partitions ods.sku_info;

实现思路

核心为时间字段的增量控制与静态分区管理,可以拆分为以下几个步骤:

  1. 构建初始模拟数据:插入一条历史记录到 Hive,模拟已有存量
  2. 计算最大时间点:从 Hive 查询当前 create_time 最大值
  3. 从 MySQL 增量抽取数据:仅加载 create_time 大于该时间的记录
  4. 添加静态分区列并写入 Hive
  5. 验证写入是否成功:通过查询 Hive 分区表展示结果

实现代码

  • 构造存量数据并写入 Hive(模拟初始数据写入)
def etlTask21(spark: SparkSession): Unit = {
  import spark.implicits._
  import org.apache.spark.sql.types._

  val baseData = Seq(
    (0L, 0L, 1000.00, "测试商品", "测试商品描述", 0.06, 0L, 0L, "http://test", "2021-01-01 12:00:00")
  )

  val columnNames = Seq("id", "spu_id", "price", "sku_name", "sku_desc", "weight", "tm_id", "category3_id", "sku_default_img", "create_time")

  val df = baseData.toDF(columnNames: _*)
    .withColumn("price", $"price".cast(DecimalType(10, 2)))
    .withColumn("weight", $"weight".cast(DecimalType(10, 2)))
    .withColumn("create_time", $"create_time".cast(TimestampType))
    .withColumn("etl_date", lit("20221031")) // 静态分区值

  val hiveConf = Map("db" -> "ods", "tb" -> "sku_info", "partitionColumn" -> "etl_date")
  EtlUtil.appendToHive(spark, df, hiveConf)

  spark.table("ods.sku_info").show(false)
}
  • 抽取增量数据并加载 Hive 分区表
def etlTask22(spark: SparkSession): Unit = {
  // 获取现有数据中的最大 create_time
  val maxCreateTimeQuery = "SELECT MAX(create_time) FROM ods.sku_info"
  val maxTime = spark.sql(maxCreateTimeQuery).first().getAs 

  // 构造 MySQL JDBC 查询,仅加载增量数据
  val mysqlURL = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
  val query =
    s"""
       |(SELECT * FROM sku_info
       | WHERE create_time > '${maxTime}') tmp
     """.stripMargin

  val jdbcOptions = Map(
    "url" -> mysqlURL,
    "dbtable" -> query,
    "user" -> "root",
    "password" -> "admin"
  )

  val incrementDF = EtlUtil.extractFromJDBC(spark, jdbcOptions)
    .withColumn("etl_date", lit("20221031"))

  val hiveConf = Map("db" -> "ods", "tb" -> "sku_info", "partitionColumn" -> "etl_date")
  EtlUtil.appendToHive(spark, incrementDF, hiveConf)

  spark.table("ods.sku_info").show(false)
}

主程序入口

object EtlJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ETL Job")
      .master("local[*]")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .getOrCreate()

    // 子任务1:user_info 表
    etlTask11(spark)  // 构造模拟数据
    etlTask12(spark)  // 增量加载

    // 子任务2:sku_info 表
    etlTask21(spark)  // 初始存量插入
    etlTask22(spark)  // 增量抽取并分区写入
  }
}

Hive CLI 查询

show partitions ods.sku_info;

子任务3:base_province 增量数据同步与静态分区写入

任务说明

本任务要求从 shtd_store 数据库中的 base_province 表抽取新增数据,加载至 Hive 的 ods.base_province 分区表

  • 使用 id 字段作为增量判断条件,仅插入 id 大于 Hive 当前最大值的记录
  • 加入一个新字段 create_time,值为任务执行时的当前系统时间
  • 加载数据时需设置静态分区字段 etl_date(格式为 yyyyMMdd,为比赛日前一日)
  • 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.base_province;

实现思路

该任务与前两个子任务在流程上基本一致,仅增量判断字段改为 id。操作步骤如下:

  1. 构造模拟存量数据写入 Hive 表;
  2. 查询 Hive 中已存在的最大 id
  3. 从 MySQL 读取新增记录(id > max_id);
  4. 为数据添加 create_time 和静态分区列;
  5. 将数据以追加方式写入 Hive 对应分区。

实现代码

  • 构造存量数据并写入 Hive(静态分区)
def etlTask31(spark: SparkSession): Unit = {
  import spark.implicits._

  val sampleData = Seq(
    (0L, "test", "0", "000000", "CN-00")
  )

  val df = sampleData.toDF("id", "name", "region_id", "area_code", "iso_code")
    .withColumn("create_time", current_timestamp())
    .withColumn("etl_date", lit("20221031")) // 比赛日前一天作为分区字段

  val hiveOptions = Map(
    "db" -> "ods",
    "tb" -> "base_province",
    "partitionColumn" -> "etl_date"
  )

  EtlUtil.loadToHive(spark, df, hiveOptions)

  spark.table("ods.base_province").show(false)
}
  • 从 MySQL 抽取新增数据并写入 Hive
def etlTask32(spark: SparkSession): Unit = {
  // 查询 Hive 表中最大的 id
  val maxIdQuery = "SELECT MAX(id) FROM ods.base_province"
  val maxId = spark.sql(maxIdQuery).first().getLong(0)

  // 构造 JDBC 查询,仅读取 id 大于当前最大值的记录
  val mysqlURL = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
  val sql =
    s"""
       |(SELECT * FROM base_province
       | WHERE id > $maxId) tmp
     """.stripMargin

  val jdbcOptions = Map(
    "url" -> mysqlURL,
    "dbtable" -> sql,
    "user" -> "root",
    "password" -> "admin"
  )

  val df = EtlUtil.extractFromJDBC(spark, jdbcOptions)
    .withColumn("create_time", current_timestamp())
    .withColumn("etl_date", lit("20221031"))

  val hiveOptions = Map(
    "db" -> "ods",
    "tb" -> "base_province",
    "partitionColumn" -> "etl_date"
  )

  EtlUtil.appendToHive(spark, df, hiveOptions)

  spark.table("ods.base_province").show(50, truncate = false)
}

主程序入口

object EtlJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ETL Job")
      .master("local[*]")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .getOrCreate()

    // 用户信息数据同步
    etlTask11(spark)  // 初始化 user_info 存量数据
    etlTask12(spark)  // 抽取 user_info 增量数据

    // 商品数据同步
    etlTask21(spark)
    etlTask22(spark)

    // 省份信息同步
    etlTask31(spark)
    etlTask32(spark)
  }
}

Hive CLI 查询

show partitions ods.base_province;

子任务4:base_region 增量抽取与分区同步

任务说明

本任务要求将 MySQL 中 base_region 表的新增数据抽取至 Hive 的 ods.base_region 表。实现过程中需要满足以下要求:

  • 使用 id 字段作为增量判断条件,仅插入 id 大于 Hive 当前最大值的记录
  • 加入一个新字段 create_time,值为任务执行时的当前系统时间
  • 加载数据时需设置静态分区字段 etl_date(格式为 yyyyMMdd,为比赛日前一日)
  • 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.base_region;

实现思路

该过程与子任务3基本一致,仅表名、字段数及类型略有不同,这里不再赘述

实现代码

  • 构造存量数据并写入 Hive
def etlTask41(spark: SparkSession): Unit = {
  import spark.implicits._

  val sampleData = Seq(("0", "test"))
  val df = sampleData.toDF("id", "region_name")
    .withColumn("create_time", current_timestamp())
    .withColumn("etl_date", lit("20221031")) // 比赛日前一日

  val hiveOptions = Map(
    "db" -> "ods",
    "tb" -> "base_region",
    "partitionColumn" -> "etl_date"
  )

  EtlUtil.loadToHive(spark, df, hiveOptions)

  spark.table("ods.base_region").show()
}
  • 从 MySQL 抽取新增数据并写入 Hive
def etlTask42(spark: SparkSession): Unit = {
  // 获取当前 Hive 表中最大 id
  val maxIdDf = spark.sql("SELECT MAX(id) FROM ods.base_region")
  val maxIdStr = maxIdDf.first().getString(0)
  val maxId = if (maxIdStr != null) maxIdStr.toInt else 0

  val mysqlUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
  val sql =
    s"""
       |(SELECT * FROM base_region
       | WHERE id > $maxId) tmp
     """.stripMargin

  val jdbcOptions = Map(
    "url" -> mysqlUrl,
    "dbtable" -> sql,
    "user" -> "root",
    "password" -> "admin"
  )

  val df = EtlUtil.extractFromJDBC(spark, jdbcOptions)
    .withColumn("create_time", current_timestamp())
    .withColumn("etl_date", lit("20221031"))

  val hiveOptions = Map(
    "db" -> "ods",
    "tb" -> "base_region",
    "partitionColumn" -> "etl_date"
  )

  EtlUtil.appendToHive(spark, df, hiveOptions)

  spark.table("ods.base_region").show()
}

主程序入库

object EtlJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ETL Job")
      .master("local[*]")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .enableHiveSupport()
      .getOrCreate()

    // 子任务1:用户信息
    etlTask11(spark)
    etlTask12(spark)

    // 子任务2:SKU信息
    etlTask21(spark)
    etlTask22(spark)

    // 子任务3:省份信息
    etlTask31(spark)
    etlTask32(spark)

    // 子任务4:区域信息
    etlTask41(spark)
    etlTask42(spark)
  }
}

Hive 查询

show partitions ods.base_region;

子任务5:抽取 order_info 增量数据并分区入库

任务说明

本任务要求从 MySQL 的 shtd_store.order_info 表中提取新增数据,并加载至 Hive 中的 ods.order_info 表。要求如下:

  • 使用 create_timeoperate_time 两字段中的最大值作为时间戳判断依据
  • 仅同步比 Hive 当前数据更新的记录,实现增量抽取
  • 保持字段名与数据类型不变
  • 加载数据时需设置静态分区字段 etl_date(格式为 yyyyMMdd,为比赛日前一日)
  • 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.order_info;

实现思路

  1. 增量判断:使用 GREATEST(create_time, operate_time) 判断最大时间
  2. 增量抽取:从 MySQL 中获取时间更大的新数据行;只取新记录
  3. 数据转换:添加分区字段 etl_date,并准备写入格式
  4. Hive装载:将数据写入 Hive 分区表 ods.order_info

实现代码

  • 构造存量数据并写入 Hive
def etlTask51(spark: SparkSession): Unit = {
  val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"

  val baseSql =
    """
      |SELECT *
      |FROM order_info
      |WHERE (create_time <= '2020-04-26 18:00:00' OR create_time IS NULL)
      |  AND (operate_time <= '2020-04-26 18:00:00' OR operate_time IS NULL)
    """.stripMargin

  val jdbcOptions = Map(
    "url" -> jdbcUrl,
    "dbtable" -> baseSql,
    "user" -> "root",
    "password" -> "admin"
  )

  val df = EtlUtil.extractFromJDBC(spark, jdbcOptions)
    .withColumn("etl_date", lit("20221031")) // 比赛日前一天

  val hiveOptions = Map(
    "db" -> "ods",
    "tb" -> "order_info",
    "partitionColumn" -> "etl_date"
  )

  EtlUtil.loadToHive(spark, df, hiveOptions)

  spark.table("ods.order_info").show()
}
  • 实现增量数据提取并追加写入 Hive
def etlTask52(spark: SparkSession): Unit = {
  // 查询现有数据中最大时间(create_time 与 operate_time 的最大值)
  val maxTimeSQL =
    "SELECT GREATEST(MAX(create_time), MAX(operate_time)) FROM ods.order_info"

  val maxTime = spark.sql(maxTimeSQL)
    .first()
    .getAs 

  val mysqlUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
  val incrSql =
    s"""
       |(SELECT *
       | FROM order_info
       | WHERE create_time > '$maxTime' OR operate_time > '$maxTime') tmp
     """.stripMargin

  val jdbcOptions = Map(
    "url" -> mysqlUrl,
    "dbtable" -> incrSql,
    "user" -> "root",
    "password" -> "admin"
  )

  val dfIncr = EtlUtil.extractFromJDBC(spark, jdbcOptions)
    .withColumn("etl_date", lit("20221031"))

  val hiveOptions = Map(
    "db" -> "ods",
    "tb" -> "order_info",
    "partitionColumn" -> "etl_date"
  )

  EtlUtil.appendToHive(spark, dfIncr, hiveOptions)

  spark.table("ods.order_info").show()
}

主程序入口

object EtlJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ETL Job")
      .master("local[*]")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .enableHiveSupport()
      .getOrCreate()

    // 子任务5:订单信息
    etlTask51(spark) // 构造存量数据
    etlTask52(spark) // 执行增量抽取与加载
  }
}

Hive CLI 查询

show partitions ods.order_info;

子任务6:抽取 order_detail 表增量数据并静态分区写入 Hive

任务说明

本任务要求从 MySQL 数据库 shtd_store 中抽取 order_detail 表的数据,仅同步新增记录,写入 Hive 中的 ods.order_detail 表。核心要求包括:

  • create_time 字段为增量标识
  • 保持字段名与数据类型不变
  • 加载数据时需设置静态分区字段 etl_date(格式为 yyyyMMdd,为比赛日前一日)
  • 数据写入 Hive 后,使用 Hive CLI 执行以下命令查看分区情况,并将截图写入报告:
show partitions ods.order_detail

实现思路

  1. 增量抽取:查询 ODS 表中 create_time 的最大值,再从 MySQL 中同步更新记录
  2. 数据转换:在增量数据中添加静态分区字段 etl_date
  3. Hive装载:将数据写入 Hive 分区表中,使用静态分区方式,确保历史数据不被覆盖

实现代码

  • 构建存量数据并入库 Hive
def etlTask61(spark: SparkSession): Unit = {
  val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"

  val stockSql =
    """
      |SELECT *
      |FROM order_detail
      |WHERE create_time <= '2020-04-26 18:48:00'
    """.stripMargin

  val jdbcOptions = Map(
    "url" -> jdbcUrl,
    "dbtable" -> stockSql,
    "user" -> "root",
    "password" -> "admin"
  )

  val df = EtlUtil.extractFromJDBC(spark, jdbcOptions)
    .withColumn("etl_date", lit("20221031"))

  val hiveOptions = Map(
    "db" -> "ods",
    "tb" -> "order_detail",
    "partitionColumn" -> "etl_date"
  )

  EtlUtil.loadToHive(spark, df, hiveOptions)

  spark.table("ods.order_detail").show()
}
  • 增量数据提取并追加写入 Hive
def etlTask62(spark: SparkSession): Unit = {
  // 获取 ODS 表中最大 create_time 值
  val maxTimeSql = "SELECT MAX(create_time) FROM ods.order_detail"
  val latestTime = spark.sql(maxTimeSql)
    .first()
    .getAs 

  val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false"
  val incrSql =
    s"""
       |(SELECT *
       | FROM order_detail
       | WHERE create_time > '$latestTime') tmp
     """.stripMargin

  val jdbcOptions = Map(
    "url" -> jdbcUrl,
    "dbtable" -> incrSql,
    "user" -> "root",
    "password" -> "admin"
  )

  val dfIncr = EtlUtil.extractFromJDBC(spark, jdbcOptions)
    .withColumn("etl_date", lit("20221031"))

  val hiveOptions = Map(
    "db" -> "ods",
    "tb" -> "order_detail",
    "partitionColumn" -> "etl_date"
  )

  EtlUtil.appendToHive(spark, dfIncr, hiveOptions)

  spark.table("ods.order_detail").show()
}

主程序入口

object EtlJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ETL Job")
      .master("local[*]")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .enableHiveSupport()
      .getOrCreate()

    // 子任务6:订单明细表
    etlTask61(spark) // 加载存量数据
    etlTask62(spark) // 加载增量数据
  }
}

Hive CLI 查询

show partitions ods.order_detail;

欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1701220998@qq.com
导航页 GitHub