# ods_asin_variat.py 2.61 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

if __name__ == '__main__':
    # Import the per-site source table `<site_name>_variat` into the Hive
    # table `ods_asin_variat`, partitioned by site_name.
    #
    # Flow:
    #   1. read site_name via CommonUtil.get_sys_arg(1, None)
    #      (presumably the first command-line argument -- confirm in CommonUtil)
    #   2. validate the query against the Hive table schema
    #   3. if the source is not reported empty: delete the target HDFS path,
    #      run the generated import shell command over SSH, then run the
    #      post-import hook
    #
    # Raises:
    #   ValueError   -- site_name was not supplied
    #   RuntimeError -- the schema check before import failed
    site_name = CommonUtil.get_sys_arg(1, None)
    # Explicit raise instead of `assert`: assertions are stripped under
    # `python -O`, which would let the import run with site_name=None.
    if site_name is None:
        raise ValueError("site_name 不能为空!")

    hive_tb = "ods_asin_variat"
    db_type = "mysql"
    partition_dict = {
        "site_name": site_name
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    import_table = f"{site_name}_variat"

    # NOTE: the literal text `\$CONDITIONS` must reach the import command
    # unchanged (presumably the Sqoop split placeholder, escaped for the
    # shell -- confirm against CommonUtil.build_import_sh). The backslash is
    # written as `\\` because a bare `\$` is an invalid escape sequence in a
    # Python string literal; the resulting string is identical.
    query = f"""
        select 
        id,
        asin,
        parent_asin,
        color,
        `size`,
        `style`,
        `state`,
        column_2,
        column_1,
        created_at as created_time,
        updated_at as updated_time
        from {import_table}
        where 1 = 1
        and \\$CONDITIONS
    """

    empty_flag, check_flag = CommonUtil.check_schema_before_import(
        db_type=db_type,
        site_name=site_name,
        query=query,
        hive_tb_name=hive_tb,
        msg_usr=['chenyuanjie'],
    )
    # Must not be an `assert`: a failed schema check has to block the import
    # even when Python runs with optimizations enabled.
    if not check_flag:
        raise RuntimeError(f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!")

    if not empty_flag:
        sh = CommonUtil.build_import_sh(
            site_name=site_name,
            db_type=db_type,
            query=query,
            hdfs_path=hdfs_path,
            map_num=50,
            key='id',
        )
        # Delete the existing target data before importing.
        HdfsUtils.delete_hdfs_file(hdfs_path)
        client = SSHUtil.get_ssh_client()
        try:
            SSHUtil.exec_command_async(client, sh, ignore_err=False)
            CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        finally:
            # Always release the SSH connection, including when the import
            # command or the post-import hook raises.
            client.close()

        # Post-import check (currently disabled): verify data consistency.
        # CommonUtil.check_import_sync_num(db_type=db_type,
        #                                  partition_dict=partition_dict,
        #                                  import_query=query,
        #                                  hive_tb_name=hive_tb,
        #                                  msg_usr=['chenyuanjie','wangrui4','chenjianyun']
        #                                  )