Commit cb4b4b8a by wangjing

no message

parent db55f872
"""
author: 方星钧(ffman)
description: 清洗6大站点对应的 单周的zr,sp,sb,ac,bs,er,tr等7大类型数据表(计算zr,sp类型表的page_rank+合并7张表)
table_read_name: ods_st_rank_zr/sp/sb/ac/bs/er/tr
table_save_name: dim_st_asin_info
table_save_level: dim
version: 3.0
created_date: 2022-05-10
updated_date: 2022-11-07
"""
import json
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType
class DwtSelfAsinDetailRating(Templates):
    """Aggregate per-ASIN rating / review-count history over a trailing 180-day window.

    Reads daily snapshots from ``ods_self_asin_detail``, computes the day-over-day
    review-count increment per ASIN, and packs each ASIN's full history into one
    JSON column ``rating_and_comments_info`` shaped like
    ``{"YYYY-MM-DD": [rating, total_comments, daily_increment], ...}``.
    """

    def __init__(self, site_name='us', date_type="day", date_info='2026-02-27'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Window start is 179 days back so [start, date_info] spans exactly 180 days inclusive.
        self.date_info_last_180day = (
            datetime.strptime(date_info, "%Y-%m-%d") - timedelta(days=179)
        ).strftime("%Y-%m-%d")
        self.db_save = 'dwt_self_asin_detail_rating'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # Placeholder DataFrames; replaced by read_data() / handle_data().
        self.df_self_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=2)

    def read_data(self):
        """Load trailing-180-day ASIN detail snapshots for this site/date_type.

        BUG FIX: the upper-bound filter previously interpolated the bare name
        ``date_info`` (undefined in method scope -> NameError at runtime);
        it must be ``self.date_info``.
        """
        sql = f"""
        select asin, rating, total_comments, date_info from ods_self_asin_detail WHERE
        site_name ='{self.site_name}'
        and date_type ='{self.date_type}'
        and date_info >='{self.date_info_last_180day}'
        and date_info <= '{self.date_info}'
        and asin_type not like '%4%'
        """
        self.df_self_asin_detail = self.spark.sql(sql).cache()
        print("df_self_asin_detail数量", self.df_self_asin_detail.count())

    def handle_data_asin_comments(self):
        """Build the per-ASIN JSON history column ``rating_and_comments_info``."""
        # Normalize types: total_comments via float->int (handles "123.0"-style strings),
        # rating as float.
        self.df_self_asin_detail = self.df_self_asin_detail.withColumn(
            "total_comments", F.col("total_comments").cast("float").cast("int")
        ).withColumn(
            "rating", F.col("rating").cast("float"))

        # Deduplicate to one row per (asin, date): keep the day's max review count;
        # rating must also be aggregated or it would be dropped by the groupBy.
        self.df_self_asin_detail = self.df_self_asin_detail.groupBy("asin", "date_info").agg(
            F.max("total_comments").alias("total_comments"),
            F.max("rating").alias("rating")
        ).na.fill({"total_comments": 0})  # fill null review counts with 0

        # 1. Day-over-day increment via lag() ordered by date within each ASIN.
        #    lag() picks the previous *existing* row, so missing days are skipped naturally.
        window_spec = Window.partitionBy("asin").orderBy("date_info")
        self.df_self_asin_detail = self.df_self_asin_detail.withColumn(
            "prev_comments", F.lag("total_comments").over(window_spec)
        ).withColumn(
            "daily_increment",
            F.when(F.col("prev_comments").isNull(), 0)  # first row of an ASIN -> increment 0
            .otherwise(F.col("total_comments") - F.col("prev_comments"))
        )

        # 2. Collect each ASIN's daily records into a single list column, e.g.
        #    [{"date_info":"10-19","rating":4.0,...}, {"date_info":"10-22",...}, ...]
        df_grouped = self.df_self_asin_detail.groupBy("asin").agg(
            F.collect_list(
                F.struct("date_info", "rating", "total_comments", "daily_increment")
            ).alias("data_list")
        )

        # 3. UDF: serialize the list to JSON keyed by date:
        #    '{"2025-10-19": [4.0, 100, 0], "2025-10-22": [4.1, 108, 8], ...}'
        @F.udf(StringType())
        def to_json(data_list):
            if not data_list:
                return None
            # collect_list gives no ordering guarantee, so sort by date here.
            sorted_data = sorted(data_list, key=lambda x: x["date_info"])
            result = {}
            for item in sorted_data:
                rating = float(item["rating"]) if item["rating"] is not None else 0.0
                rating = round(rating, 1)
                total_comments = int(item["total_comments"]) if item["total_comments"] is not None else 0
                daily_increment = int(item["daily_increment"]) if item["daily_increment"] is not None else 0
                result[item["date_info"]] = [rating, total_comments, daily_increment]
            return json.dumps(result, ensure_ascii=False)

        # 4. One output row per ASIN.
        self.df_self_asin_detail = df_grouped.withColumn(
            "rating_and_comments_info", to_json("data_list")
        ).select("asin", "rating_and_comments_info")

    def handle_data(self):
        """Assemble the save DataFrame and append the partition columns."""
        self.handle_data_asin_comments()
        self.df_save = self.df_self_asin_detail
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        print("df_save数量为", self.df_save.count())
        # vertical=True prints each row as field/value pairs, one per line.
        self.df_save.show(truncate=False, vertical=True)

    def save_data(self):
        # Intentionally a no-op: persistence is handled by the external sqoop export step.
        pass
if __name__ == '__main__':
    # Parse CLI arguments once (the original parsed sys.argv twice and
    # re-imported os/sys mid-file; consolidated here, behavior unchanged).
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1

    handle_obj = DwtSelfAsinDetailRating(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()

    # Export the freshly written hive partition to MySQL via the remote engine.
    from utils.secure_db_client import get_remote_engine
    engine = get_remote_engine(
        site_name="us",   # -> database "selection"
        db_type="mysql",  # -> server-side alias "mysql"
        # user="fangxingjun",
        # user_token="5f1b2e9c3a4d7f60"  # optional; defaults apply
    )
    partitions = {
        'site_name': site_name,
        'date_type': date_type,
        'date_info': date_info,
    }
    cols_list = ['asin', 'rating_and_comments_info', 'date_info']
    engine.sqoop_raw_export(
        hive_table='dwt_self_asin_detail_rating',
        import_table=f'{site_name}_self_rating_180day',
        partitions=partitions,
        m=1,
        cols=','.join(cols_list)
    )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment