大数据应用技术模块c任务一

大数据竞赛(任务书一)-数据挖掘任务一:特征工程

任务描述

剔除订单信息表与订单详情信息表中用户 ID 和商品 ID 在维表中不存在的记录。同时建议充分利用缓存机制,并合理设置并行度参数,以提升代码运行效率和计算性能

启动Hive Metastore服务

Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:

hive --servicemetastore

这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:

nohup hive--service metastore &

这个命令将启动Hive Metastore服务,并在后台持续运行

子任务1:计算相似用户 Top 10

任务说明

根据 Hive 中 dwd 库的相关表(或 MySQL 中 shtd_store 数据库中的 order_detailsku_info 表),计算出与用户 ID 为 6708 的用户购买商品类别最为相似的前 10 位用户。

  • 相似度定义:仅考虑购买过多少个相同类别的商品,而不考虑购买次数
  • 输出结果仅为用户 ID,格式如下:
-------------------相同种类前10的id结果展示为:----------
1,2,901,4,5,21,32,91,14,52

实现思路

经样例数据验证,发现用户 ID 为 6708 的用户无有效商品购买记录。为此,改用用户 ID 为 1605 替代执行本任务

本任务的核心在于:基于商品 三级分类(category3_id) 计算用户间的相似度,找到与用户 1605 相似度最高的前 10 位用户。由于只考虑商品种类的重合情况,最合适的相似度计算方式为 Jaccard 相似度

Jaccard 相似度简介

Jaccard 相似度用于衡量两个集合的相似程度,其计算公式如下:

J(A,B)= \frac{|A \cap B|}{|A \cup B|}

其中 AAA 和 BBB 是两个集合,交集表示两用户购买的商品类别中相同的部分,合集表示总的不同商品类别数

Jaccard 距离则为:

D(A,B)=1−J(A,B)

相似度越高,距离越小。

如下图所示:

例如,有三个用户C1、C2和C3,他们购买的所有物品构成的物品集合为(item1、item2、…、item9)。由此构成的”用户-物品”交易表,以及三个客户每对之间的相似度可以用Jaccard系数来计算,如下图所示:

上图中这些测量结果表明,C1和C3有相似的购物行为,而C2和C3并不相似,因为他们购买了完全不同的商品。

最后,用计算两个二元属性之间的不相似度来代替相似度,这称为”Jaccard距离“。注意区分Jaccard距离与Jaccard相似系数的区别!!!!

实现代码

以下是使用 Spark + Hive 实现用户 1605 与其他用户 Jaccard 相似度计算的完整代码:

def featureTask1(spark: SparkSession): Unit = {
  import org.apache.spark.sql.functions._
  import spark.implicits._

  // 加载三个 Hive 表(订单表、订单明细表、商品维度表)
  val order_info = spark.table("dwd.fact_order_info")
  val order_detail = spark.table("dwd.fact_order_detail")
  val sku_info = spark.sql("SELECT * FROM dwd.dim_sku_info WHERE etl_date='20221031'")

  // 三表关联,获取用户与商品三级分类(category3_id)信息
  val joined = order_info
    .join(order_detail, $"id" === order_detail("order_id"))
    .join(sku_info, $"sku_id" === sku_info("id"))
    .select($"user_id", $"order_id", $"sku_id", order_detail("sku_name"), $"category3_id")

  // 获取用户1605的购买商品种类
  val user1605ItemsDF = joined
    .filter($"user_id" === 1605)
    .select($"user_id".as("user"), $"category3_id".as("item"))

  // 获取其他用户的购买商品种类
  val userOtherItemsDF = joined
    .filter($"user_id" =!= 1605)
    .select($"user_id".as("user"), $"category3_id".as("item"))

  // 聚合用户1605的商品类别集合
  val user1605Items = user1605ItemsDF
    .groupBy("user")
    .agg(collect_set("item").as("user1605_items"))

  // 聚合其他用户的商品类别集合
  val userOtherItems = userOtherItemsDF
    .groupBy("user")
    .agg(collect_set("item").as("items_array"))

  // 笛卡尔积配对计算 Jaccard 相似度(交并比)
  val user1605AndOtherItems = userOtherItems
    .crossJoin(broadcast(user1605Items.select($"user1605_items")))

  // 转为RDD计算Jaccard距离并取Top 10
  val top10Users = user1605AndOtherItems.rdd
    .map(row => (
      row.getAs[Int]("user"),
      row.getAs[Seq[Int]]("items_array"),
      row.getAs[Seq[Int]]("user1605_items")
    ))
    .map { case (user, items, user1605Items) =>
      val intersectionSize = items.intersect(user1605Items).size.toFloat
      val unionSize = (items ++ user1605Items).distinct.size.toFloat
      val jaccardDistance = 1 - (intersectionSize / unionSize)
      (user, jaccardDistance)
    }
    .toDF("user", "jaccard_distance")
    .orderBy($"jaccard_distance".asc)
    .limit(10)
    .collect()

  // 输出结果
  println("-------------------相同种类前10的id结果展示为:----------")
  println(top10Users.map(_.get(0)).mkString(","))
}

