# ods_adv_keyword.py 3.22 KB
# (scraped repository blame-view residue follows: "Newer Older", committer
#  "chenyuanjie", and the original line-number gutter 1-86 — kept as comments
#  so this file remains valid Python)
# Newer Older
# chenyuanjie committed
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

if __name__ == '__main__':
    # Import the per-site adv keyword table from the source ad database into
    # the Hive ODS table ``ods_adv_keyword`` (partitioned by site_name /
    # date_type / date_info) via a generated sqoop-style import shell script.
    #
    # CLI args: 1 = site_name, 2 = date_type, 3 = date_info (all required).
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"

    hive_tb = "ods_adv_keyword"

    # NOTE(review): MySQL IF(col, col, '9999-99-99') relies on implicit
    # truthiness of a datetime value as the null/empty fallback; IFNULL(col,
    # '9999-99-99') is the conventional form — confirm against source schema
    # before changing the query text.
    cols = "IF(created_at,created_at,'9999-99-99') created_at," \
           "IF(updated_at,updated_at,'9999-99-99') updated_at," \
           "IF(synchronised_time,synchronised_time,'9999-99-99') synchronised_time"

    if site_name == "us":
        db_type = "adv_us"
        # The US source table has no synchronised_time column.
        cols = "IF(created_at,created_at,'9999-99-99') created_at," \
               "IF(updated_at,updated_at,'9999-99-99') updated_at"
    elif site_name == "uk":
        db_type = "adv_uk"
    else:
        db_type = "adv_other"

    # ``\$CONDITIONS`` is a sqoop placeholder that must reach the shell as a
    # literal backslash-dollar; use an explicit ``\\$`` escape (a bare ``\$``
    # is an invalid escape sequence and warns on modern CPython).
    query = f"""
        select 
            keywordid,
            campaignid,
            adgroupid,
            state,
            keywordtext,
            matchtype,
            bid,
            site,
            account,
            servingstatus,
            {cols}
        from {site_name}_keyword
        where 1 = 1
        and \\$CONDITIONS
    """

    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # Verify the query's columns match the Hive table schema before importing;
    # also learns whether the source result set is empty.
    empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
                                                                   site_name=site_name,
                                                                   query=query,
                                                                   hive_tb_name=hive_tb,
                                                                   msg_usr=['chenyuanjie'],
                                                                   partition_dict=partition_dict
                                                                   )
    assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"

    if not empty_flag:
        sh = CommonUtil.build_import_sh(site_name=site_name,
                                        db_type=db_type,
                                        query=query,
                                        hdfs_path=hdfs_path)
        # Delete any previous partition data before importing.
        HdfsUtils.delete_hdfs_file(hdfs_path)
        client = SSHUtil.get_ssh_client()
        try:
            SSHUtil.exec_command_async(client, sh, ignore_err=False)
            CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        finally:
            # Always release the SSH connection, even if the import fails.
            client.close()

        # Post-import check: verify source/Hive row counts are consistent.
        CommonUtil.check_import_sync_num(db_type=db_type,
                                         partition_dict=partition_dict,
                                         import_query=query,
                                         hive_tb_name=hive_tb,
                                         msg_usr=['chenyuanjie'])