1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
import sys

# Make the repository root importable when this file is run directly as a script.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

if __name__ == '__main__':
    # Export three seller/account aggregation tables for one (site, date_type,
    # date_info) cycle from Hive into the target relational database, then swap
    # the detail table into place. The target DB flavor (mysql / postgresql)
    # is selected by the LAST command-line argument.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The last positional argument selects the export target database.
    sql_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")
    if sql_flag == 'mysql':
        db_type = 'mysql'
        print("导出到mysql中")
    elif sql_flag == 'postgresql':
        db_type = "postgresql"
        print("导出到PG库中")
    else:
        # FIX: the original fell through with db_type unbound for any other
        # flag value, raising NameError below; fail fast with a clear message.
        print(f"unsupported db flag: {sql_flag!r} (expected 'mysql' or 'postgresql')")
        sys.exit(1)

    # Acquire the DB engine and SSH client up front; release both in `finally`
    # so a failed export step (ignore_err=False) cannot leak connections.
    engine = DBUtil.get_db_engine(db_type, site_name)
    client = SSHUtil.get_ssh_client()
    try:
        # ---- dwd_seller_asin_account_agg -> {site}_seller_asin_account_agg ----
        export_seller_agg_table = f"{site_name}_seller_asin_account_agg"
        # Idempotency: remove rows of the same cycle before re-exporting.
        # FIX: date_info is a string (e.g. '2023-18'); unquoted, the DB parses
        # it as the arithmetic expression 2023-18 and the delete matches the
        # wrong rows (or none). Quote the literal.
        sql = f"""
        delete from {export_seller_agg_table} where date_info = '{date_info}'
        """
        DBUtil.engine_exec_sql(engine, sql)
        # Build and run the export shell command for the agg table.
        sh_agg = CommonUtil.build_export_sh(
            site_name=site_name,
            db_type=db_type,
            hive_tb="dwd_seller_asin_account_agg",
            export_tb=export_seller_agg_table,
            col=[
                "account_id",
                "account_name",
                "asin_new_counts",
                "asin_counts",
                "asin_counts_exists",
                "counts_new_rate",
                "top_20_avg_price",
                "top_20_avg_rating",
                "top_20_avg_total_comments",
                "fb_variat_num",
                "fb_asin_total",
                "fb_variat_prop",
                "ym",
                "week",
                "date_info"
            ],
            partition_dict={
                "site_name": site_name,
                "date_type": date_type,
                "date_info": date_info
            }
        )
        SSHUtil.exec_command_async(client, sh_agg, ignore_err=False)

        # ---- dwt_fd_category_agg -> {site}_seller_category_agg ----
        export_fd_cat_agg = f"{site_name}_seller_category_agg"
        # Clear same-cycle history (quoted for the same reason as above).
        sql = f"""
        delete from {export_fd_cat_agg} where date_info = '{date_info}'
        """
        DBUtil.engine_exec_sql(engine, sql)
        sh_fd_cat_agg = CommonUtil.build_export_sh(
            site_name=site_name,
            db_type=db_type,
            hive_tb="dwt_fd_category_agg",
            export_tb=export_fd_cat_agg,
            col=[
                "fd_account_id",
                "bsr_cate_1_id",
                "fd_cate_asin_num",
                "fd_cate_new_asin_num",
                "fd_asin_num",
                "bsr_asin_num",
                "fd_cate_asin_per",
                "fd_cate_new_asin_per",
                "fd_market_per",
                "ym",
                "week",
                "date_info"
            ],
            partition_dict={
                "site_name": site_name,
                "date_type": date_type,
                "date_info": date_info
            }
        )
        SSHUtil.exec_command_async(client, sh_fd_cat_agg, ignore_err=False)

        # ---- detail table: load into a _copy1 staging table, then rename-swap
        # so readers only ever see a fully-loaded table. ----
        export_seller_detail_table = f"{site_name}_seller_asin_account_detail_copy1"
        export_table_target = f"{site_name}_seller_asin_account_detail"
        # Prepare the staging table: create (same shape as the live table) and
        # empty it. NOTE(review): `create table ... ( like t )` is PostgreSQL
        # syntax; confirm the mysql branch never reaches this path or that
        # engine_exec_sql adapts it.
        for sql in (
            f"create table if not exists {export_seller_detail_table} ( like {export_table_target} );",
            f"truncate table {export_seller_detail_table}; ",
        ):
            DBUtil.engine_exec_sql(engine, sql)
        sh_detail = CommonUtil.build_export_sh(
            site_name=site_name,
            db_type=db_type,
            hive_tb="dwd_seller_asin_account_detail",
            export_tb=export_seller_detail_table,
            col=[
                "account_id",
                "account_name",
                "asin",
                "launch_time",
                "days_diff",
                "is_asin_new"
            ],
            partition_dict={
                "site_name": site_name,
                "date_type": date_type,
                "date_info": date_info
            }
        )
        SSHUtil.exec_command_async(client, sh_detail, ignore_err=False)

        # Three-way rename: live -> _bak, staging -> live, _bak -> staging.
        # Leaves the previous live data in the staging table for next cycle.
        for sql in (
            f"alter table {export_table_target} rename to {export_table_target}_bak ;",
            f"alter table {export_seller_detail_table} rename to {export_table_target} ;",
            f"alter table {export_table_target}_bak rename to {export_seller_detail_table} ;",
        ):
            DBUtil.engine_exec_sql(engine, sql)
    finally:
        # FIX: always release the SSH client and the engine's connection pool,
        # even when an export step raises.
        client.close()
        engine.dispose()