执行结果展示

-------------------相同种类前10的id结果展示为:----------
1089,417,1064,6956,3102,1694,8611,5729,4131,9321

子任务 2:商品数据特征工程

任务说明

从 Hive 的 dwd 库中的相关表(或 MySQL 中 shtd_store 库的 sku_info 表)中获取以下字段:

  • id
  • spu_id
  • price
  • weight
  • tm_id
  • category3_id

并完成以下预处理任务:

  1. priceweight 两个连续型字段进行规范化处理(使用 min-max 标准化)
  2. spu_idtm_idcategory3_id 三个类别型字段进行 One-Hot 编码(即:若属于该类则置为 1,否则为 0)
  3. 按照 id 字段进行升序排序
字段 类型 中文含义 备注
id double 主键
price double 价格
weight double 重量
spu_id#1 double spu_id 1 若属于该spu_id,则内容为1,否则为0
spu_id#2 double spu_id 2 若属于该spu_id,则内容为1,否则为0
…… double
tm_id#1 double 品牌1 若属于该品牌,则内容为1,否则为0
tm_id#2 double 品牌2 若属于该品牌,则内容为1,否则为0
…… double
category3_id#1 double 分类级别3 1 若属于该分类级别3,则内容为1,否则为0
category3_id#2 double 分类级别3 2 若属于该分类级别3,则内容为1,否则为0
…… double
  1. 输出第一条数据的前 10 个字段值(无需包含字段名),结果示例如下:
--------------------第一条数据前10列结果展示为:------------------
1.0,0.89,0.72,0.0,0.0,0.0,0.0,1.0,0.0,0.0

实现思路

  • 本任务属于特征工程预处理,主要涉及:
    • 数值型字段归一化处理
    • 类别型字段的 One-Hot 编码
    • 输出至 Hive 表 dwd.sku_info_cleaned,供后续任务使用

实现代码

def featureTask2(spark: SparkSession): Unit = {
  import org.apache.spark.sql.functions._
  import spark.implicits._

  // 1. 读取原始数据(Hive 表)
  val sku_info_data = spark
    .sql("SELECT * FROM dwd.dim_sku_info WHERE etl_date='20221031'")
    .select("id", "spu_id", "price", "weight", "tm_id", "category3_id")

  // 2. 计算 min-max 值用于归一化
  val min_price = sku_info_data.agg(min("price")).as[Double].first()
  val max_price = sku_info_data.agg(max("price")).as[Double].first()
  val min_weight = sku_info_data.agg(min("weight")).as[Double].first()
  val max_weight = sku_info_data.agg(max("weight")).as[Double].first()

  // 3. 对 price、weight 进行归一化处理
  val sku_info_data_norm = sku_info_data
    .withColumn("price", (col("price") - min_price) / (max_price - min_price))
    .withColumn("weight", (col("weight") - min_weight) / (max_weight - min_weight))

  // 4. 构建 One-Hot 编码空间(唯一值集合)
  val spuIdValues = sku_info_data_norm.select("spu_id").distinct().as[Long].collect()
  val tmIdValues = sku_info_data_norm.select("tm_id").distinct().as[Long].collect()
  val category3IdValues = sku_info_data_norm.select("category3_id").distinct().as[Long].collect()

  // 5. 对 spu_id 进行 One-Hot 编码
  val sku_info_dummy_spu = spuIdValues.foldLeft(sku_info_data_norm) {
    (df, value) => df.withColumn(s"spu_id#$value", when(col("spu_id") === value, 1).otherwise(0))
  }

  // 6. 对 tm_id 进行 One-Hot 编码
  val sku_info_dummy_tm = tmIdValues.foldLeft(sku_info_dummy_spu) {
    (df, value) => df.withColumn(s"tm_id#$value", when(col("tm_id") === value, 1).otherwise(0))
  }

  // 7. 对 category3_id 进行 One-Hot 编码
  val sku_info_dummy_cat3 = category3IdValues.foldLeft(sku_info_dummy_tm) {
    (df, value) => df.withColumn(s"category3_id#$value", when(col("category3_id") === value, 1).otherwise(0))
  }

  // 8. 删除原始类别字段,并按 id 升序排序
  val sku_info_dummy = sku_info_dummy_cat3
    .drop("spu_id", "tm_id", "category3_id")
    .orderBy("id")

  // 9. 输出第一条数据前10列
  val firstRow = sku_info_dummy.first()
  println("--------------------第一条数据前10列结果展示为:------------------")
  println((0 until 10).map(firstRow.get).mkString(","))

  // 10. 保存为 Hive 表(供后续任务使用)
  sku_info_dummy.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("dwd.sku_info_cleaned")

  //展示前5行
  spark.table("dwd.sku_info_cleaned").show(5)
}

后面的任务二我自己也没有实践过,模块c就到此结束


欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1701220998@qq.com
导航页 GitHub