1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil, DbTypes
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
def export_postgresql():
    """Placeholder for a single-node PostgreSQL export routine.

    Currently unimplemented; the actual export flow lives in the
    ``__main__`` section of this script.
    """
def export_postgresql_cluster():
    """Placeholder for a PostgreSQL-cluster export routine.

    Currently unimplemented; the actual export flow lives in the
    ``__main__`` section of this script.
    """
if __name__ == '__main__':
    # CLI: <site_name> <date_info YYYY-MM-DD> [... optional 'test' as last arg]
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    # Read the last positional argument; the literal string 'test' routes the
    # export to the test PostgreSQL instance instead of the production cluster.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    # Guard: do not run the export during working hours (enforced by the
    # utility; presumably it blocks or exits — TODO confirm behavior).
    date_type = "day"
    CommonUtil.judge_is_work_hours(site_name=site_name,
                                   date_type=date_type,
                                   date_info=date_info,
                                   principal='wujicang',
                                   priority=1,
                                   export_tools_type=1)
    if test_flag == 'test':
        db_type = DbTypes.postgresql_test.name
    else:
        db_type = DbTypes.postgresql_cluster.name
    print(f"导出到{db_type}中")
    engine = DBUtil.get_db_engine(db_type, site_name)
    # From date_info (e.g. '2023-05-01'):
    #   d_month_now  -> '2023_05'  (partition-table name suffix)
    #   rel_date_info-> '2023-05'  (partition range lower bound / Hive partition)
    #   next_month   -> '2023-06'  (partition range upper bound, exclusive)
    d_month_now = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y_%m", )
    rel_date_info = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y-%m", )
    next_month = CommonUtil.get_month_offset(rel_date_info, 1)
    # Export target tables: the monthly partition of the master table, plus a
    # working "_copy" table that receives the fresh export before the swap.
    export_master_tb = f"{site_name}_bsr_asin_detail"
    export_tb = f"{export_master_tb}_{d_month_now}"
    export_tb_copy = f"{export_master_tb}_{d_month_now}_copy"
    with engine.connect() as connection:
        # Ensure the monthly partition exists, then (re)create an empty copy
        # table cloning the partition's indexes and comments.
        sql = f"""
        create table if not exists {export_tb} partition of {export_master_tb} for values from ('{rel_date_info}') to ('{next_month}');
        drop table if exists {export_tb_copy};
        create table if not exists {export_tb_copy}
        (
            like {export_tb} including indexes including comments
        );
        """
        print("================================执行sql================================")
        print(sql)
        connection.execute(sql)
        # NOTE: no explicit connection.close() needed — the context manager
        # closes the connection when the `with` block exits.
    # Build the shell command exporting dwt_bsr_asin_detail from Hive into the
    # copy table, then run it over SSH.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_bsr_asin_detail",
        export_tb=export_tb_copy,
        col=[
            "asin",
            "title",
            "img_url",
            "ao_val",
            "rating",
            "total_comments",
            "bsr_orders",
            "bsr_orders_change",
            "price",
            "weight",
            "launch_time",
            "date_info",
            "brand_name",
            "buy_box_seller_type",
            "account_name",
            "volume",
            "img_type",
            "last_update_time",
            "asin_type",
            "asin_air_freight_gross_margin",
            "asin_ocean_freight_gross_margin",
            "asin_unlaunch_time",
            "seller_id",
            "seller_country_name",
            "category_first_id",
            "first_category_rank",
            "first_category_rank_date",
            "package_quantity",
            "asin_launch_time_type",
            "seller_country_type",
            "asin_bought_month",
        ],
        partition_dict={
            "site_name": site_name,
            "date_info": rel_date_info
        }
    )
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()
    # Swap the freshly loaded copy table into place as the live partition.
    DBUtil.exchange_pg_part_tb(
        engine,
        source_tb_name=export_tb_copy,
        part_master_tb=export_master_tb,
        part_target_tb=export_tb,
        part_val={
            "from": [rel_date_info],
            "to": [next_month]
        },
        cp_index_flag=False,
    )
    # Mark the workflow step done (MySQL `insert ignore` — workflow DB).
    # NOTE(review): table_name here is '{site_name}_bsr_asin_rank' while the
    # export above targets '_bsr_asin_detail' — looks intentional for the
    # workflow record, but verify against the workflow_everyday consumers.
    sql = f""" insert ignore into workflow_everyday (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark)
    values ('{site_name}', '{date_info}', '导出PG数据库', 14, '{site_name}_bsr_asin_rank', 'day', 'BSR榜单', '是', 'BS榜单对应的TOP100ASIN')
    """
    CommonUtil.modify_export_workflow_status(
        update_workflow_sql=sql,
        site_name=site_name,
        date_type=date_type,
        date_info=date_info
    )
    print("success")