1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import os
import sys
import time
sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil,DateTypes
from datetime import datetime
if __name__ == '__main__':
    # CLI: <site_name> <date_type> <date_info>
    # Exports hive table dim_asin_variation_info into the PostgreSQL (Citus)
    # cluster via a sqoop shell command run over SSH, loading into a "_copy1"
    # staging table, then swaps the staging table with the live table (and
    # their index names) so the switch is near-atomic for readers.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    print(f"执行参数为{sys.argv}")

    db_type = 'postgresql_cluster'
    # Gate execution on the export tool's working-hours / priority rules.
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                   principal='fangxingjun', priority=4, export_tools_type=1,
                                   belonging_to_process='反查搜索词')
    # Database connection
    engine = DBUtil.get_db_engine(db_type, site_name)

    # Columns exported from hive, in sqoop export order.
    export_cols = [
        "asin",
        "parent_asin",
        "color",
        "size",
        "style",
        "state",
        "column_2",
        "column_1",
        "created_time",
        "updated_time",
        "created_date",
        "mapped_asin",
    ]
    export_tb_target = f"{site_name}_asin_variation"        # live table
    export_tb_copy = f"{site_name}_asin_variation_copy1"    # staging table
    export_table = export_tb_copy

    # Generate the 20 range-partition DDL statements (part1..part20, 50M-wide
    # ranges covering [0, 1e9)) instead of 20 copy-pasted statements.
    # NOTE(review): the partition key comes from the parent's definition
    # (copied via LIKE from the live table) — not visible here; confirm it is
    # a numeric column such as mapped_asin.
    part_step = 50000000
    partition_ddl = "\n".join(
        f"CREATE TABLE public.{export_tb_copy}_part{i} PARTITION OF public.{export_tb_copy} "
        f"FOR VALUES FROM ({(i - 1) * part_step}) TO ({i * part_step});"
        for i in range(1, 21)
    )
    # Rebuild the staging table from scratch with the live table's structure,
    # distribute it across the Citus cluster by asin, then attach partitions.
    sql = f"""
    drop table if exists {export_tb_copy};
    create table if not exists {export_tb_copy}
    (
        like {export_tb_target} including comments including defaults
    );
    truncate table {export_tb_copy};
    SELECT create_distributed_table('{export_tb_copy}', 'asin');
    {partition_ddl}
    """
    # Execute the rebuild DDL
    DBUtil.engine_exec_sql(engine, sql)
    print("导出的字段:", export_cols)

    partition_dict = {
        "site_name": site_name,
    }
    # Build the sqoop export shell command and run it synchronously over SSH;
    # ignore_err=False makes a failed export raise instead of being swallowed.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_asin_variation_info",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()

    # Reconnect — the previous connection may have timed out during the export.
    engine = DBUtil.get_db_engine(db_type, site_name)
    # Retry indefinitely until the index build + table swap succeeds
    # (a run can only finish by completing the swap).
    while True:
        try:
            with engine.begin() as conn:
                # Index the freshly loaded staging table before swapping it in.
                sql_index_create = f"""
                CREATE INDEX {export_tb_copy}_asin_btree ON public.{export_tb_copy} USING btree (asin);
                CREATE INDEX {export_tb_copy}_parent_asin_btree ON public.{export_tb_copy} USING btree (parent_asin);"""
                conn.execute(sql_index_create)
                print(f"sql_index_create: {sql_index_create}")
                # Swap table names: live <-> staging (via a _back temp name).
                conn.execute(f"""alter table {export_tb_target} rename to {export_tb_target}_back; """)
                conn.execute(f"""alter table {export_tb_copy} rename to {export_tb_target};""")
                conn.execute(f"""alter table {export_tb_target}_back rename to {export_tb_copy};""")
                # Swap index names so each index keeps matching its table name.
                # BUG FIX: previously only the asin index was exchanged, so the
                # "{copy}_parent_asin_btree" name stayed attached to the new live
                # table and the NEXT run's CREATE INDEX of that same name
                # collided, failing forever in this retry loop. Exchange the
                # parent_asin index names symmetrically as well.
                # TODO(review): confirm the live table's parent_asin index is
                # named "{target}_parent_asin_btree" before first deployment.
                sql_index_exchange = f"""
                ALTER INDEX {export_tb_target}_asin_btree RENAME TO {export_tb_target}_asin_btree_temp;
                ALTER INDEX {export_tb_copy}_asin_btree RENAME TO {export_tb_target}_asin_btree;
                ALTER INDEX {export_tb_target}_asin_btree_temp RENAME TO {export_tb_copy}_asin_btree;
                ALTER INDEX {export_tb_target}_parent_asin_btree RENAME TO {export_tb_target}_parent_asin_btree_temp;
                ALTER INDEX {export_tb_copy}_parent_asin_btree RENAME TO {export_tb_target}_parent_asin_btree;
                ALTER INDEX {export_tb_target}_parent_asin_btree_temp RENAME TO {export_tb_copy}_parent_asin_btree;
                """
                print(f"sql_index_exchange: {sql_index_exchange}")
                conn.execute(sql_index_exchange)
            break
        except Exception as e:
            # Log first, then back off — the original slept 60s BEFORE printing,
            # hiding the failure for a minute. Reconnect in case the connection
            # itself is what failed.
            print("创建索引或者交换表名失败:", e)
            time.sleep(60)
            engine = DBUtil.get_db_engine(db_type, site_name)