judge.py
18.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
import time
import datetime
import pandas as pd
class Judge(TemplatesMysql):
def __init__(self, site_name='us', data_type=1):
super().__init__()
self.site_name = site_name
# 1: 关键词, 2: asin详情, 3, 4: 店铺产品, 5, 6, 7: ao_val,
# 11: last_week, 12: last_4_week, 13: asin_last_4_week, 14: st_month_年_月(每个月最后一周), 15: asin_st
# 21: brand_analytics_week, 22: brand_analytics_month
self.data_type = data_type
self.engine = self.mysql_connect()
self.year, self.week, self.week_day = datetime.datetime.now().isocalendar()
self.current_date = time.strftime("%Y-%m-%d", time.localtime())
print("current_date:", self.current_date)
print("site_name, year, week, week_day:", self.site_name, self.year, self.week, self.week_day)
self.df_judge = pd.DataFrame()
self.db_name = "selection" if self.site_name == "us" else f"selection_{self.site_name}"
def read_workflow(self):
# 1.1 data_type的值为1,2,4时, 监听workflow_crawling表的search_term和asin_detail和店铺产品抓取情况
table = "selection.workflow_crawling"
# 1.2 data_type的值为7时, 监听workflow_exhibition表的ao_va计算情况
if self.data_type in [7]:
table = "selection.workflow_exhibition"
sql = f"select * from {table} WHERE week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type={self.data_type} and status=3;"
print("sql_read:", sql)
self.df_judge = pd.read_sql(sql=sql, con=self.engine)
def judge(self):
if self.data_type in [1, 2]:
if self.data_type == 1:
print_words = "关键词已经抓完, 结束判断, 进行下一步操作"
else:
print_words = "asin详情已经抓完, 结束判断, 进行下一步操作"
elif self.data_type in [4]:
print_words = "店铺产品已经抓完, 结束判断, 进行下一步操作"
elif self.data_type in [7]:
print_words = "关键词的ao_val已经计算完成, 结束判断, 进行下一步操作"
else:
print_words = "输入类型有误, 请重新检查"
print(print_words)
while True:
if self.df_judge.shape[0] == 1:
print("current_time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
print("site_name, year, week:", self.site_name, self.year, self.week, print_words)
break
else:
print("current_time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
print("site_name, year, week:", self.site_name, self.year, self.week, "当前流程尚未完成,睡眠10min之后继续判断")
time.sleep(600)
self.read_workflow()
continue
def delete_and_rename_and_create_asin_last_4_week(self):
num = 0
while True:
num += 1
print("开始检查数据")
sql_read = f"""SELECT min(id) as counts FROM {self.db_name}.{self.site_name}_asin_last_4_week_copy1 union
SELECT max(id) as counts FROM {self.db_name}.{self.site_name}_asin_last_4_week_copy1;"""
df = pd.read_sql(sql_read, con=self.engine)
count = list(df.counts)[1] - list(df.counts)[0] + 1
print("copy表数据量:", count)
if count > 1000000:
print("3. 删除正式表,将copy表改成正式表")
with self.engine.begin() as conn:
sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_asin_last_4_week;"
sql_rename = f"ALTER TABLE {self.db_name}.{self.site_name}_asin_last_4_week_copy1 RENAME {self.db_name}.{self.site_name}_asin_last_4_week;"
print("sql_drop:", sql_drop)
conn.execute(sql_drop)
print("sql_rename:", sql_rename)
conn.execute(sql_rename)
break
else:
if num > 3:
print("检查次数超过3次异常,停止")
break
else:
print(f"第{num}次检查异常,继续删除+改表面")
continue
# self.update_table_state(data_type=3) # last_4_week
with self.engine.begin() as conn:
sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_asin_last_4_week_copy1;"
print("sql_drop:", sql_drop)
conn.execute(sql_drop)
sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_asin_last_4_week_copy1 (
'id' int(11) NOT NULL AUTO_INCREMENT,
'asin' varchar(30) DEFAULT NULL,
'ao_val' decimal(10,3) DEFAULT '0.000' COMMENT 'SP广告词+品牌广告词+视频广告词)/ asin自然词总数',
'zr_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的zr类型的asin总数',
'sp_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的sp类型的asin总数',
'sb_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的sb(底部、头部)类型的asin总数',
'vi_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的视频词类型的asin总数',
'bs_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的Best Seller词类型的asin总数',
'ac_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的AC词类型的asin总数',
'tr_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的TR词类型的asin总数',
'er_counts' int(10) DEFAULT NULL COMMENT '搜索词对应的ER类型的asin总数',
'bsr_orders' int(10) DEFAULT '0' COMMENT 'bsr销量(统一使用当前月该asin最后一周销量,不用求总和)',
'orders' int(10) DEFAULT NULL COMMENT '流量预估销量(统一使用当前月该asin最后一周销量,不用求总和)',
'sales' decimal(20,4) DEFAULT '0.0000' COMMENT 'bsr销量*asin价格=销售额',
'is_self' smallint(2) DEFAULT '1' COMMENT '是否为公司内部asin(1否,2是,3brand表没有抓到的asin)',
'pt_category_id' int(2) DEFAULT NULL COMMENT 'bsr该asin的上级分类id',
'one_category_id' int(2) DEFAULT NULL COMMENT 'bsr该asin的一级分类id',
'img_url' varchar(200) DEFAULT NULL COMMENT 'asin的URL',
'title' varchar(400) DEFAULT NULL COMMENT '标题',
'title_len' int(5) DEFAULT NULL COMMENT '标题长度',
'price' decimal(10,3) DEFAULT '0.000' COMMENT '价格',
'rating' decimal(10,1) DEFAULT NULL COMMENT '评分',
'total_comments' int(20) DEFAULT NULL COMMENT '评论数',
'buy_box_seller_type' double DEFAULT NULL COMMENT 'buybox卖家类型(1:Amazon,2:FBA,3FBM,4无BB卖家)',
'page_inventory' double DEFAULT NULL COMMENT '页面库存(1:充足,2:1-20(正常),3:缺货)',
'volume' varchar(30) DEFAULT NULL COMMENT '体积',
'weight' double DEFAULT NULL COMMENT '重量',
'rank' int(10) DEFAULT NULL COMMENT '排名',
'launch_time' date DEFAULT NULL COMMENT '上架时间',
'img_num' smallint(2) DEFAULT '0' COMMENT 'aisn的图片数量',
'img_type' varchar(10) DEFAULT NULL COMMENT '所有图片类型(1:普通图片,2:视频,3:A+图片)',
'activity_type' varchar(10) DEFAULT NULL COMMENT '1.coupon(with coupon、with coupon) 百分比 2.coupon(with coupon、with coupon) 金额 3.Join Prime(Prime Exclusive Discounts) 百分比 4.Join Prime(Prime Exclusive Discounts) 金额 5.Deal(Lightning Deal,7-day Deal,Deal OF The Day,Outlet deals,With Deal) 6.Top Deal(Top Deal) 7.Extra Savings(绑定促销,Extra Savings) 可允许该ASIN同时做两种促销',
'one_two_val' decimal(10,3) DEFAULT NULL COMMENT 'coupon额度 activity_type 1、2对应的值',
'three_four_val' decimal(10,3) DEFAULT NULL COMMENT 'Prime Exclusive Discounts额度 activity_type 3、4对应的值',
'five_six_val' decimal(10,3) DEFAULT NULL COMMENT 'Join Prime额度activity_type对应5,6的值',
'eight_val' decimal(10,0) DEFAULT NULL COMMENT '9:降低幅度的值 %',
'qa_num' int(10) DEFAULT NULL COMMENT 'QA数量',
'brand_name' varchar(50) DEFAULT NULL COMMENT '品牌名称',
'variation_num' int(10) DEFAULT NULL COMMENT '变体数量',
'one_star' int(2) DEFAULT NULL COMMENT '1星比例',
'two_star' int(2) DEFAULT NULL COMMENT '2星比例',
'three_star' int(2) DEFAULT NULL COMMENT '3星比例',
'four_star' int(2) DEFAULT NULL COMMENT '4星比例',
'five_star' int(2) DEFAULT NULL COMMENT '5星比例',
'low_star' int(2) DEFAULT NULL COMMENT 'one_star + two_star + three_star 比例总和',
'together_asin' varchar(255) DEFAULT NULL COMMENT '一起购买的asin,多个逗号拼接',
'account_name' varchar(255) DEFAULT NULL COMMENT '亚马逊卖家账号名称',
'account_id' bigint(2) DEFAULT NULL COMMENT '卖家账号id',
'site_name' varchar(100) DEFAULT NULL COMMENT '国家名称',
'updated_at' timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
'created_at' timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY 'sel_asin' (asin) USING BTREE,
KEY 'sel_updat' (updated_at) USING BTREE,
KEY 'sel_ao' (ao_val) USING BTREE,
KEY 'sel_is_self' (is_self) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;"""
conn.execute(sql_create)
print(f"site_name: {self.site_name}站点的_asin_last_4_week已经跑完,copy表已经建立")
def delete_and_rename_and_create_st_last_4_week(self):
num = 0
while True:
num += 1
print("开始检查数据")
sql_read = f"""SELECT min(id) as counts FROM {self.db_name}.{self.site_name}_last_4_week_st_copy1 union
SELECT max(id) as counts FROM {self.db_name}.{self.site_name}_last_4_week_st_copy1;"""
df = pd.read_sql(sql_read, con=self.engine)
count = list(df.counts)[1] - list(df.counts)[0] + 1
print("copy表数据量:", count)
if count > 1000000:
print("3. 删除正式表,将copy表改成正式表")
with self.engine.begin() as conn:
sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_last_4_week_st;"
sql_rename = f"ALTER TABLE {self.db_name}.{self.site_name}_last_4_week_st_copy1 RENAME {self.db_name}.{self.site_name}_last_4_week_st;"
print("sql_drop:", sql_drop)
conn.execute(sql_drop)
print("sql_rename:", sql_rename)
conn.execute(sql_rename)
break
else:
if num > 3:
print("检查次数超过3次异常,停止")
break
else:
print(f"第{num}次检查异常,继续删除+改表面")
continue
self.update_table_state(data_type=3) # last_4_week
with self.engine.begin() as conn:
sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_last_4_week_st_copy1;"
print("sql_drop:", sql_drop)
conn.execute(sql_drop)
sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_last_4_week_st_copy1 (
id int(11) NOT NULL AUTO_INCREMENT,
week int(10) NOT NULL COMMENT '周',
asin varchar(30) NOT NULL,
search_term varchar(500) NOT NULL COMMENT '搜索词',
ao_val decimal(10,3) DEFAULT '0.000' COMMENT '该asin的搜索词的AO值:当前asin的(sp总数+sb总数+视频总数)/自然总数',
zr_counts int(10) DEFAULT NULL COMMENT '搜索词对应的zr类型的asin总数',
sp_counts int(10) DEFAULT NULL COMMENT '搜索词对应的sp类型的asin总数',
orders int(20) DEFAULT '0' COMMENT '预估销量',
orders_sum int(20) DEFAULT NULL COMMENT '总销量',
flow double(10,4) DEFAULT NULL COMMENT '流量占比',
search_num int(10) DEFAULT NULL COMMENT '搜索量',
search_rank int(10) DEFAULT NULL COMMENT '搜索词排名',
adv_compet int(1) DEFAULT '0' COMMENT '广告竞品数',
compet int(1) DEFAULT '0' COMMENT '竞品数',
quantity_being_sold bigint(10) DEFAULT NULL COMMENT '关键词搜索量总条数',
zr_page_rank int(1) DEFAULT NULL COMMENT 'zr_词类型具体位置(所有页所有行的具体位置)',
zr_page int(1) DEFAULT NULL COMMENT 'zr_词类型具体页',
zr_page_row int(1) DEFAULT NULL COMMENT 'zr_词类型具体页面的行数',
zr_updated_at timestamp NULL DEFAULT NULL COMMENT 'zr_词类型抓取时间',
sp_page int(1) DEFAULT NULL COMMENT 'sp_词类型具体页',
sp_page_rank int(1) DEFAULT NULL COMMENT 'sp_词类型具体位置(所有页所有行的具体位置)',
sp_page_row int(1) DEFAULT NULL COMMENT 'sp_词类型具体页面的行数',
sp_updated_at timestamp NULL DEFAULT NULL COMMENT 'sp_词类型抓取时间',
sb1_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(头部)',
sb1_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(头部)',
sb2_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(尾部)',
sb2_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(尾部)',
sb3_page int(1) DEFAULT NULL COMMENT 'sb_词类型具体页(视频)',
sb3_updated_at timestamp NULL DEFAULT NULL COMMENT 'sb_词类型抓取时间(视频)',
ac_page int(1) DEFAULT NULL COMMENT 'ac_词类型具体页',
ac_updated_at timestamp NULL DEFAULT NULL COMMENT 'ac_词类型抓取时间',
bs_page int(1) DEFAULT NULL COMMENT 'bs_词类型具体页',
bs_updated_at timestamp NULL DEFAULT NULL COMMENT 'bs_词类型抓取时间',
er_page int(1) DEFAULT NULL COMMENT 'er_词类型具体页',
er_updated_at timestamp NULL DEFAULT NULL COMMENT 'er_词类型抓取时间',
tr_page int(1) DEFAULT NULL COMMENT 'tr_词类型具体页',
tr_updated_at timestamp NULL DEFAULT NULL COMMENT 'tr_词类型抓取时间',
search_term_type varchar(50) DEFAULT NULL COMMENT '搜索词类型(1.主要流量词;2.精准长尾词;3.精准流量词;4.转化优质词;5.转化平稳词;6.转化流失词;7.出单词;8.无效曝光词)',
created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP,
updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
KEY sel_st (search_term),
KEY sel_asin (asin),
KEY sel_week (week) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1020421895 DEFAULT CHARSET=utf8mb4;"""
conn.execute(sql_create)
print(f"site_name: {self.site_name}站点的last_4_week_st已经跑完,copy表已经建立")
def update_table_state(self, data_type=1):
table = "selection.workflow_integration"
with self.engine.begin() as conn:
sql_update = f"update {table} set status=3 where week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type={data_type}"
print("sql_update:", sql_update)
conn.execute(sql_update)
def delete_and_rename_and_create_seller_asin_account_detail(self):
num = 0
while True:
num += 1
print("开始检查数据")
sql_read = f"""SELECT min(id) as counts FROM {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1 union
SELECT max(id) as counts FROM {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1;"""
df = pd.read_sql(sql_read, con=self.engine)
count = list(df.counts)[1] - list(df.counts)[0] + 1
print("copy表数据量:", count)
if count > 1000000:
print("3. 删除正式表,将copy表改成正式表")
with self.engine.begin() as conn:
sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_seller_asin_account_detail;"
sql_rename = f"ALTER TABLE {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1 RENAME {self.db_name}.{self.site_name}_seller_asin_account_detail;"
print("sql_drop:", sql_drop)
conn.execute(sql_drop)
print("sql_rename:", sql_rename)
conn.execute(sql_rename)
break
else:
if num > 3:
print("检查次数超过3次异常,停止")
break
else:
print(f"第{num}次检查异常,继续删除+改表面")
continue
with self.engine.begin() as conn:
sql_drop = f"drop table if exists {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1;"
print("sql_drop:", sql_drop)
conn.execute(sql_drop)
sql_create = f"""CREATE TABLE {self.db_name}.{self.site_name}_seller_asin_account_detail_copy1 (
id int(11) NOT NULL AUTO_INCREMENT,
account_id int(11) NULL DEFAULT NULL COMMENT 'us_seller_account_syn表的 主键id',
account_name varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '亚马逊卖家账号名称',
asin varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
launch_time date NULL DEFAULT NULL COMMENT '上架日期',
days_diff int(10) NULL DEFAULT NULL COMMENT '距离计算日期的间隔天数',
is_asin_new int(2) NULL DEFAULT NULL COMMENT '是否是新品(间隔天数<=180天为新品,否则为老品)0:老品,1:新品',
created_at timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0),
updated_at timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
PRIMARY KEY (id) USING BTREE,
INDEX sel_is_asin_new(account_name, is_asin_new) USING BTREE,
INDEX asin(asin) USING BTREE,
INDEX sel_account_id(account_id, is_asin_new) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 72102642 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;"""
conn.execute(sql_create)
print(f"site_name: {self.site_name}站点的_seller_asin_account_detail已经跑完,copy表已经建立")
def run(self):
if self.data_type in [1, 2, 4, 7]:
self.read_workflow()
self.judge()
elif self.data_type in range(11, 21, 1):
if self.data_type in [11]:
self.update_table_state(data_type=2) # week_st
print(f"site_name: {self.site_name}站点的week_st已经跑完")
if self.data_type in [12]:
self.delete_and_rename_and_create_st_last_4_week()
if self.data_type in [13]:
self.delete_and_rename_and_create_st_last_4_week()
elif self.data_type in range(21, 31, 1):
if self.data_type in [21]:
self.delete_and_rename_and_create_seller_asin_account_detail()
pass
if __name__ == '__main__':
site_name = sys.argv[1] # 参数1:站点
data_type = int(sys.argv[2]) # 参数1:站点
handle_obj = Judge(site_name=site_name, data_type=data_type)
handle_obj.run()