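"""
Export job: copies the Hive table dim_asin_variation_info into the
{site_name}_asin_variation table on a PostgreSQL (Citus) cluster via sqoop.
Data is first loaded into a *_copy1 staging table; after indexing, the
table, index, and partition names are swapped inside one transaction so
the new data goes live atomically.
"""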
import os
import sys
import time
sys.path.append(os.path.dirname(sys.path[0]))  # make the project root importable when run as a script
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from datetime import datetime
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Grab the last CLI argument
    last_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    cur_date = datetime.now().date()
    sql_date_type = date_type
    print(f"Execution arguments: {sys.argv}")
    db_type = 'postgresql_cluster'
    # Work-hours check for this export job (belonging_to_process '变体表' = variation table)
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                   principal='fangxingjun',
                                   priority=4, export_tools_type=1, belonging_to_process='变体表')
    # CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info, principal='fangxingjun',
    #                                priority=3, export_tools_type=1, belonging_to_process='反查搜索词')
    # Get a database connection
    engine = DBUtil.get_db_engine(db_type, site_name)
    suffix = str(date_info).replace("-", "_")
    export_cols = [
        "asin",
        "parent_asin",
        "color",
        "size",
        "style",
        "state",
        "column_2",
        "column_1",
        "created_time",
        "updated_time",
        "created_date",
        "mapped_asin",
    ]
    export_tb_target = f"{site_name}_asin_variation"
    export_tb_copy = f"{site_name}_asin_variation_copy1"
    export_table = export_tb_copy
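    # Rebuild the staging table from scratch each run: clone the live table's
    # definition, register it as a Citus distributed table sharded on asin,
    # and pre-create 20 range partitions on mapped_asin covering
    # [0, 1_000_000_000) in 50,000,000-wide steps.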
    part_step = 50_000_000
    partition_ddl = "\n".join(
        f"CREATE TABLE public.{export_tb_copy}_part{i} PARTITION OF public.{export_tb_copy} "
        f"FOR VALUES FROM ({(i - 1) * part_step}) TO ({i * part_step});"
        for i in range(1, 21)
    )
    sql = f"""
    drop table if exists {export_tb_copy};
    create table if not exists {export_tb_copy}
    (
        like {export_tb_target} including comments including defaults
    ) PARTITION BY RANGE (mapped_asin);
    truncate table {export_tb_copy};
    SELECT create_distributed_table('{export_tb_copy}', 'asin');
    {partition_ddl}
    """
print(f"sql: {sql}")
# 执行SQL语句
DBUtil.engine_exec_sql(engine, sql)
DBUtil.engine_exec_sql(engine, f"ALTER TABLE {export_tb_copy} DROP COLUMN IF EXISTS id;")
print("导出的字段:", export_cols)
partition_dict = {
"site_name": site_name,
}
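    # Export step: sqoop copies the Hive table dim_asin_variation_info
    # (restricted to this site's partition via partition_dict) into the
    # freshly created staging table, executed over SSH.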
    # Build the sqoop export shell command
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_asin_variation_info",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()
    # Re-acquire the database connection
    engine = DBUtil.get_db_engine(db_type, site_name)
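    # Post-load swap: build indexes on the staging table, then rotate table,
    # index, and partition names inside one transaction so the staging table
    # becomes the live table and the old live table becomes next run's
    # staging table. On any failure, wait 60 s, reconnect, and retry.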
    while True:
        try:
            with engine.begin() as conn:
                # Create hash indexes on the freshly loaded staging table
                sql_index_create1 = f"""CREATE INDEX {export_tb_copy}_asin_hash ON public.{export_tb_copy} USING hash (asin);"""
                print(f"sql_index_create1: {sql_index_create1}")
                conn.execute(sql_index_create1)
                sql_index_create2 = f"""CREATE INDEX {export_tb_copy}_parent_asin_hash ON public.{export_tb_copy} USING hash (parent_asin);"""
                print(f"sql_index_create2: {sql_index_create2}")
                conn.execute(sql_index_create2)
                # Swap table names
                sql_table_rename1 = f"""alter table {export_tb_target} rename to {export_tb_target}_back;"""
                print(f"sql_table_rename1: {sql_table_rename1}")
                conn.execute(sql_table_rename1)
                sql_table_rename2 = f"""alter table {export_tb_copy} rename to {export_tb_target};"""
                print(f"sql_table_rename2: {sql_table_rename2}")
                conn.execute(sql_table_rename2)
                sql_table_rename3 = f"""alter table {export_tb_target}_back rename to {export_tb_copy};"""
                print(f"sql_table_rename3: {sql_table_rename3}")
                conn.execute(sql_table_rename3)
                # Swap index names (renaming a table does not rename its indexes)
                # asin_hash
                sql_index_exchange1 = f"""ALTER INDEX {export_tb_target}_asin_hash RENAME TO {export_tb_target}_asin_hash_temp;"""
                print(f"sql_index_exchange1: {sql_index_exchange1}")
                conn.execute(sql_index_exchange1)
                sql_index_exchange2 = f"""ALTER INDEX {export_tb_copy}_asin_hash RENAME TO {export_tb_target}_asin_hash;"""
                print(f"sql_index_exchange2: {sql_index_exchange2}")
                conn.execute(sql_index_exchange2)
                sql_index_exchange3 = f"""ALTER INDEX {export_tb_target}_asin_hash_temp RENAME TO {export_tb_copy}_asin_hash;"""
                print(f"sql_index_exchange3: {sql_index_exchange3}")
                conn.execute(sql_index_exchange3)
                # parent_asin_hash
                sql_index_exchange1 = f"""ALTER INDEX {export_tb_target}_parent_asin_hash RENAME TO {export_tb_target}_parent_asin_hash_temp;"""
                print(f"sql_index_exchange1: {sql_index_exchange1}")
                conn.execute(sql_index_exchange1)
                sql_index_exchange2 = f"""ALTER INDEX {export_tb_copy}_parent_asin_hash RENAME TO {export_tb_target}_parent_asin_hash;"""
                print(f"sql_index_exchange2: {sql_index_exchange2}")
                conn.execute(sql_index_exchange2)
                sql_index_exchange3 = f"""ALTER INDEX {export_tb_target}_parent_asin_hash_temp RENAME TO {export_tb_copy}_parent_asin_hash;"""
                print(f"sql_index_exchange3: {sql_index_exchange3}")
                conn.execute(sql_index_exchange3)
                # Swap partition names
                for part_num in range(1, 21):
                    sql_1 = f"""alter table {export_tb_target}_part{part_num} rename to {export_tb_target}_part{part_num}_back;"""
                    print(sql_1)
                    conn.execute(sql_1)
                    sql_2 = f"""alter table {export_tb_copy}_part{part_num} rename to {export_tb_target}_part{part_num};"""
                    print(sql_2)
                    conn.execute(sql_2)
                    sql_3 = f"""alter table {export_tb_target}_part{part_num}_back rename to {export_tb_copy}_part{part_num};"""
                    print(sql_3)
                    conn.execute(sql_3)
            break
        except Exception as e:
            print("Index creation or table-name swap failed:", e)
            time.sleep(60)
            engine = DBUtil.get_db_engine(db_type, site_name)