1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import sys
import pandas as pd
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates_mysql import TemplatesMysql
class ExportDwtStInfo(TemplatesMysql):
    """Export brand search-term info into MySQL and copy metrics into the analytics table.

    For each site the job:

    1. deletes the rows of the target period from ``{site}_brand_st_info_{date_type}``
       and runs the export shell script (``export_data``),
    2. re-runs the export if the table is still empty for that period (``check_data``),
    3. copies the metrics into ``{site}_brand_analytics_*`` and marks the workflow
       row as done (``update_data``).

    ``self.engine`` and ``self.mysql_connect()`` are not defined in this class;
    presumably they are provided by ``TemplatesMysql`` -- confirm against the base class.
    """

    def __init__(self, site_name='us', date_type="week", date_info='2022-1'):
        """Store the job parameters and derive period fields and table names.

        Args:
            site_name: A single site code, or ``"all"`` to process
                us/uk/de/es/fr/it in turn.
            date_type: ``"week"``, ``"4_week"`` or ``"month"``; any other value is
                handled as a quarter.
            date_info: ``"<year>-<period>"``, e.g. ``"2022-1"``.
        """
        super().__init__()
        self.site_name = site_name
        if self.site_name == "all":
            self.site_name_list = ["us", "uk", "de", "es", "fr", "it"]
        else:
            self.site_name_list = [self.site_name]
        self.date_type = date_type
        self.date_info = date_info
        self._refresh_period_and_tables()
        self.path_sh = "/opt/module/spark/demo/sqoop_script/dwd/export_dwt_st_info.sh"
        self.df_table_counts = pd.DataFrame()

    def _refresh_period_and_tables(self):
        """Derive year/period fields, the SQL period filter and table names.

        Reads ``self.date_info``, ``self.date_type`` and ``self.site_name``.
        Called from ``__init__`` and again from ``run`` for every site, because
        the table names embed the site name.
        """
        parts = self.date_info.split("-")
        self.year = int(parts[0])
        period = int(parts[-1])
        if self.date_type in ("week", "4_week"):
            self.year_week = self.date_info
            self.week = period
            self.params_sql = f" and week={self.week}"
            # Weekly analytics tables are split per year.
            analytics_suffix = self.year
        elif self.date_type == "month":
            self.year_month = self.date_info
            self.month = period
            self.params_sql = f" and month={self.month}"
            analytics_suffix = self.date_type
        else:
            self.year_quarter = self.date_info
            self.quarter = period
            self.params_sql = f" and quarter={self.quarter}"
            analytics_suffix = self.date_type
        self.table_save = f"{self.site_name}_brand_st_info_{self.date_type}"
        self.table_update = f"{self.site_name}_brand_analytics_{analytics_suffix}"

    def export_data(self, site_name):
        """Clear the target period in ``self.table_save`` and run the export script.

        Args:
            site_name: Site code passed to the shell script. The table that is
                cleared comes from ``self.table_save``, not from this argument.
        """
        with self.engine.begin() as conn:
            sql_delete = f"delete from {self.table_save} where year={self.year} {self.params_sql}"
            conn.execute(sql_delete)
        print(f"开始导出{site_name}站点的数据")
        # The script receives the raw (unparsed) year and period strings.
        date_parts = self.date_info.split('-')
        exit_status = os.system(f"{self.path_sh} {site_name} {self.date_type} {date_parts[0]} {date_parts[1]}")
        if exit_status != 0:
            # Report instead of raising: check_data() retries when the table stays empty.
            print(f"export script returned non-zero status {exit_status} for site {site_name}")

    def check_data(self, site_name):
        """Re-run the export when the target period has no rows in ``self.table_save``.

        Args:
            site_name: Site code; also stored on ``self.site_name`` before reconnecting.
        """
        self.site_name = site_name
        self.engine = self.mysql_connect()
        sql_read = f"select count(*) as table_counts from {self.table_save} where year={self.year} {self.params_sql}"
        self.df_table_counts = pd.read_sql(sql_read, con=self.engine)
        table_counts = self.df_table_counts["table_counts"].iloc[0]
        print("table_counts:", table_counts)
        if table_counts == 0:
            self.export_data(site_name=site_name)

    def update_data(self):
        """Copy metrics from ``self.table_save`` into ``self.table_update``.

        All three statements run inside a single ``engine.begin()`` block.
        """
        with self.engine.begin() as conn:
            # a.orders=b.st_search_sum is dropped for now; it needs to be recomputed.
            print(f"1. {self.site_name}--更新ao_val等9大指标")
            if self.date_type in ("week", "4_week"):
                params = f"b.week={self.week} and b.year={self.year}"
            elif self.date_type == "month":
                params = f"b.month={self.month} and b.year={self.year}"
            else:
                params = f"b.quarter={self.quarter} and b.year={self.year}"
            sql_update_1 = f"""UPDATE {self.table_update} a, {self.table_save} b
            set a.ao_val=b.st_ao_val, a.ao_val_rank=b.st_ao_val_rank, a.ao_val_rate=b.st_ao_val_rate, a.is_ascending_text=b.st_is_ascending_text, a.is_search_text=b.st_is_search_text,
            a.quantity_being_sold=b.st_quantity_being_sold,
            a.bsr_orders=b.st_asin_bs_orders_sum, a.category_id=b.st_asin_bs_cate_current_id
            WHERE {params} and a.id=b.st_brand_id;"""
            print("sql_update_1:", sql_update_1)
            conn.execute(sql_update_1)

            print(f"2. {self.site_name}--更新is_first_text")
            sql_update_2 = f"""UPDATE {self.table_update} a, {self.table_save} b
            set a.is_first_text=b.st_is_first_text WHERE a.rank<=700000 and {params} and b.st_is_first_text=1 and a.id=b.st_brand_id;"""
            print("sql_update_2:", sql_update_2)
            conn.execute(sql_update_2)

            print("3. 更新工作流表workflow_exhibition的data_type=7(ao_val计算)的状态为3")
            # NOTE(review): self.week is only assigned by this class for
            # week/4_week; for month/quarter this line depends on the base class
            # providing it, otherwise it raises AttributeError -- confirm the
            # intended workflow key for those date types.
            sql_update_3 = (
                f"update selection.workflow_exhibition set status=3 "
                f"where week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type=7"
            )
            conn.execute(sql_update_3)

    def run(self):
        """Run export, verification and metric update for every configured site."""
        for site_name in self.site_name_list:
            self.site_name = site_name
            # Table names embed the site, so recompute them for each site.
            self._refresh_period_and_tables()
            self.engine = self.mysql_connect()
            self.export_data(site_name=site_name)
            self.check_data(site_name=site_name)
            self.update_data()
if __name__ == '__main__':
    # Command line: <script> <site_name> <date_type> <date_info>
    #   argument 1: site_name -- a site code, or "all" for every site
    #   argument 2: date_type -- week / 4_week / month / quarter
    #   argument 3: date_info -- year-period, e.g. 2022-1
    if len(sys.argv) < 4:
        # Exit with a usage hint instead of a bare IndexError.
        sys.exit(f"usage: {sys.argv[0]} <site_name|all> <week|4_week|month|quarter> <year-period, e.g. 2022-1>")
    # Kept as module-level names so any code that looks them up as globals
    # keeps working.
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    handle_obj = ExportDwtStInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()