dwd_asin_to_pg.py
import os
import random
import sys
import time
import traceback
import pandas as pd
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the module search path
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# window functions (udf) for grouped sorting
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.db_util import DbTypes, DBUtil
from utils.common_util import CommonUtil
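# Overview (summary of the logic implemented below): this job gathers ASINs from
# dim_st_asin_info for the given site / date_type / date_info, flags variation ASINs via
# dim_asin_variation_info, attaches volume and weight from dim_asin_stable_info, and saves
# the result to dwd_asin_to_pg (partitioned by site_name, date_type, date_info) so it can be
# exported to PostgreSQL for the crawler. For daily runs, ASINs already written on the
# preceding days are filtered out first, so only newly appearing ASINs are kept.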
class DwdAsinToPg(Templates):

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwd_asin_to_pg"
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_st_asin_today = self.spark.sql("select 1+1;")
        self.df_st_asin_last_5_day = self.spark.sql("select 1+1;")
        self.df_asin_variation = self.spark.sql("select 1+1;")
        self.df_asin_stable = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.date_info_tuple = "('2022-11-02')"
        self.date_info_tuple_last_5_day = "('2022-11-02')"
        self.reset_partitions(partitions_num=1)
        # self.date_info_tuple = ('2022-11-01', '2022-11-02', '2022-11-03', '2022-11-04', '2022-11-05', '2022-11-06')
        self.date_today = ''
        self.date_last_5_day_tuple = tuple()
        self.get_date_info_tuple()
        self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name)
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=self.site_name)
    def truncate_or_update_table_syn(self):
        table = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}"
        year, month = self.date_info.split("-")
        sql = f"select count(*) as st_count from {self.site_name}_brand_analytics_month_{year} where year={year} and month={int(month)};"
        df = pd.read_sql(sql, con=self.engine_mysql)
        print("sql:", sql, df.shape)
        if list(df.st_count)[0] >= 1_0000:
            # sql = f"select asin from {self.site_name}_all_syn_st_month_{year} where date_info='{self.date_info}'"
            sql = f"select asin from {table} where date_info='{self.date_info}';"
            print("sql:", sql, df.shape)
            pdf_asin = pd.read_sql(sql, con=self.engine_pg14)
            schema = StructType([
                StructField('asin', StringType(), True),
            ])
            df_asin = self.spark.createDataFrame(pdf_asin, schema=schema)
            # self.df_save = self.df_save.join(self.df_save, df_asin.asin == self.df_save.asin, "left_anti")
            df_save_alias = self.df_save.alias("df_save")
            df_asin_alias = df_asin.alias("df_asin")
            self.df_save = df_save_alias.join(df_asin_alias, df_asin_alias.asin == df_save_alias.asin, "left_anti")
            self.df_save.show(10, truncate=False)
            print(f"df_asin: {df_asin.count()}, self.df_save: {self.df_save.count()}")
        else:
            while True:
                try:
                    with self.engine_pg14.begin() as conn:
                        sql_truncate = f"truncate {table};"
                        print("monthly search terms have not been imported yet, the table has to be truncated first, sql_truncate:", sql_truncate)
                        conn.execute(sql_truncate)
                    break
                except Exception as e:
                    print(e, traceback.format_exc())
                    time.sleep(random.randint(3, 10))
                    self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name,
                                                            site_name=self.site_name)
                    continue
    def truncate_or_update_table_syn_old(self):
        table = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}" if self.site_name == 'us' else f"{self.site_name}_all_syn_st_{self.date_info.replace('-', '_')}"
        if self.site_name == 'us':
            year, month = self.date_info.split("-")
            sql = f"select count(*) as st_count from {self.site_name}_brand_analytics_month_{year} where year={year} and month={int(month)};"
            # else:
            #     year, week = self.date_info.split("-")
            #     sql = f"select count(*) from {self.site_name}_brand_analytics_{year} where week={week};"
            df = pd.read_sql(sql, con=self.engine_mysql)
            print("sql:", sql, df.shape)
            if list(df.st_count)[0] >= 100_0000:
                sql = f"select asin from us_all_syn_st_month_{year} where date_info='{self.date_info}'"
                pdf_asin = pd.read_sql(sql, con=self.engine_pg14)
                schema = StructType([
                    StructField('asin', StringType(), True),
                ])
                df_asin = self.spark.createDataFrame(pdf_asin, schema=schema)
                # self.df_save = self.df_save.join(self.df_save, df_asin.asin == self.df_save.asin, "left_anti")
                df_save_alias = self.df_save.alias("df_save")
                df_asin_alias = df_asin.alias("df_asin")
                self.df_save = df_save_alias.join(df_asin_alias, df_asin_alias.asin == df_save_alias.asin, "left_anti")
                self.df_save.show(10, truncate=False)
                print(f"df_asin: {df_asin.count()}, self.df_save: {self.df_save.count()}")
            else:
                while True:
                    try:
                        with self.engine_pg14.begin() as conn:
                            sql_truncate = f"truncate {table};"
                            print("sql_truncate:", sql_truncate)
                            conn.execute(sql_truncate)
                        break
                    except Exception as e:
                        print(e, traceback.format_exc())
                        time.sleep(random.randint(3, 10))
                        self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name,
                                                                site_name=self.site_name)
                        continue
        else:
            while True:
                try:
                    with self.engine_pg14.begin() as conn:
                        sql_truncate = f"truncate {table};"
                        print("sql_truncate:", sql_truncate)
                        conn.execute(sql_truncate)
                    break
                except Exception as e:
                    print(e, traceback.format_exc())
                    time.sleep(random.randint(3, 10))
                    self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name)
                    continue
    def get_date_info_tuple(self):
        self.df_date = self.spark.sql("select * from dim_date_20_to_30;")
        df = self.df_date.toPandas()
        if self.date_type == 'day':
            df_today = df.loc[df.date == f'{self.date_info}']
            id_today = list(df_today.id)[0]
            id_last_5_day = id_today - 4
            print("id_today, id_last_5_day:", id_today, id_last_5_day)
            df_last_5_day = df.loc[(df.id < id_today) & (df.id >= id_last_5_day)]
            self.date_last_5_day_tuple = tuple(df_last_5_day.date)
    def read_data(self):
        # for testing the monthly workflow
        # sql = f"select asin from us_all_syn_st_month_{2024} where date_info='{2023-12}' limit 100000"
        # print("sql===:", sql)
        # pdf_asin = pd.read_sql(sql, con=self.engine_pg14)
        # schema = StructType([
        #     StructField('asin', StringType(), True),
        # ])
        # df_asin = self.spark.createDataFrame(pdf_asin, schema=schema)
        # df_asin.show(10, truncate=False)
        if self.date_type == 'day':
            print("1.1 read the dim_st_asin_info table (current day)")
            sql = f"select asin, site_name from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
            print("sql:", sql)
            self.df_st_asin_today = self.spark.sql(sqlQuery=sql).cache()
            self.df_st_asin_today.show(10, truncate=False)
            self.df_st_asin_today = self.df_st_asin_today.drop_duplicates(["asin"])
            print("self.df_st_asin_today:", self.df_st_asin_today.count())
            print("1.2 read the asins already exported on the preceding days (dwd_asin_to_pg)")
            sql = f"select asin, 1 as asin_isin_flag from {self.db_save} where site_name='{self.site_name}' and date_type='{self.date_type}' " \
                  f"and date_info in {self.date_last_5_day_tuple} and date_info >= '2022-11-02';"
            print("sql:", sql)
            self.df_st_asin_last_5_day = self.spark.sql(sqlQuery=sql).cache()
            self.df_st_asin_last_5_day.show(10, truncate=False)
            print("self.df_st_asin_last_5_day before dedup:", self.df_st_asin_last_5_day.count())
            self.df_st_asin_last_5_day = self.df_st_asin_last_5_day.drop_duplicates(["asin"])
            print("self.df_st_asin_last_5_day after dedup:", self.df_st_asin_last_5_day.count())
        else:
            sql = f"select asin, site_name from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
            print("sql:", sql)
            self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
            self.df_st_asin = self.df_st_asin.drop_duplicates(["asin"])
            print("self.df_st_asin.count:", self.df_st_asin.count())
        print("2. read the dim_asin_variation_info table")
        sql = f"""select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name="{self.site_name}";"""
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_variation.show(10, truncate=False)
        self.df_asin_variation = self.df_asin_variation.drop_duplicates(["asin"])
        print("3. read the dim_asin_stable_info table")
        sql = f"""select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name="{self.site_name}";"""
        self.df_asin_stable = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_stable.show(10, truncate=False)
        self.df_asin_stable = self.df_asin_stable.drop_duplicates(["asin"])
    def handle_data(self):
        if self.date_type == 'day':
            if self.date_info >= '2022-11-02':
                self.df_st_asin_today = self.df_st_asin_today.join(
                    self.df_st_asin_last_5_day, on='asin', how='left'
                )
                print("self.df_st_asin_today after join:", self.df_st_asin_today.count())
                # keep only asins that did not appear on the preceding days (anti-join via left join + null filter)
                self.df_st_asin_today = self.df_st_asin_today.filter("asin_isin_flag is null")
                print("self.df_st_asin_today newly appeared:", self.df_st_asin_today.count())
            self.df_save = self.df_st_asin_today.join(
                self.df_asin_variation, on='asin', how='left'
            )
        else:
            self.df_save = self.df_st_asin.join(
                self.df_asin_variation, on='asin', how='left'
            ).join(
                self.df_asin_stable, on='asin', how='left'
            )
        self.df_save = self.df_save.drop("asin_isin_flag")
        self.handle_temp()
        self.truncate_or_update_table_syn()  # truncate the table / update the table data
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save = self.df_save.fillna({"asin_is_variation": 0})
        self.df_save.show(10, truncate=False)
        print("self.df_save.count:", self.df_save.count())
        users = ["fangxingjun", "wangrui4", "pengyanbing"]
        title = f"dwd_asin_to_pg: {self.site_name}, {self.date_type}, {self.date_info}"
        content = f"asin integration finished -- waiting to be exported to pg for the crawler -- self.df_save.count: {self.df_save.count()}"
        CommonUtil().send_wx_msg(users=users, title=title, content=content)
        # quit()
    def handle_temp(self):
        if self.site_name == 'us' and self.date_type == 'week' and self.date_info == '2023-44':
            sql = f"select asin from dwd_asin_to_pg where site_name='{self.site_name}' and date_type='month' and date_info='2023-11';"
            print("sql:", sql)
            self.df_asin_month = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_month = self.df_asin_month.drop_duplicates(["asin"])
            self.df_save = self.df_save.withColumn("data_type", F.lit(100))
            # self.df_save = self.df_save.join(
            #     self.df_asin_month, on='asin'
            # )
            result_df = self.df_asin_month.join(self.df_save, self.df_asin_month.asin == self.df_save.asin, "left_anti")
            print("result_df.count:", result_df.count())
            # make sure the two DataFrames have the same columns
            columns1 = self.df_save.columns
            columns2 = result_df.columns
            print(f"columns1:{columns1}, columns2:{columns2}")
            # add to df1 the columns that exist in df2 but are missing from df1
            for col in set(columns2) - set(columns1):
                self.df_save = self.df_save.withColumn(col, F.lit(None))
            # add to df2 the columns that exist in df1 but are missing from df2
            for col in set(columns1) - set(columns2):
                result_df = result_df.withColumn(col, F.lit(None))
            # self.df_save = self.df_save.join(
            #     result_df
            # )
            print("self.df_save.count11:", self.df_save.count())
            self.df_save = self.df_save.unionByName(result_df)
            print("self.df_save.count22:", self.df_save.count())
            self.site_name = "us"
            self.date_type = "month"
            self.date_info = "2023-11"
        else:
            self.df_save = self.df_save.withColumn("data_type", F.lit(1))
if __name__ == "__main__":
    site_name = sys.argv[1]  # argument 1: site
    date_type = sys.argv[2]  # argument 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # argument 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwdAsinToPg(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
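# Example invocation (illustrative values only; the actual submit command and cluster
# options are assumptions, adjust them to your environment):
#   spark-submit dwd_asin_to_pg.py us month 2023-11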