"""Generate ``exec.sql``: DDL that converts fee/size columns to double precision.

The script targets a partitioned PostgreSQL table.  It only *generates* the
DDL; the single statement it executes against the database is a catalog query
that lists the table's partitions.
"""
import os
import sys

# Presumably needed when running this file directly so that the
# ``Pyspark_job`` package can be imported -- uncomment as required:
# sys.path.append(os.path.dirname(sys.path[0]))
# sys.path.append(r'E:\Amazon-Selection\Pyspark_job')
from Pyspark_job.utils.db_util import DBUtil

# Columns whose type is changed to ``double precision`` on the parent table
# and on every partition.  Order is preserved in the generated script.
_DOUBLE_PRECISION_COLUMNS = (
    "fba_fee",
    "ocean_freight",
    "air_delivery_fee",
    "referral_fee",
    "return_ratio",
    "operating_costs",
    "costs",
    "advertise",
    "gross_profit_fee_air",
    "gross_profit_fee_sea",
    "price",
    "longs",
    "width",
    "high",
    "weight",
    "user_fba_fee",
    "user_gross_profit_fee_sea",
)


def _alter_column_clause(column):
    """Return the ``alter column`` clause (no trailing ``;``) for *column*."""
    return (
        f"alter column {column} type double precision "
        f"using {column}::double precision"
    )


def _build_statements(tb_name, partitions):
    """Return the ordered DDL statements for *tb_name*.

    Args:
        tb_name: Name of the partitioned parent table.
        partitions: Sequence of ``(partition_name, partition_bound)`` pairs,
            where ``partition_bound`` is the text produced by ``pg_get_expr``
            for the partition.

    Returns:
        A list of SQL statements, each terminated by exactly one ``;``:
        detach every partition, alter the parent, then alter and re-attach
        each partition in turn.
    """
    clauses = [_alter_column_clause(col) for col in _DOUBLE_PRECISION_COLUMNS]

    # Detach every partition first.
    statements = [
        f"alter table {tb_name} detach partition {name};"
        for name, _ in partitions
    ]

    # Alter the parent table structure.
    statements.extend(f"alter table {tb_name} {clause};" for clause in clauses)

    for name, bound in partitions:
        # Alter the structure of each child table in turn ...
        statements.extend(f"alter table {name} {clause};" for clause in clauses)
        # ... then attach it back to the parent.  The bound is emitted
        # verbatim: lowercasing it would also lowercase quoted literal values
        # (e.g. ``('US')`` -> ``('us')``) and attach a different bound.
        statements.append(
            f"alter table {tb_name} attach partition {name} {bound};"
        )
    return statements


def _write_script(path, statements):
    """Write *statements* to *path*, one statement per line, as UTF-8."""
    with open(path, "w", encoding="utf-8") as file:
        file.writelines(f"{statement}\n" for statement in statements)


def run():
    """Query the partitions of ``us_aba_profit_gross`` and write ``exec.sql``.

    The generated script is written to ``exec.sql`` in the current working
    directory; it is not executed here.  Prints ``success`` when done.
    """
    engine = DBUtil.get_db_engine("postgresql", "us")
    tb_name = "us_aba_profit_gross"
    sql = f"""
    select pt.relname as partition_name,
           pg_get_expr(pt.relpartbound, pt.oid, true) as partition_expression
    from pg_class base_tb
             join pg_inherits i on i.inhparent = base_tb.oid
             join pg_class pt on pt.oid = i.inhrelid
    where base_tb.oid = '{tb_name}'::regclass
    order by partition_expression desc
    """

    # NOTE(review): a raw SQL string is passed to ``execute`` and rows are
    # indexed by column name, exactly as before; if the engine is SQLAlchemy
    # 2.x this would need ``text()`` / ``row._mapping`` -- confirm the version.
    with engine.connect() as connection:
        partitions = [
            (row['partition_name'], str(row['partition_expression']))
            for row in connection.execute(sql)
        ]

    _write_script("exec.sql", _build_statements(tb_name, partitions))
    print("success")


if __name__ == '__main__':
    run()