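"""
Export the Hive table dim_asin_variation_info to a PostgreSQL (Citus) cluster
via Sqoop. Data is loaded into a scratch copy of {site_name}_asin_variation,
indexed, and then swapped with the live table by renaming.

Hypothetical invocation (argument values depend on the surrounding tooling):
    python dwd_asin_to_pg.py us day 2023-10-01
"""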
import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Get the last command-line argument
    last_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)

    print(f"Execution arguments: {sys.argv}")
    db_type = 'postgresql_cluster'

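    # Working-hours gate: report this export (site, date, owner, priority,
    # owning process) so the framework can decide whether it may run now.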
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info, principal='fangxingjun',
                                   priority=4, export_tools_type=1, belonging_to_process='反查搜索词')

    # Get the database connection
    engine = DBUtil.get_db_engine(db_type, site_name)

    export_cols = [
        "asin",
        "parent_asin",
        "color",
        "size",
        "style",
        "state",
        "column_2",
        "column_1",
        "created_time",
        "updated_time",
        "created_date",
        "mapped_asin",
    ]

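    # Copy-and-swap: load into a scratch copy of the live table, then swap the
    # two tables by renaming once the export and index builds succeed.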
    export_tb_target = f"{site_name}_asin_variation"
    export_tb_copy = f"{site_name}_asin_variation_copy1"
    export_table = export_tb_copy
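    # Rebuild the copy table from the live table's definition, distribute it
    # across the Citus cluster on asin, and pre-create its range partitions.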
    sql = f"""
                   drop table if exists {export_tb_copy};
                   create table if not exists {export_tb_copy}
                  (
                      like {export_tb_target} including comments including defaults
                  );
                  truncate table {export_tb_copy};
                  SELECT create_distributed_table('{export_tb_copy}', 'asin');
                  
                CREATE TABLE public.{export_tb_copy}_part1 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (0) TO (50000000);
                CREATE TABLE public.{export_tb_copy}_part2 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (50000000) TO (100000000);
                CREATE TABLE public.{export_tb_copy}_part3 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (100000000) TO (150000000);
                CREATE TABLE public.{export_tb_copy}_part4 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (150000000) TO (200000000);
                CREATE TABLE public.{export_tb_copy}_part5 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (200000000) TO (250000000);
                CREATE TABLE public.{export_tb_copy}_part6 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (250000000) TO (300000000);
                CREATE TABLE public.{export_tb_copy}_part7 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (300000000) TO (350000000);
                CREATE TABLE public.{export_tb_copy}_part8 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (350000000) TO (400000000);
                CREATE TABLE public.{export_tb_copy}_part9 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (400000000) TO (450000000);
                CREATE TABLE public.{export_tb_copy}_part10 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (450000000) TO (500000000);
                CREATE TABLE public.{export_tb_copy}_part11 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (500000000) TO (550000000);
                CREATE TABLE public.{export_tb_copy}_part12 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (550000000) TO (600000000);
                CREATE TABLE public.{export_tb_copy}_part13 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (600000000) TO (650000000);
                CREATE TABLE public.{export_tb_copy}_part14 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (650000000) TO (700000000);
                CREATE TABLE public.{export_tb_copy}_part15 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (700000000) TO (750000000);
                CREATE TABLE public.{export_tb_copy}_part16 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (750000000) TO (800000000);
                CREATE TABLE public.{export_tb_copy}_part17 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (800000000) TO (850000000);
                CREATE TABLE public.{export_tb_copy}_part18 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (850000000) TO (900000000);
                CREATE TABLE public.{export_tb_copy}_part19 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (900000000) TO (950000000);
                CREATE TABLE public.{export_tb_copy}_part20 PARTITION OF public.{export_tb_copy} FOR VALUES FROM (950000000) TO (1000000000);
                  """
    # Execute the DDL
    DBUtil.engine_exec_sql(engine, sql)
    print("Columns to export:", export_cols)

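    # Hive-side partition filter: export only this site's partition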
    partition_dict = {
        "site_name": site_name,
    }

    # Build the shell command that runs the Sqoop export
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_asin_variation_info",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )

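    # Run the generated Sqoop export script on the remote host over SSH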
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()

    # Re-acquire the database connection (the old one may have timed out during the export)
    engine = DBUtil.get_db_engine(db_type, site_name)

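    # Retry until the index build and the table/index swaps succeed; failures
    # (e.g. lock contention) are retried once a minute.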
    while True:
        try:
            with engine.begin() as conn:
                # Create indexes on the freshly loaded copy table
                sql_index_create = f"""
                    CREATE INDEX {export_tb_copy}_asin_btree ON public.{export_tb_copy} USING btree (asin);
                    CREATE INDEX {export_tb_copy}_parent_asin_btree ON public.{export_tb_copy} USING btree (parent_asin);"""
                print(f"sql_index_create: {sql_index_create}")
                conn.execute(sql_index_create)
                # Swap the copy table with the live table
                sql_1 = f"""alter table {export_tb_target} rename to {export_tb_target}_back;"""
                conn.execute(sql_1)
                sql_2 = f"""alter table {export_tb_copy} rename to {export_tb_target};"""
                conn.execute(sql_2)
                sql_3 = f"""alter table {export_tb_target}_back rename to {export_tb_copy};"""
                conn.execute(sql_3)
                # Swap index names so each table keeps a consistently named
                # index (parent_asin is swapped the same way as asin)
                sql_index_exchange = f"""
                    ALTER INDEX {export_tb_target}_asin_btree RENAME TO {export_tb_target}_asin_btree_temp;
                    ALTER INDEX {export_tb_copy}_asin_btree RENAME TO {export_tb_target}_asin_btree;
                    ALTER INDEX {export_tb_target}_asin_btree_temp RENAME TO {export_tb_copy}_asin_btree;
                    ALTER INDEX {export_tb_target}_parent_asin_btree RENAME TO {export_tb_target}_parent_asin_btree_temp;
                    ALTER INDEX {export_tb_copy}_parent_asin_btree RENAME TO {export_tb_target}_parent_asin_btree;
                    ALTER INDEX {export_tb_target}_parent_asin_btree_temp RENAME TO {export_tb_copy}_parent_asin_btree;
                """
                print(f"sql_index_exchange: {sql_index_exchange}")
                conn.execute(sql_index_exchange)

            break
        except Exception as e:
            print("Index creation or table swap failed, retrying in 60s:", e)
            time.sleep(60)
            engine = DBUtil.get_db_engine(db_type, site_name)