1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import os
import sys
import re
sys.path.append(os.path.dirname(sys.path[0]))  # add parent directory to the import path
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Import shared UDF helpers
from yswg_utils.common_udf import udf_asin_to_number
# from ..yswg_utils.common_udf import udf_asin_to_number
# import importlib
# # 动态导入yswg_utils.common_udf模块
# # common_udf_module = importlib.import_module("yswg_utils.common_udf")
# # # 从模块中获取udf_asin_to_number函数
# # udf_asin_to_number = getattr(common_udf_module, "udf_asin_to_number")
# import sys
# sys.path.append('/opt/module/spark/demo/py_demo')
#
# from yswg_utils.common_udf import udf_asin_to_number
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
class DimAsinVariationInfo(Templates):
    """Build the ``dim_asin_variation_info`` dimension table.

    Reads raw ASIN variation records (``ods_asin_variation``) and — for
    month/month_week runs from 2024-05 onward — ASIN detail records
    (``ods_asin_detail``), keeps only the newest record per ASIN and per
    parent ASIN, and prepares the result to be saved partitioned by
    ``site_name`` (the save itself is driven by the ``Templates`` base
    class via ``run()`` — TODO confirm, base class not visible here).
    """

    def __init__(self, site_name='us', date_type='week', date_info='2024-12'):
        """Set up the Spark session, placeholder DataFrames and UDFs.

        :param site_name: marketplace site code (e.g. ``us``)
        :param date_type: date partition granularity (e.g. ``week``, ``month``, ``month_week``)
        :param date_info: date partition value (e.g. ``2024-12``)
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dim_asin_variation_info'
        # Spark session comes from the Templates base class.
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames; the real content is assigned in read_data()/handle_data().
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_variation = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        # Output is partitioned by site only (date partitions were deliberately dropped).
        # self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.partitions_by = ['site_name']
        self.partitions_dict = {
            "site_name": site_name
        }
        self.reset_partitions(partitions_num=20)
        # Register shared UDFs so they are usable from both SQL and the DataFrame API.
        self.u_asin_to_number = self.spark.udf.register("u_asin_to_number", udf_asin_to_number, IntegerType())
        self.u_part_int = self.spark.udf.register("u_part_int", self.udf_part_int, IntegerType())

    @staticmethod
    def udf_part_int(mapped_asin):
        """Map a numeric ASIN to a 1-based bucket of 100,000,000 ids each.

        Fixed: uses integer floor division instead of ``int(x / 10000_0000)``
        so very large ids cannot lose precision through an intermediate float.
        Assumes ``mapped_asin`` is non-negative — TODO confirm against
        ``udf_asin_to_number``.
        """
        return mapped_asin // 10000_0000 + 1

    def read_data(self):
        """Load the source tables for the current site/date partition."""
        sql = f"SELECT * from ods_asin_variation where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"
        print(f"1. 读取ods_asin_variation表: sql -- {sql}")
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).cache()
        # Extract the YYYY-MM-DD date part from the full created_time timestamp.
        self.df_asin_variation = self.df_asin_variation.withColumn(
            "created_date", F.substring(self.df_asin_variation["created_time"], 1, 10))
        self.df_asin_variation = self.df_asin_variation.drop("id")
        self.df_asin_variation.show(10, truncate=False)
        # From 2024-05 onward, month/month_week runs also read ods_asin_detail to
        # find ASINs whose variation rows should be treated specially.
        if self.date_type in ["month", "month_week"] and self.date_info >= '2024-05':
            sql = f"SELECT asin, variat_num, created_at as created_time from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"
            # Fixed: the log previously said "ods_asin_variation" although this query reads ods_asin_detail.
            print(f"2. 读取ods_asin_detail表: sql -- {sql}")
            self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_detail.show(10, truncate=False)
            self.handle_data_drop_variation()

    def handle_data(self):
        """Deduplicate variation rows, build df_save and clear the target HDFS path."""
        # Drop rows whose parent_asin is not a valid 10-character ASIN.
        self.df_asin_variation = self.df_asin_variation.filter(F.length("parent_asin") == 10)
        # Add mapped_asin: the numeric form of parent_asin.
        self.df_asin_variation = self.df_asin_variation.withColumn('mapped_asin', self.u_asin_to_number('parent_asin'))
        # Keep the newest record per asin (was previously keyed on ["asin", "parent_asin"]).
        window = Window.partitionBy(["asin"]).orderBy(
            self.df_asin_variation.created_time.desc_nulls_last()
        )
        self.df_asin_variation = self.df_asin_variation.withColumn("type_rank", F.row_number().over(window=window)). \
            filter("type_rank=1").drop("type_rank", "id")
        # Per parent_asin, keep only the rows belonging to its newest created_date.
        window = Window.partitionBy(["parent_asin"]).orderBy(
            self.df_asin_variation.created_date.desc_nulls_last()
        )
        df = self.df_asin_variation.withColumn("type_rank", F.row_number().over(window=window)). \
            filter("type_rank=1").select("parent_asin", "created_date")
        self.df_asin_variation = df.join(self.df_asin_variation, on=["parent_asin", "created_date"], how='inner')
        self.df_asin_variation.show(10, truncate=False)
        self.df_save = self.df_asin_variation
        # Fixed: count() was called twice in a row (two identical full Spark jobs);
        # a single count is enough now that the join between them is commented out.
        print(f"self.df_save.count(): {self.df_save.count()}")
        # Clear the target HDFS partition before the (inherited) save step runs.
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partitions_dict)
        print(f"当前存储的表名为:{self.db_save},分区为{self.partitions_dict}")
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    def handle_data_drop_variation(self):
        """Keep the newest detail row per asin, then retain only ASINs with no variation count."""
        window = Window.partitionBy(["asin"]).orderBy(
            self.df_asin_detail.created_time.desc_nulls_last()
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("type_rank", F.row_number().over(window=window)). \
            filter("type_rank=1").drop("type_rank", "id")
        # Rows with a NULL variat_num are ASINs whose detail page reported no variations.
        self.df_asin_detail = self.df_asin_detail.filter("variat_num is null")
        self.df_asin_detail.show(10, truncate=False)
if __name__ == '__main__':
    # CLI entry point: expects three positional arguments.
    site_name = sys.argv[1]  # arg 1: site code (e.g. us) — original comments were a copy-paste, all three said "site"
    date_type = sys.argv[2]  # arg 2: date type (e.g. week / month / month_week)
    date_info = sys.argv[3]  # arg 3: date value (e.g. 2024-12)
    handle_obj = DimAsinVariationInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()