# dwd_asin_to_pg.py 6.78 KB
import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil,DateTypes
from datetime import datetime


if __name__ == '__main__':
    # Export Hive table `dim_asin_variation_info` into a freshly built
    # PostgreSQL staging table, then swap the staging table (and its index
    # names) with the live `{site_name}_asin_variation` table.
    #
    # Positional CLI arguments: 1 = site_name, 2 = date_type, 3 = date_info.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    print(f"执行参数为{sys.argv}")
    db_type = 'postgresql_cluster'

    # NOTE(review): presumably gates/schedules the export according to working
    # hours and priority -- confirm against CommonUtil.judge_is_work_hours.
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info, principal='fangxingjun',
                                   priority=4, export_tools_type=1, belonging_to_process='反查搜索词')

    # Acquire the database connection used for the staging-table DDL.
    engine = DBUtil.get_db_engine(db_type, site_name)

    export_cols = [
        "asin",
        "parent_asin",
        "color",
        "size",
        "style",
        "state",
        "column_2",
        "column_1",
        "created_time",
        "updated_time",
        "created_date",
        "mapped_asin",
    ]

    export_tb_target = f"{site_name}_asin_variation"
    export_tb_copy = f"{site_name}_asin_variation_copy1"
    export_table = export_tb_copy

    # 20 contiguous range partitions, each 50,000,000 wide, covering
    # [0, 1,000,000,000). Generated instead of hand-written to avoid
    # copy/paste drift between the bounds.
    partition_width = 50_000_000
    partition_count = 20
    partition_sql = "\n".join(
        f"CREATE TABLE public.{export_tb_copy}_part{idx} PARTITION OF public.{export_tb_copy} "
        f"FOR VALUES FROM ({(idx - 1) * partition_width}) TO ({idx * partition_width});"
        for idx in range(1, partition_count + 1)
    )

    # NOTE(review): `CREATE TABLE ... (LIKE ...)` does not copy a partition key,
    # so the PARTITION OF statements below only succeed if the staging table
    # ends up partitioned some other way -- confirm against the live schema.
    sql = f"""
        drop table if exists {export_tb_copy};
        create table if not exists {export_tb_copy}
        (
            like {export_tb_target} including comments including defaults
        );
        truncate table {export_tb_copy};
        SELECT create_distributed_table('{export_tb_copy}', 'asin');

        {partition_sql}
    """
    # Rebuild the staging table.
    DBUtil.engine_exec_sql(engine, sql)
    print("导出的字段:", export_cols)

    partition_dict = {
        "site_name": site_name,
    }

    # Build the shell script that runs the sqoop export.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_asin_variation_info",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Always release the SSH connection, even when the export fails.
        client.close()

    # Re-acquire the database connection after the export has run.
    engine = DBUtil.get_db_engine(db_type, site_name)

    # Index creation and the table/index swap run in one transaction per
    # attempt; a failed attempt is retried every 60 seconds with a fresh
    # engine. The retry is intentionally unbounded, as in the original script.
    while True:
        try:
            with engine.begin() as conn:
                # Create the indexes on the freshly loaded staging table.
                sql_index_create = f"""
                    CREATE INDEX {export_tb_copy}_asin_btree ON public.{export_tb_copy} USING btree (asin);
                    CREATE INDEX {export_tb_copy}_parent_asin_btree ON public.{export_tb_copy} USING btree (parent_asin);"""
                conn.execute(sql_index_create)
                print(f"sql_index_create: {sql_index_create}")

                # Swap table names: target -> _back, copy -> target, _back -> copy.
                sql_1 = f"""alter table {export_tb_target} rename to {export_tb_target}_back; """
                conn.execute(sql_1)
                sql_2 = f"""alter table {export_tb_copy} rename to {export_tb_target};"""
                conn.execute(sql_2)
                sql_3 = f"""alter table {export_tb_target}_back rename to {export_tb_copy};"""
                conn.execute(sql_3)

                # Swap index names so they follow the tables. Both indexes must
                # be swapped: index names are unique per schema, so leaving
                # `{copy}_parent_asin_btree` attached to the live table would
                # make the next run's CREATE INDEX fail. IF EXISTS covers a
                # live table that never had a parent_asin index.
                sql_index_exchange = f"""
                    ALTER INDEX {export_tb_target}_asin_btree RENAME TO {export_tb_target}_asin_btree_temp;
                    ALTER INDEX {export_tb_copy}_asin_btree RENAME TO {export_tb_target}_asin_btree;
                    ALTER INDEX {export_tb_target}_asin_btree_temp RENAME TO {export_tb_copy}_asin_btree;
                    ALTER INDEX IF EXISTS {export_tb_target}_parent_asin_btree RENAME TO {export_tb_target}_parent_asin_btree_temp;
                    ALTER INDEX {export_tb_copy}_parent_asin_btree RENAME TO {export_tb_target}_parent_asin_btree;
                    ALTER INDEX IF EXISTS {export_tb_target}_parent_asin_btree_temp RENAME TO {export_tb_copy}_parent_asin_btree;
                """
                print(f"sql_index_exchange: {sql_index_exchange}")
                conn.execute(sql_index_exchange)

            break
        except Exception as e:
            # Report the failure before waiting so it shows up in the log immediately.
            print("创建索引或者交换表名失败:", e)
            time.sleep(60)
            engine = DBUtil.get_db_engine(db_type, site_name)