alter_col_script.py 3.77 KB
import os
import sys

# sys.path.append(os.path.dirname(sys.path[0]))
# sys.path.append(r'E:\Amazon-Selection\Pyspark_job')

from Pyspark_job.utils.db_util import DBUtil


def run():
    engine = DBUtil.get_db_engine("postgresql", "us")
    tb_name = "us_aba_profit_gross"
    sql = f"""
        select pt.relname                                 as partition_name,
       pg_get_expr(pt.relpartbound, pt.oid, true) as partition_expression
    from pg_class base_tb
         join pg_inherits i on i.inhparent = base_tb.oid
         join pg_class pt on pt.oid = i.inhrelid
    where base_tb.oid = '{tb_name}'::regclass
    order by partition_expression desc
            """
    exe_sql_list = []
    with engine.connect() as connection:
        result = connection.execute(sql)
        rows = list(result)
        #  先全部脱离分区
        for row in rows:
            partition_tb = row['partition_name']
            partition_expression = row['partition_expression']

            sql = f"""
                    alter table {tb_name} detach partition {partition_tb};
                    """
            exe_sql_list.append(sql)

        alter_list = [
            "alter column fba_fee type double precision using fba_fee::double precision;",
            "alter column ocean_freight type double precision using ocean_freight::double precision;",
            "alter column air_delivery_fee type double precision using air_delivery_fee::double precision;",
            "alter column referral_fee type double precision using referral_fee::double precision;",
            "alter column return_ratio type double precision using return_ratio::double precision;",
            "alter column operating_costs type double precision using operating_costs::double precision;",
            "alter column costs type double precision using costs::double precision;",
            "alter column advertise type double precision using advertise::double precision;",
            "alter column gross_profit_fee_air type double precision using gross_profit_fee_air::double precision;",
            "alter column gross_profit_fee_sea type double precision using gross_profit_fee_sea::double precision;",
            "alter column price type double precision using price::double precision;",
            "alter column longs type double precision using longs::double precision;",
            "alter column width type double precision using width::double precision;",
            "alter column high type double precision using high::double precision;",
            "alter column weight type double precision using weight::double precision;",
            "alter column user_fba_fee type double precision using user_fba_fee::double precision;",
            "alter column user_gross_profit_fee_sea type double precision using user_gross_profit_fee_sea::double precision;",
        ]

        # 修改父表表结构
        for item in alter_list:
            sql = f"""alter table {tb_name} {item}"""
            exe_sql_list.append(sql)
            pass

        for row in rows:
            partition_tb = row['partition_name']
            partition_expression = str(row['partition_expression']).lower()
            # 依次修改从表表结构
            for item in alter_list:
                sql = f"""alter table {partition_tb} {item};"""
                exe_sql_list.append(sql)
                pass
            # 加入分区
            sql = f"alter table {tb_name} attach partition {partition_tb} {partition_expression};"
            exe_sql_list.append(sql)

        with open("exec.sql", "w") as file:
            for sql in exe_sql_list:
                sql = sql.strip()
                if sql.endswith(";;"):
                    sql = sql.replace(";;", ';')
                file.write(f"{sql}\n")
        print("success")


if __name__ == '__main__':
    run()