import json
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql import functions as F, Window
from pyspark.sql.types import StringType, ArrayType

from utils.common_util import CommonUtil
from utils.redis_utils import RedisUtils
from utils.spark_util import SparkUtil

"""
Asin title change history (dwt_asin_title_history).
"""
def partition(lst, size):
    """Yield consecutive chunks of `lst`, each with at most `size` elements."""
    for i in range(0, len(lst), size):
        yield lst[i:i + size]
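# Illustration of the chunking helper above (values are made up):
#   list(partition(['a', 'b', 'c', 'd', 'e'], 2)) == [['a', 'b'], ['c', 'd'], ['e']]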
class DwtAsinTitleHistory(object):

    def __init__(self, site_name, run_type):
        self.site_name = site_name
        self.run_type = run_type
        app_name = ":".join([self.__class__.__name__, self.site_name, self.run_type])
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_asin_title_history"
    def merge_add(self, add_sql):
        print("=================== Source of the newly added title data: ===================")
        print(add_sql)
        add_df = self.spark.sql(add_sql).cache()
        exist_df = self.spark.sql(
            f"""
            select asin,
                   title_time_list,
                   title_list
            from dwt_asin_title_history
            where site_name = '{self.site_name}'
            """
        )
        # Explode the stored JSON arrays back into one (asin, time, title) row per history entry.
        exist_df = exist_df.withColumn("map", F.explode(F.arrays_zip(
            F.from_json(F.col('title_list'), ArrayType(StringType())).alias("title"),
            F.from_json(F.col('title_time_list'), ArrayType(StringType())).alias("time")
        ))).select(
            F.col("asin"),
            F.col("map.time").alias("time"),
            F.col("map.title").alias("title"),
        )
        exist_df = exist_df.unionByName(add_df, allowMissingColumns=True)
        # Compare each title with the previous one in time order and keep only the rows where it changed.
        exist_df = exist_df.withColumn("lastTitle", F.lag(F.col("title"), 1, None).over(
            Window.partitionBy("asin").orderBy(F.col("time").asc())
        ))
        exist_df = exist_df.filter('title != lastTitle or lastTitle is null')
        exist_df = exist_df.groupby("asin", "title").agg(
            F.min("time").alias("time")
        )
        # Merge the history of each asin back into a single row of two parallel JSON arrays.
        exist_df = exist_df.groupby(F.col("asin")).agg(
            F.sort_array(F.collect_list(F.struct("title", "time"))).alias("sort_array")
        ).select(
            F.col('asin'),
            F.to_json(F.col("sort_array.time")).alias("title_time_list"),
            F.to_json(F.col("sort_array.title")).alias("title_list"),
            F.lit(self.site_name).alias("site_name")
        )
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict={
                "site_name": self.site_name
            },
            df_save=exist_df
        )
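    # Illustration of the change-detection step in merge_add (asin, dates and titles are made-up values):
    #   rows for one asin ordered by time:
    #     2023-01-01  "Title A"
    #     2023-02-01  "Title A"   <- dropped: same as the previous title
    #     2023-03-01  "Title B"   <- kept: the title changed
    #   stored row: title_list = ["Title A", "Title B"], title_time_list = ["2023-01-01", "2023-03-01"]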
    def get_redis_set_key(self):
        # Redis set recording which ods_asin_detail partitions have already been merged.
        if self.site_name == 'us':
            return "DwtAsinTitleHistory:set"
        return f"DwtAsinTitleHistory:{self.site_name}:set"
    def run_all(self):
        df_asin_detail_part = CommonUtil.select_partitions_df(self.spark, "ods_asin_detail")
        # for date_type in ['month', 'week', 'day', 'month_week']:
        # for date_type in ['month']:
        # Number of date_info partitions merged per batch, for each date_type.
        batchMap = {
            "month": 10,
            "week": 50,
            "day": 60,
            "month_week": 5,
        }
        redis_set_key = self.get_redis_set_key()
        date_type = CommonUtil.get_sys_arg(3, "week")
        for date_type in [date_type]:
            redis_client = RedisUtils.get_redis_client(decode_responses=True)
            exist_list = redis_client.smembers(redis_set_key)
            part_list = df_asin_detail_part.filter(f" site_name = '{self.site_name}' and date_type = '{date_type}' ").sort(
                F.col("date_info").asc())
            rows = part_list.toPandas().to_dict(orient='records')
            # Filter out partitions that have already been processed.
            rows = list(filter(lambda it: json.dumps(it) not in exist_list, rows))
            for part in partition(rows, batchMap.get(date_type)):
                site_name = part[0]['site_name']
                date_type = part[0]['date_type']
                date_infos = [it['date_info'] for it in part]
                print("===================== Current date_info batch ===============================")
                print(date_infos)
                if len(date_infos) > 0:
                    sql = f"""
                        select asin, title, date_format(updated_at,'yyyy-MM-dd') as time
                        from ods_asin_detail
                        where site_name = '{site_name}'
                          and date_type = '{date_type}'
                          and date_info in ({CommonUtil.list_to_insql(date_infos)})
                          and title is not null
                          and updated_at is not null
                    """
                    # Merge this batch into the history table.
                    self.merge_add(sql)
                    print("success")
                    # Mark this batch as processed.
                    redis_client = RedisUtils.get_redis_client(decode_responses=True)
                    redis_client.sadd(redis_set_key, *[json.dumps(it) for it in part])
                # Only the first unprocessed batch is handled per invocation.
                break
    def run(self):
        df_asin_detail_part = CommonUtil.select_partitions_df(self.spark, "ods_asin_detail")
        # Incremental run: merge only the latest 5 partitions of the current date_type.
        redis_set_key = self.get_redis_set_key()
        redis_client = RedisUtils.get_redis_client(decode_responses=True)
        exist_list = redis_client.smembers(redis_set_key)
        if self.site_name == 'us':
            date_type = "month"
        else:
            date_type = "week"
        part_list = df_asin_detail_part.filter(f" site_name = '{self.site_name}' and date_type = '{date_type}' ") \
            .sort(F.col("date_info").desc()).limit(5)
        # Filter out partitions that have already been processed.
        rows = list(filter(lambda it: json.dumps(it) not in exist_list, part_list.toPandas().to_dict(orient='records')))
        if len(rows) > 0:
            date_infos = [it['date_info'] for it in rows]
            sql = f"""
                select asin, title, date_format(updated_at,'yyyy-MM-dd') as time
                from ods_asin_detail
                where site_name = '{self.site_name}'
                  and date_type = '{date_type}'
                  and date_info in ({CommonUtil.list_to_insql(date_infos)})
                  and title is not null
                  and updated_at is not null
            """
            self.merge_add(sql)
            print("success")
            # Mark these partitions as processed.
            redis_client = RedisUtils.get_redis_client(decode_responses=True)
            redis_client.sadd(redis_set_key, *[json.dumps(it) for it in rows])
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    run_type = CommonUtil.get_sys_arg(2, None)
    obj = DwtAsinTitleHistory(site_name=site_name, run_type=run_type)
    if run_type == 'all':
        obj.run_all()
    elif run_type == 'current':
        obj.run()
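# Example invocation (script name and submit options are assumptions, adjust to the actual deployment):
#   spark-submit dwt_asin_title_history.py us all month      # full backfill for site "us", month partitions
#   spark-submit dwt_asin_title_history.py us current        # incremental run over the latest partitions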