asin_state.py 2.83 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


if __name__ == '__main__':
    # Script entry point: export the "asin" and "state" columns of the Hive
    # table "tmp_asin_state_copy" (partitioned by site_name) into the
    # PostgreSQL table "us_all_syn_st_asin" by running a generated shell
    # command over SSH.

    # Site identifier obtained via CommonUtil.get_sys_arg(1, None)
    # (presumably the first command-line argument -- confirm in common_util).
    site_name = CommonUtil.get_sys_arg(1, None)
    # Validate with an explicit raise instead of `assert`: assertions are
    # removed when Python runs with -O, which would let a missing site_name
    # slip through to the export step.
    if site_name is None:
        raise ValueError("site_name 不能为空!")

    # ------------------------------------------------------------------
    # Disabled step, kept for reference: import from MySQL into the Hive
    # table "tmp_asin_state".
    # ------------------------------------------------------------------
    # hive_tb = "tmp_asin_state"
    #
    # partition_dict = {
    #     "site_name": site_name
    # }
    #
    # hdfs_path = CommonUtil.build_hdfs_path(hive_tb , partition_dict)
    # print(f"hdfs_path is {hdfs_path}")
    #
    # query = f"""
    #     select
    #     asin,
    #     state,
    #     created_at,
    #     updated_at,
    #     3 as flag
    #     from us_all_syn_st_history_2022
    #     where 1 = 1
    #     and \$CONDITIONS
    # """
    # print(query)
    # db_type = "mysql"
    # empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
    #                                                                site_name=site_name,
    #                                                                query=query,
    #                                                                hive_tb_name=hive_tb,
    #                                                                msg_usr=['chenyuanjie']
    #                                                                )
    #
    # if not empty_flag:
    #     sh = CommonUtil.build_import_sh_v2(site_name=site_name,
    #                                     db_type=db_type,
    #                                     query=query,
    #                                     hdfs_path=hdfs_path,
    #                                     map_num=15,
    #                                     key="state"
    #                                     )
    #
    #     client = SSHUtil.get_ssh_client()
    #     SSHUtil.exec_command_async(client, sh, ignore_err=False)
    #     CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
    #     client.close()
    #
    #     # Post-import check: verify data consistency
    #     CommonUtil.check_import_sync_num(db_type=db_type,
    #                                      partition_dict=partition_dict,
    #                                      import_query=query,
    #                                      hive_tb_name=hive_tb,
    #                                      msg_usr=['chenyuanjie']
    #                                      )

    # Export to the PostgreSQL database.
    db_type = "postgresql"
    export_tb = "us_all_syn_st_asin"

    # Build the export command for the two columns of the site's partition.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="tmp_asin_state_copy",
        export_tb=export_tb,
        col=[
            "asin",
            "state"
        ],
        partition_dict={
            "site_name": site_name
        }
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Always release the SSH connection, even if the call above raises.
        client.close()