1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from utils.db_util import DBUtil
from sqlalchemy import text
from datetime import datetime
def build_day_range_sql(date_type, date_info):
    """Build the MySQL queries that resolve the first and last day of a period.

    Args:
        date_type: ``'week'`` (``date_info`` is matched against the
            ``year_week`` column of ``date_20_to_30``) or ``'month_week'``
            (``date_info`` is a ``YYYY-MM`` value; ``-01`` is appended to it).
        date_info: Period identifier taken from the command line.

    Returns:
        A ``(start_day_sql, end_day_sql)`` tuple of raw SQL strings. Each
        query selects a single column named ``date``.

    Raises:
        ValueError: If ``date_type`` is neither ``'week'`` nor ``'month_week'``.
    """
    # NOTE(review): date_info is interpolated directly into the SQL text.
    # This assumes it comes from a trusted scheduler invocation - confirm.
    if date_type == 'week':
        start_day_sql = f"""
            select date from date_20_to_30
            where year_week='{date_info}' and week_day=1;
        """
        end_day_sql = f"""
            SELECT date from date_20_to_30
            where year_week='{date_info}' and week_day=7;
        """
    elif date_type == 'month_week':
        start_day_sql = f"""
            SELECT STR_TO_DATE('{date_info}-01', '%Y-%m-%d') AS date;
        """
        end_day_sql = f"""
            SELECT LAST_DAY(STR_TO_DATE('{date_info}-01', '%Y-%m-%d')) AS date;
        """
    else:
        # Previously an unsupported date_type left both queries empty and the
        # script only failed later, inside the database driver.
        raise ValueError(
            f"unsupported date_type {date_type!r}; expected 'week' or 'month_week'"
        )
    return start_day_sql, end_day_sql


def build_cleanup_sql(export_tb, export_master_tb, start_day, end_day, date_type):
    """Build the PostgreSQL statements run before the export.

    The first statement creates ``export_tb`` with the structure of
    ``export_master_tb`` if it does not exist yet; the second deletes the rows
    of the period being re-exported.

    Args:
        export_tb: Name of the yearly target table.
        export_master_tb: Name of the template table copied via ``like``.
        start_day: Period start as a ``YYYYMMDD`` string.
        end_day: Period end as a ``YYYYMMDD`` string (always inclusive).
        date_type: Period type; only ``'us'`` makes the lower bound inclusive.

    Returns:
        The SQL text containing both statements.
    """
    # NOTE(review): the original condition compares date_type (not site_name)
    # with 'us'. No supported date_type equals 'us', so the inclusive bound is
    # unreachable from main(); behaviour is preserved here - confirm intent.
    lower_bound_op = '>=' if date_type == 'us' else '>'
    return f"""
        create table if not exists {export_tb} (
            like {export_master_tb} including comments
        );
        delete from {export_tb} where report_date {lower_bound_op} '{start_day}' and report_date <= '{end_day}'
    """


def main():
    """Clear one period from the yearly buyer-search-term table and re-export it.

    Command-line arguments (read through ``CommonUtil.get_sys_arg``):
    1 = site_name, 2 = date_type, 3 = date_info. When the last argument is
    ``test`` the export targets the test database instead of the cluster.

    Raises:
        ValueError: If ``date_info`` is missing, ``date_type`` is unsupported,
            or the period boundaries cannot be resolved from MySQL.
    """
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The last argument doubles as the optional test switch.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)

    # Validate before touching any external system so that bad arguments fail
    # with a clear message instead of an AttributeError or a driver error.
    if not date_info:
        raise ValueError("date_info (argument 3) is required")
    start_day_sql, end_day_sql = build_day_range_sql(date_type, date_info)

    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                       principal='wangrui4', priority=2, export_tools_type=1,
                                       belonging_to_process='买家搜索词')
        db_type = 'postgresql_cluster'
        print("导出到PG-Cluster库中")

    # The target table is partitioned by year: the part of date_info before
    # the first '-' is used as the table suffix.
    year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
    export_master_tb = f"{site_name}_buyer_st"
    export_tb = f"{site_name}_buyer_st_{year_str}"

    # Obtain the database connections.
    mysql_engine = DBUtil.get_db_engine('mysql', 'us')
    pg_clu_engine = DBUtil.get_db_engine(db_type, site_name)

    # Each query returns one column, so scalar() yields the value directly and
    # returns None when no row matches. The context manager closes the
    # connection; no explicit close() is needed.
    with mysql_engine.connect() as connection:
        start_day = connection.execute(text(start_day_sql)).scalar()
        end_day = connection.execute(text(end_day_sql)).scalar()
    if start_day is None or end_day is None:
        raise ValueError(
            f"could not resolve the date range for date_type={date_type!r}, "
            f"date_info={date_info!r}"
        )

    # Convert YYYY-MM-DD into the compact YYYYMMDD form used in the delete.
    start_day = str(start_day).replace("-", "")
    end_day = str(end_day).replace("-", "")
    print(f"{date_info}的月初是:{start_day},月末是:{end_day}")

    sql = build_cleanup_sql(export_tb, export_master_tb, start_day, end_day, date_type)
    DBUtil.engine_exec_sql(pg_clu_engine, sql)

    # Build the export shell command for the target table.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwd_buyer_st",
        export_tb=export_tb,
        col=[
            "search_term",
            "report_date",
            "sales",
            "orders",
            "clicks"
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Release the SSH connection even when the remote command fails.
        client.close()
    print("success")


if __name__ == '__main__':
    main()