# dim_asin_variation_info.py
import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from datetime import datetime

if __name__ == '__main__':
    # CLI: <site_name> <date_type> <date_info>
    # Exports the Hive table dim_asin_variation_info into a Citus/PostgreSQL
    # "copy" table, then atomically swaps it with the live table by renaming
    # tables, indexes and range partitions inside one transaction.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # NOTE(review): the trailing CLI flag (sys.argv[-1]) read by sibling scripts
    # is not used here; extra positional args are ignored.
    print(f"执行参数为{sys.argv}")
    db_type = 'postgresql_cluster'

    # Gate the export on working-hours policy (owner / priority / process tag).
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                   principal='fangxingjun',
                                   priority=4, export_tools_type=1, belonging_to_process='变体表')

    # Acquire the DB connection.
    engine = DBUtil.get_db_engine(db_type, site_name)

    # Columns exported by sqoop, in target-table order.
    export_cols = [
        "asin",
        "parent_asin",
        "color",
        "size",
        "style",
        "state",
        "column_2",
        "column_1",
        "created_time",
        "updated_time",
        "created_date",
        "mapped_asin",
    ]

    export_tb_target = f"{site_name}_asin_variation"
    export_tb_copy = f"{site_name}_asin_variation_copy1"
    export_table = export_tb_copy

    # Range-partition layout over mapped_asin: part_count partitions of
    # part_step each, i.e. [0, part_count * part_step).
    part_count = 20
    part_step = 50_000_000
    partition_ddl = "\n".join(
        f"CREATE TABLE public.{export_tb_copy}_part{i} "
        f"PARTITION OF public.{export_tb_copy} "
        f"FOR VALUES FROM ({(i - 1) * part_step}) TO ({i * part_step});"
        for i in range(1, part_count + 1)
    )

    # Rebuild the copy table from the live table's definition, distribute it
    # by asin (create_distributed_table is a Citus function), then attach the
    # range partitions.
    sql = f"""
        drop table if exists {export_tb_copy};
        create table if not exists {export_tb_copy}
        (
            like {export_tb_target} including comments including defaults
        ) PARTITION BY RANGE (mapped_asin);
        truncate table {export_tb_copy};
        SELECT create_distributed_table('{export_tb_copy}', 'asin');
{partition_ddl}
    """
    print(f"sql: {sql}")
    DBUtil.engine_exec_sql(engine, sql)
    # The serial id column must not participate in the sqoop load.
    DBUtil.engine_exec_sql(engine, f"ALTER TABLE {export_tb_copy} DROP COLUMN IF EXISTS id;")
    print("导出的字段:", export_cols)

    partition_dict = {
        "site_name": site_name,
    }

    # Build and run the sqoop export shell command over SSH.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_asin_variation_info",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )

    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()

    # Re-acquire the connection: the sqoop export can outlive the old one.
    engine = DBUtil.get_db_engine(db_type, site_name)

    def _exec_sql(conn, stmt):
        # Log then execute a single statement on the open transaction.
        print(stmt)
        conn.execute(stmt)

    # Retry the index build + rename swap until it succeeds; everything runs
    # in one transaction so a failure leaves the live table untouched.
    while True:
        try:
            with engine.begin() as conn:
                # Hash indexes on the freshly loaded copy table.
                _exec_sql(conn, f"""CREATE INDEX {export_tb_copy}_asin_hash ON public.{export_tb_copy} USING hash (asin);""")
                _exec_sql(conn, f"""CREATE INDEX {export_tb_copy}_parent_asin_hash ON public.{export_tb_copy} USING hash (parent_asin);""")
                # Three-step table-name swap: target -> back, copy -> target,
                # back -> copy. The previous live table becomes the next copy.
                _exec_sql(conn, f"""alter table {export_tb_target} rename to {export_tb_target}_back; """)
                _exec_sql(conn, f"""alter table {export_tb_copy} rename to {export_tb_target};""")
                _exec_sql(conn, f"""alter table {export_tb_target}_back rename to {export_tb_copy};""")
                # Same three-step swap for each hash index name.
                for col in ("asin", "parent_asin"):
                    _exec_sql(conn, f"""ALTER INDEX {export_tb_target}_{col}_hash RENAME TO {export_tb_target}_{col}_hash_temp;""")
                    _exec_sql(conn, f"""ALTER INDEX {export_tb_copy}_{col}_hash RENAME TO {export_tb_target}_{col}_hash;""")
                    _exec_sql(conn, f"""ALTER INDEX {export_tb_target}_{col}_hash_temp RENAME TO {export_tb_copy}_{col}_hash;""")
                # And for every range-partition child table.
                for part_num in range(1, part_count + 1):
                    _exec_sql(conn, f"""alter table {export_tb_target}_part{part_num} rename to {export_tb_target}_part{part_num}_back; """)
                    _exec_sql(conn, f"""alter table {export_tb_copy}_part{part_num} rename to {export_tb_target}_part{part_num};""")
                    _exec_sql(conn, f"""alter table {export_tb_target}_part{part_num}_back rename to {export_tb_copy}_part{part_num};""")
            break
        except Exception as e:
            # Log the failure before sleeping so the operator is not blind
            # for 60 s, then reconnect and retry.
            print("创建索引或者交换表名失败:", e)
            time.sleep(60)
            engine = DBUtil.get_db_engine(db_type, site_name)