大数据竞赛(任务书一)-数据挖掘任务一:特征工程
任务描述
剔除订单信息表与订单详情信息表中用户 ID 和商品 ID 在维表中不存在的记录。同时建议充分利用缓存机制,并合理设置并行度参数,以提升代码运行效率和计算性能
启动Hive Metastore服务
Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:
hive --servicemetastore
这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:
nohup hive--service metastore &
这个命令将启动Hive Metastore服务,并在后台持续运行
子任务1:计算相似用户 Top 10
任务说明
根据 Hive 中 dwd 库的相关表(或 MySQL 中 shtd_store 数据库中的 order_detail、sku_info 表),计算出与用户 ID 为 6708 的用户购买商品类别最为相似的前 10 位用户。
- 相似度定义:仅考虑购买过多少个相同类别的商品,而不考虑购买次数;
- 输出结果仅为用户 ID,格式如下:
-------------------相同种类前10的id结果展示为:----------
1,2,901,4,5,21,32,91,14,52
实现思路
经样例数据验证,发现用户 ID 为 6708 的用户无有效商品购买记录。为此,改用用户 ID 为 1605 替代执行本任务
本任务的核心在于:基于商品 三级分类(category3_id) 计算用户间的相似度,找到与用户 1605 相似度最高的前 10 位用户。由于只考虑商品种类的重合情况,最合适的相似度计算方式为 Jaccard 相似度
Jaccard 相似度简介
Jaccard 相似度用于衡量两个集合的相似程度,其计算公式如下:
J(A,B)= \frac{|A \cap B|}{|A \cup B|}
其中 AAA 和 BBB 是两个集合,交集表示两用户购买的商品类别中相同的部分,合集表示总的不同商品类别数
Jaccard 距离则为:
D(A,B)=1−J(A,B)
相似度越高,距离越小。
如下图所示:
例如,有三个用户C1、C2和C3,他们购买的所有物品构成的物品集合为(item1、item2、…、item9)。由此构成的”用户-物品”交易表,以及三个客户每对之间的相似度可以用Jaccard系数来计算,如下图所示:
上图中这些测量结果表明,C1和C3有相似的购物行为,而C2和C3并不相似,因为他们购买了完全不同的商品。
最后,用计算两个二元属性之间的不相似度来代替相似度,这称为”Jaccard距离“。注意区分Jaccard距离与Jaccard相似系数的区别!!!!
实现代码
以下是使用 Spark + Hive 实现用户 1605 与其他用户 Jaccard 相似度计算的完整代码:
def featureTask1(spark: SparkSession): Unit = {
import org.apache.spark.sql.functions._
import spark.implicits._
// 加载三个 Hive 表(订单表、订单明细表、商品维度表)
val order_info = spark.table("dwd.fact_order_info")
val order_detail = spark.table("dwd.fact_order_detail")
val sku_info = spark.sql("SELECT * FROM dwd.dim_sku_info WHERE etl_date='20221031'")
// 三表关联,获取用户与商品三级分类(category3_id)信息
val joined = order_info
.join(order_detail, $"id" === order_detail("order_id"))
.join(sku_info, $"sku_id" === sku_info("id"))
.select($"user_id", $"order_id", $"sku_id", order_detail("sku_name"), $"category3_id")
// 获取用户1605的购买商品种类
val user1605ItemsDF = joined
.filter($"user_id" === 1605)
.select($"user_id".as("user"), $"category3_id".as("item"))
// 获取其他用户的购买商品种类
val userOtherItemsDF = joined
.filter($"user_id" =!= 1605)
.select($"user_id".as("user"), $"category3_id".as("item"))
// 聚合用户1605的商品类别集合
val user1605Items = user1605ItemsDF
.groupBy("user")
.agg(collect_set("item").as("user1605_items"))
// 聚合其他用户的商品类别集合
val userOtherItems = userOtherItemsDF
.groupBy("user")
.agg(collect_set("item").as("items_array"))
// 笛卡尔积配对计算 Jaccard 相似度(交并比)
val user1605AndOtherItems = userOtherItems
.crossJoin(broadcast(user1605Items.select($"user1605_items")))
// 转为RDD计算Jaccard距离并取Top 10
val top10Users = user1605AndOtherItems.rdd
.map(row => (
row.getAs[Int]("user"),
row.getAs[Seq[Int]]("items_array"),
row.getAs[Seq[Int]]("user1605_items")
))
.map { case (user, items, user1605Items) =>
val intersectionSize = items.intersect(user1605Items).size.toFloat
val unionSize = (items ++ user1605Items).distinct.size.toFloat
val jaccardDistance = 1 - (intersectionSize / unionSize)
(user, jaccardDistance)
}
.toDF("user", "jaccard_distance")
.orderBy($"jaccard_distance".asc)
.limit(10)
.collect()
// 输出结果
println("-------------------相同种类前10的id结果展示为:----------")
println(top10Users.map(_.get(0)).mkString(","))
}
执行结果展示
-------------------相同种类前10的id结果展示为:----------
1089,417,1064,6956,3102,1694,8611,5729,4131,9321
子任务 2:商品数据特征工程
任务说明
从 Hive 的 dwd 库中的相关表(或 MySQL 中 shtd_store 库的 sku_info 表)中获取以下字段:
idspu_idpriceweighttm_idcategory3_id
并完成以下预处理任务:
- 对
price和weight两个连续型字段进行规范化处理(使用 min-max 标准化) - 对
spu_id、tm_id、category3_id三个类别型字段进行 One-Hot 编码(即:若属于该类则置为 1,否则为 0) - 按照
id字段进行升序排序
| 字段 | 类型 | 中文含义 | 备注 |
|---|---|---|---|
| id | double | 主键 | |
| price | double | 价格 | |
| weight | double | 重量 | |
| spu_id#1 | double | spu_id 1 | 若属于该spu_id,则内容为1,否则为0 |
| spu_id#2 | double | spu_id 2 | 若属于该spu_id,则内容为1,否则为0 |
| …… | double | ||
| tm_id#1 | double | 品牌1 | 若属于该品牌,则内容为1,否则为0 |
| tm_id#2 | double | 品牌2 | 若属于该品牌,则内容为1,否则为0 |
| …… | double | ||
| category3_id#1 | double | 分类级别3 1 | 若属于该分类级别3,则内容为1,否则为0 |
| category3_id#2 | double | 分类级别3 2 | 若属于该分类级别3,则内容为1,否则为0 |
| …… | double |
- 输出第一条数据的前 10 个字段值(无需包含字段名),结果示例如下:
--------------------第一条数据前10列结果展示为:------------------
1.0,0.89,0.72,0.0,0.0,0.0,0.0,1.0,0.0,0.0
实现思路
- 本任务属于特征工程预处理,主要涉及:
- 数值型字段归一化处理
- 类别型字段的 One-Hot 编码
- 输出至 Hive 表
dwd.sku_info_cleaned,供后续任务使用
实现代码
def featureTask2(spark: SparkSession): Unit = {
import org.apache.spark.sql.functions._
import spark.implicits._
// 1. 读取原始数据(Hive 表)
val sku_info_data = spark
.sql("SELECT * FROM dwd.dim_sku_info WHERE etl_date='20221031'")
.select("id", "spu_id", "price", "weight", "tm_id", "category3_id")
// 2. 计算 min-max 值用于归一化
val min_price = sku_info_data.agg(min("price")).as[Double].first()
val max_price = sku_info_data.agg(max("price")).as[Double].first()
val min_weight = sku_info_data.agg(min("weight")).as[Double].first()
val max_weight = sku_info_data.agg(max("weight")).as[Double].first()
// 3. 对 price、weight 进行归一化处理
val sku_info_data_norm = sku_info_data
.withColumn("price", (col("price") - min_price) / (max_price - min_price))
.withColumn("weight", (col("weight") - min_weight) / (max_weight - min_weight))
// 4. 构建 One-Hot 编码空间(唯一值集合)
val spuIdValues = sku_info_data_norm.select("spu_id").distinct().as[Long].collect()
val tmIdValues = sku_info_data_norm.select("tm_id").distinct().as[Long].collect()
val category3IdValues = sku_info_data_norm.select("category3_id").distinct().as[Long].collect()
// 5. 对 spu_id 进行 One-Hot 编码
val sku_info_dummy_spu = spuIdValues.foldLeft(sku_info_data_norm) {
(df, value) => df.withColumn(s"spu_id#$value", when(col("spu_id") === value, 1).otherwise(0))
}
// 6. 对 tm_id 进行 One-Hot 编码
val sku_info_dummy_tm = tmIdValues.foldLeft(sku_info_dummy_spu) {
(df, value) => df.withColumn(s"tm_id#$value", when(col("tm_id") === value, 1).otherwise(0))
}
// 7. 对 category3_id 进行 One-Hot 编码
val sku_info_dummy_cat3 = category3IdValues.foldLeft(sku_info_dummy_tm) {
(df, value) => df.withColumn(s"category3_id#$value", when(col("category3_id") === value, 1).otherwise(0))
}
// 8. 删除原始类别字段,并按 id 升序排序
val sku_info_dummy = sku_info_dummy_cat3
.drop("spu_id", "tm_id", "category3_id")
.orderBy("id")
// 9. 输出第一条数据前10列
val firstRow = sku_info_dummy.first()
println("--------------------第一条数据前10列结果展示为:------------------")
println((0 until 10).map(firstRow.get).mkString(","))
// 10. 保存为 Hive 表(供后续任务使用)
sku_info_dummy.write
.format("parquet")
.mode("overwrite")
.saveAsTable("dwd.sku_info_cleaned")
//展示前5行
spark.table("dwd.sku_info_cleaned").show(5)
}
后面的任务二我自己也没有实践过,模块c就到此结束
欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 1701220998@qq.com