1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import sys
# sys.path.append(os.path.dirname(sys.path[0]))
# sys.path.append(r'E:\Amazon-Selection\Pyspark_job')
from Pyspark_job.utils.db_util import DBUtil
def _build_statements(tb_name, partitions, columns):
    """Return the ordered DDL statements for the column type migration.

    Args:
        tb_name: Name of the partitioned parent table.
        partitions: List of ``(partition_name, partition_bound)`` pairs, in
            the order the statements should be emitted.
        columns: Names of the columns to convert to ``double precision``.

    Returns:
        A list of SQL statements, each terminated by exactly one semicolon.
    """
    column_clauses = [
        f"alter column {col} type double precision using {col}::double precision"
        for col in columns
    ]

    # First detach every partition from the parent table.
    statements = [
        f"alter table {tb_name} detach partition {name};"
        for name, _ in partitions
    ]

    # Then change the structure of the parent table.
    statements.extend(
        f"alter table {tb_name} {clause};" for clause in column_clauses
    )

    for name, bound in partitions:
        # Change the structure of each child table in turn ...
        statements.extend(
            f"alter table {name} {clause};" for clause in column_clauses
        )
        # ... and attach it to the parent again. The bound is emitted exactly
        # as the catalog reported it: lower-casing it would also lower-case
        # any case-sensitive literal inside the bound (e.g. a text list value).
        statements.append(
            f"alter table {tb_name} attach partition {name} {bound};"
        )

    return statements


def run():
    """Write ``exec.sql`` with the DDL that converts numeric columns to double precision.

    Looks up every partition of ``us_aba_profit_gross`` together with its
    partition bound, then writes a script that detaches all partitions, alters
    the column types on the parent and on each partition, and re-attaches each
    partition with its bound. The DDL is only written to the file; this
    function does not execute it. Prints ``success`` when the file is written.
    """
    # NOTE(review): DBUtil is project-local; the engine is presumably a
    # SQLAlchemy engine -- confirm against Pyspark_job.utils.db_util.
    engine = DBUtil.get_db_engine("postgresql", "us")
    tb_name = "us_aba_profit_gross"
    columns = (
        "fba_fee",
        "ocean_freight",
        "air_delivery_fee",
        "referral_fee",
        "return_ratio",
        "operating_costs",
        "costs",
        "advertise",
        "gross_profit_fee_air",
        "gross_profit_fee_sea",
        "price",
        "longs",
        "width",
        "high",
        "weight",
        "user_fba_fee",
        "user_gross_profit_fee_sea",
    )

    sql = f"""
        select pt.relname as partition_name,
               pg_get_expr(pt.relpartbound, pt.oid, true) as partition_expression
        from pg_class base_tb
                 join pg_inherits i on i.inhparent = base_tb.oid
                 join pg_class pt on pt.oid = i.inhrelid
        where base_tb.oid = '{tb_name}'::regclass
        order by partition_expression desc
    """

    # Pull the needed values out of the rows while the connection is open, so
    # the connection is held only for the single catalog query.
    with engine.connect() as connection:
        result = connection.execute(sql)
        partitions = [
            (row['partition_name'], row['partition_expression'])
            for row in result
        ]

    statements = _build_statements(tb_name, partitions, columns)

    with open("exec.sql", "w", encoding="utf-8") as file:
        file.writelines(f"{statement}\n" for statement in statements)

    print("success")
# Script entry point: generate the SQL file only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    run()