dim_asin_launchtime_info.py
"""
@Author : HuangJian
@Description : 上架日期补充表
@SourceTable :
①dim_asin_launchtime_info
@SinkTable : dim_asin_label
@CreateTime : 2023/12/12 11:20
@UpdateTime : 2022/12/12 11:20
"""
import os
import sys
import re
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
from yswg_utils.common_udf import udf_handle_string_null_value as NullUDF
from utils.redis_utils import RedisUtils


class DimAsinLaunchtimeInfo(object):

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_table = "dim_asin_launchtime_info"
        self.partition_dict = {
            "site_name": site_name
        }
        # HDFS path of the target table partition (used for path checks)
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=self.partition_dict)
        self.partitions_num = CommonUtil.reset_partitions(site_name, 50)
        # Global DataFrames, initialised with placeholder queries and filled in read_data()
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_history_launchtime = self.spark.sql(f"select 1+1;")
        self.df_asin_keepa_date = self.spark.sql(f"select 1+1;")
        self.df_asin_handle_launchtime = self.spark.sql(f"select 1+1;")
        # UDF functions: a Column-API handle plus a SQL-registered version
        self.handle_string_num_value = F.udf(NullUDF, StringType())
        self.spark.udf.register('u_handle_string_num_value', NullUDF, StringType())

    def read_data(self):
        # Read asin detail data
        if self.date_type == 'all':
            # Full run over every partition of the source tables
            print("================== Running full-data integration ==================")
            sql = f"""
                with st_asin as (
                    select
                        asin,
                        u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
                        date_format(updated_at, '{CommonUtil._date_time_format}') as updated_time,
                        concat(site_name, ',', date_type, ',', date_info) as period_label
                    from ods_asin_detail
                    where site_name = '{self.site_name}'
                ),
                bsr_asin as (
                    select
                        asin,
                        u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
                        date_format(created_at, '{CommonUtil._date_time_format}') as updated_time,
                        concat(site_name, ',', date_type, ',', date_info) as period_label
                    from ods_self_asin_detail
                    where site_name = '{self.site_name}'
                )
                select asin, crawl_asin_launch_time, updated_time, period_label from st_asin
                union
                select asin, crawl_asin_launch_time, updated_time, period_label from bsr_asin
            """
            print(sql)
            self.df_asin_detail = self.spark.sql(sql)
        else:
            print("================== Running partition-data integration ==================")
            # Check the given partition for newly appearing asins
            if self.date_type in (DateTypes.week.name, DateTypes.month.name, DateTypes.month_week.name):
                # Take the asin data under the st dimension
                sql = f"""
                    select
                        asin,
                        u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
                        date_format(updated_at, '{CommonUtil._date_time_format}') as updated_time,
                        concat(site_name, ',', date_type, ',', date_info) as period_label
                    from ods_asin_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{self.date_type}'
                      and date_info = '{self.date_info}'
                """
            elif self.date_type in (DateTypes.day.name,):
                # Take the asins from the bsr daily dimension
                sql = f"""
                    select
                        asin,
                        u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
                        date_format(created_at, '{CommonUtil._date_time_format}') as updated_time,
                        concat(site_name, ',', date_type, ',', date_info) as period_label
                    from ods_self_asin_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{self.date_type}'
                      and date_info = '{self.date_info}'
                """
            print(sql)
            # For a partition-scoped run the data is small enough to cache
            self.df_asin_detail = self.spark.sql(sql).cache()
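            # Note: df_asin_detail is referenced twice in handle_launchtime_data()
            # (once for the all-asin dedup and once for the not-null filter),
            # so caching the partition-scoped read avoids recomputing the scan.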
        # Read keepa data
        if self.site_name == 'us':
            sql = f"""
                select asin,
                       launch_time as keepa_asin_launch_time,
                       updated_time as updated_time,
                       1 as keepa_crawl_flag
                from ods_asin_keepa_date_tmp
                where site_name = '{self.site_name}'
                  and state = 1
            """
        else:
            sql = f"""
                select asin,
                       launch_time as keepa_asin_launch_time,
                       updated_at as updated_time,
                       1 as keepa_crawl_flag
                from ods_asin_keep_date
                where site_name = '{self.site_name}'
                  and state = 3
            """
        print(sql)
        self.df_asin_keepa_date = self.spark.sql(sqlQuery=sql).cache()
        # Keep only the most recently updated keepa record per asin
        self.df_asin_keepa_date = self.df_asin_keepa_date.orderBy(self.df_asin_keepa_date.updated_time.desc_nulls_last())
        self.df_asin_keepa_date = self.df_asin_keepa_date.drop_duplicates(['asin'])
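        # Caveat (general Spark behaviour, not specific to this project):
        # orderBy followed by drop_duplicates is not guaranteed to keep the
        # first-sorted row once the data is shuffled. A deterministic sketch
        # of the same "latest record per asin" rule would rank inside a window:
        #   w = Window.partitionBy("asin").orderBy(F.col("updated_time").desc_nulls_last())
        #   df = df.withColumn("rk", F.row_number().over(w)).filter("rk = 1").drop("rk")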
        # Read the launch-date data that has already been consolidated historically
        sql = f"""
            select asin,
                   crawl_asin_launch_time,
                   '9999-12-31 23:59:59' as updated_time,
                   appear_period_label as period_label
            from dim_asin_launchtime_info
            where site_name = '{self.site_name}'
        """
        print(sql)
        self.df_history_launchtime = self.spark.sql(sqlQuery=sql)

    def handle_launchtime_data(self):
        if self.date_type != 'all':
            # Not the 'all' rebuild, i.e. the regular scheduled run: merge the
            # existing history launch dates in before deduplicating
            self.df_asin_detail = self.df_asin_detail.unionByName(self.df_history_launchtime).cache()
        df_all_asin = self.df_asin_detail.orderBy(self.df_asin_detail.updated_time.desc_nulls_last())
        # Deduplicate while keeping every asin
        df_all_asin = df_all_asin.drop_duplicates(['asin'])
        df_all_asin = df_all_asin.select(
            F.col("asin"), F.col("crawl_asin_launch_time").alias("crawl_asin_launch_time_left"), 'period_label')
        # Keep only the rows whose crawled launch date is not null
        df_not_null_asin = self.df_asin_detail.filter(" crawl_asin_launch_time is not null")
        df_not_null_asin = df_not_null_asin.orderBy(df_not_null_asin.updated_time.desc_nulls_last())
        df_not_null_asin = df_not_null_asin.drop_duplicates(['asin'])
        df_not_null_asin = df_not_null_asin.select(
            F.col("asin"),
            F.col("crawl_asin_launch_time").alias("crawl_asin_launch_time_right"),
            F.col("period_label").alias("period_label_right")
        )
        self.df_asin_handle_launchtime = df_all_asin.join(
            df_not_null_asin, on='asin', how='left'
        )
        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.select(
            F.col("asin"),
            F.when((F.col("crawl_asin_launch_time_left").isNull()) & (F.col("crawl_asin_launch_time_right").isNotNull()), F.col("crawl_asin_launch_time_right"))
            .otherwise(F.col("crawl_asin_launch_time_left")).alias("crawl_asin_launch_time"),
            F.when((F.col("crawl_asin_launch_time_left").isNull()) & (F.col("crawl_asin_launch_time_right").isNotNull()), F.col("period_label_right"))
            .otherwise(F.col("period_label")).alias("period_label")
        )
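        # The two when/otherwise expressions above amount to a null-preferring
        # merge: take the left (deduplicated) value unless it is null and the
        # latest non-null crawled value exists, i.e. roughly
        # F.coalesce(left, right) applied to the launch time and its label.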
        # Join with keepa_date to backfill launch_time
        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.join(
            self.df_asin_keepa_date, on='asin', how='left'
        )
        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.select(
            F.col("asin"),
            F.when(F.col("crawl_asin_launch_time").isNull(), F.col("keepa_asin_launch_time"))
            .otherwise(F.col("crawl_asin_launch_time")).alias("asin_launch_time"),
            F.col("crawl_asin_launch_time"),
            F.col("keepa_asin_launch_time"),
            F.col("period_label").alias("appear_period_label"),
            F.when(F.col("keepa_crawl_flag") == 1, F.lit(1)).otherwise(F.lit(0)).alias("keepa_crawl_flag"),
            F.lit(self.site_name).alias("site_name")
        )

    def save_data(self):
        # print(f"Clearing HDFS directory: {self.hdfs_path}")
        # HdfsUtils.delete_file_in_folder(self.hdfs_path)
        df_save = self.df_asin_handle_launchtime.repartition(self.partitions_num)
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_table,
            partition_dict=self.partition_dict,
            df_save=df_save,
            drop_exist_tmp_flag=False
        )
        print("success")

    def run(self):
        self.read_data()
        self.handle_launchtime_data()
        self.save_data()


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    lock_name = "dim_asin_launchtime_info"
    if date_type == "all":
        # An 'all' run locks the table for up to 2 hours unless the run
        # finishes (and releases the lock) earlier
        lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=120 * 60, retry_flag=True, retry_time=10 * 60)
    else:
        lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=30 * 60, retry_flag=True, retry_time=10 * 60)
    if lock_flag:
        try:
            obj = DimAsinLaunchtimeInfo(site_name, date_type, date_info)
            obj.run()
        finally:
            # Release the lock once the run has finished
            RedisUtils.release_redis_lock(lock_name)