# us_asin_image_pg14.py 2.91 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil


if __name__ == '__main__':
    # Import `{site_name}_asin_image_pyb_copy` from the "postgresql_14" source
    # into Hive table `tmp_asin_image` (partition site_name=us14), then run the
    # post-import row-count consistency check.
    site_name = "us"
    # Recipients of the schema-check and sync-check notifications.
    msg_usr = ['chenyuanjie']

    hive_tb = "tmp_asin_image"

    # The partition value "us14" intentionally differs from `site_name`;
    # NOTE(review): presumably it tags data sourced from the pg14 instance — confirm.
    partition_dict = {
        "site_name": "us14",
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # `\\$CONDITIONS` yields the literal text `\$CONDITIONS` (same string as before,
    # without relying on an invalid escape sequence). Presumably the generated
    # command runs through a shell, and the backslash keeps `$CONDITIONS` unexpanded
    # so the import tool receives its split placeholder.
    query = f"""
        select
            asin,
            img_url,
            img_order_by,
            created_at,
            updated_at,
            data_type
        from {site_name}_asin_image_pyb_copy
        where 1 = 1
        and \\$CONDITIONS
    """
    print(query)
    db_type = "postgresql_14"
    # NOTE(review): check_flag is not consulted below — confirm that a schema
    # mismatch should not abort the import.
    empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
                                                                   site_name=site_name,
                                                                   query=query,
                                                                   hive_tb_name=hive_tb,
                                                                   msg_usr=msg_usr
                                                                   )

    if not empty_flag:
        # NOTE(review): the split key 'id' is not in the select list above;
        # confirm build_import_sh can still compute split boundaries on it.
        sh = CommonUtil.build_import_sh(site_name=site_name,
                                        db_type=db_type,
                                        query=query,
                                        hdfs_path=hdfs_path,
                                        map_num=10,
                                        key='id')

        client = SSHUtil.get_ssh_client()
        try:
            SSHUtil.exec_command_async(client, sh, ignore_err=False)
            CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        finally:
            # Release the SSH connection even when the remote command or the
            # post-import step raises.
            client.close()

        # Post-import check: verify data consistency (row counts) with the source.
        CommonUtil.check_import_sync_num(db_type=db_type,
                                         partition_dict=partition_dict,
                                         import_query=query,
                                         hive_tb_name=hive_tb,
                                         msg_usr=msg_usr
                                         )

    # # Export to the pg database (disabled)
    # db_type = "postgresql"
    # export_tb = f"{site_name}_asin_image_copy"
    #
    # # Export table name
    # sh = CommonUtil.build_export_sh(
    #     site_name=site_name,
    #     db_type=db_type,
    #     hive_tb="tmp_asin_image_copy",
    #     export_tb=export_tb,
    #     col=[
    #         "asin",
    #         "img_url",
    #         "img_order_by",
    #         "created_at",
    #         "updated_at",
    #         "data_type"
    #     ],
    #     partition_dict={
    #         "site_name": site_name
    #     }
    # )
    # client = SSHUtil.get_ssh_client()
    # SSHUtil.exec_command_async(client, sh, ignore_err=False)
    # client.close()