# dim_asin_variation_info.py
import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from datetime import datetime

if __name__ == '__main__':
    # Export job: load the Hive table dim_asin_variation_info into the
    # PostgreSQL/Citus cluster table {site_name}_asin_variation.
    # Strategy: rebuild a "_copy1" shadow table, sqoop-export into it, then
    # atomically swap table/index/partition names so readers never observe a
    # half-loaded table.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Last CLI argument (scheduler flag). Unused in this script, but kept for
    # consistency with sibling export scripts that do read it.
    last_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)

    print(f"执行参数为{sys.argv}")
    db_type = 'postgresql_cluster'

    # Gate the export on the allowed working-hours window for this process.
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                   principal='fangxingjun',
                                   priority=4, export_tools_type=1, belonging_to_process='变体表')

    # Acquire the target database connection.
    engine = DBUtil.get_db_engine(db_type, site_name)

    # Columns exported from Hive into the shadow table (order must match the
    # sqoop export definition).
    export_cols = [
        "asin",
        "parent_asin",
        "color",
        "size",
        "style",
        "state",
        "column_2",
        "column_1",
        "created_time",
        "updated_time",
        "created_date",
        "mapped_asin",
    ]

    export_tb_target = f"{site_name}_asin_variation"
    export_tb_copy = f"{site_name}_asin_variation_copy1"
    export_table = export_tb_copy

    # Rebuild the shadow table from scratch: same shape as the live table,
    # range-partitioned on mapped_asin (20 partitions x 50M ids) and
    # distributed across the Citus cluster on asin.
    PART_COUNT = 20
    PART_SIZE = 50_000_000
    partition_ddl = "\n".join(
        f"CREATE TABLE public.{export_tb_copy}_part{i} "
        f"PARTITION OF public.{export_tb_copy} "
        f"FOR VALUES FROM ({(i - 1) * PART_SIZE}) TO ({i * PART_SIZE});"
        for i in range(1, PART_COUNT + 1)
    )
    sql = f"""
                   drop table if exists {export_tb_copy};
                   create table if not exists {export_tb_copy}
                  (
                      like {export_tb_target} including comments including defaults
                  ) PARTITION BY RANGE (mapped_asin);
                  truncate table {export_tb_copy};
                  SELECT create_distributed_table('{export_tb_copy}', 'asin');

                {partition_ddl}
                  """
    print(f"sql: {sql}")
    # Create the shadow table and its partitions.
    DBUtil.engine_exec_sql(engine, sql)
    # The exported column list carries no id; drop the inherited serial column
    # so sqoop's column mapping lines up.
    DBUtil.engine_exec_sql(engine, f"ALTER TABLE {export_tb_copy} DROP COLUMN IF EXISTS id;")
    print("导出的字段:", export_cols)

    partition_dict = {
        "site_name": site_name,
    }

    # Build the sqoop export shell command and run it on the remote host.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_asin_variation_info",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )

    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()

    # Re-acquire the connection: the sqoop export can run long enough for the
    # earlier engine's connections to go stale.
    engine = DBUtil.get_db_engine(db_type, site_name)

    # Index creation + name swap must succeed as one transaction; retry
    # forever on failure (e.g. lock contention) with a fresh connection.
    while True:
        try:
            with engine.begin() as conn:
                # Index the freshly loaded shadow table.
                sql_index_create1 = f"""CREATE INDEX {export_tb_copy}_asin_hash ON public.{export_tb_copy} USING hash (asin);"""
                print(f"sql_index_create1: {sql_index_create1}")
                conn.execute(sql_index_create1)
                sql_index_create2 = f"""CREATE INDEX {export_tb_copy}_parent_asin_hash ON public.{export_tb_copy} USING hash (parent_asin);"""
                print(f"sql_index_create2: {sql_index_create2}")
                conn.execute(sql_index_create2)

                # Swap table names: live -> _back, copy -> live, _back -> copy.
                sql_table_rename1 = f"""alter table {export_tb_target} rename to {export_tb_target}_back; """
                print(f"sql_table_rename1: {sql_table_rename1}")
                conn.execute(sql_table_rename1)
                sql_table_rename2 = f"""alter table {export_tb_copy} rename to {export_tb_target};"""
                print(f"sql_table_rename2: {sql_table_rename2}")
                conn.execute(sql_table_rename2)
                sql_table_rename3 = f"""alter table {export_tb_target}_back rename to {export_tb_copy};"""
                print(f"sql_table_rename3: {sql_table_rename3}")
                conn.execute(sql_table_rename3)

                # Index names do not follow table renames; swap them too so
                # each table keeps conventionally named indexes.
                # asin hash index:
                sql_index_exchange1 = f"""ALTER INDEX {export_tb_target}_asin_hash RENAME TO {export_tb_target}_asin_hash_temp;"""
                print(f"sql_index_exchange1: {sql_index_exchange1}")
                conn.execute(sql_index_exchange1)
                sql_index_exchange2 = f"""ALTER INDEX {export_tb_copy}_asin_hash RENAME TO {export_tb_target}_asin_hash;"""
                print(f"sql_index_exchange2: {sql_index_exchange2}")
                conn.execute(sql_index_exchange2)
                sql_index_exchange3 = f"""ALTER INDEX {export_tb_target}_asin_hash_temp RENAME TO {export_tb_copy}_asin_hash;"""
                print(f"sql_index_exchange3: {sql_index_exchange3}")
                conn.execute(sql_index_exchange3)
                # parent_asin hash index:
                sql_index_exchange1 = f"""ALTER INDEX {export_tb_target}_parent_asin_hash RENAME TO {export_tb_target}_parent_asin_hash_temp;"""
                print(f"sql_index_exchange1: {sql_index_exchange1}")
                conn.execute(sql_index_exchange1)
                sql_index_exchange2 = f"""ALTER INDEX {export_tb_copy}_parent_asin_hash RENAME TO {export_tb_target}_parent_asin_hash;"""
                print(f"sql_index_exchange2: {sql_index_exchange2}")
                conn.execute(sql_index_exchange2)
                sql_index_exchange3 = f"""ALTER INDEX {export_tb_target}_parent_asin_hash_temp RENAME TO {export_tb_copy}_parent_asin_hash;"""
                print(f"sql_index_exchange3: {sql_index_exchange3}")
                conn.execute(sql_index_exchange3)

                # Child partition names do not follow the parent rename either;
                # apply the same three-way swap per partition.
                for part_num in range(1, PART_COUNT + 1):
                    sql_1 = f"""alter table {export_tb_target}_part{part_num} rename to {export_tb_target}_part{part_num}_back; """
                    conn.execute(sql_1)
                    sql_2 = f"""alter table {export_tb_copy}_part{part_num} rename to {export_tb_target}_part{part_num};"""
                    conn.execute(sql_2)
                    sql_3 = f"""alter table {export_tb_target}_part{part_num}_back rename to {export_tb_copy}_part{part_num};"""
                    conn.execute(sql_3)
                    print(sql_1)
                    print(sql_2)
                    print(sql_3)

            break
        except Exception as e:
            # Log immediately (the original slept for 60s before printing,
            # hiding the failure from the log for a minute), then back off
            # and retry with a fresh connection.
            print("创建索引或者交换表名失败:", e)
            time.sleep(60)
            engine = DBUtil.get_db_engine(db_type, site_name)