test_import.py 2.87 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


# Import one month of search-term data for a site from MySQL into the Hive
# table `tmp_st_month_2110_2208` (partitioned by site_name / year_month), then
# run the post-import consistency check.
#
# Positional args (read via CommonUtil.get_sys_arg -- presumably sys.argv):
#   1: site_name    e.g. "us"
#   2: year_month   "<year>-<month>", e.g. "2022-08"
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    year_month = CommonUtil.get_sys_arg(2, None)
    # Explicit checks rather than `assert`: asserts are stripped under
    # `python -O`, which would let a missing argument flow into the
    # table-name / HDFS-path building below.
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if year_month is None:
        raise ValueError("year_month 不能为空!")

    # The two parts are spliced into the MySQL source table name below, so
    # reject anything that is not exactly "<year>-<month>" with a clear error.
    ym_parts = year_month.split("-")
    if len(ym_parts) != 2:
        raise ValueError(f"year_month must look like 'YYYY-MM', got {year_month!r}")
    year, month = ym_parts

    hive_tb = "tmp_st_month_2110_2208"
    # Presumably the users notified by the schema / sync check helpers.
    msg_usr = ['chenyuanjie']

    partition_dict = {
        "site_name": site_name,
        "year_month": year_month,
    }

    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # `\\$CONDITIONS` yields the literal text `\$CONDITIONS` (identical to the
    # previous `\$`, minus Python's invalid-escape SyntaxWarning).
    # `$CONDITIONS` looks like Sqoop's free-form-query placeholder; the
    # backslash presumably stops the shell from expanding it in the command
    # built by build_import_sh_v2 -- confirm there.
    query = f"""
        select
            week,
            asin,
            search_term,
            ao_val,
            orders,
            orders_sum,
            flow,
            order_flow,
            search_num,
            search_rank,
            quantity_being_sold,
            adv_compet,
            zr_page_rank,
            zr_page,
            zr_page_row,
            sp_page,
            sp_page_rank,
            sp_page_row,
            sb1_page,
            sb2_page,
            sb3_page,
            ac_page,
            bs_page,
            er_page,
            tr_page,
            search_term_type,
            created_at,
            updated_at,
            id
        from {site_name}_st_month_{year}_{month}
        where 1 = 1
        and \\$CONDITIONS
    """
    print(query)
    db_type = "mysql"
    # Only the first flag is used here: the import runs only when empty_flag
    # is falsy.
    empty_flag, _ = CommonUtil.check_schema_before_import(db_type=db_type,
                                                          site_name=site_name,
                                                          query=query,
                                                          hive_tb_name=hive_tb,
                                                          msg_usr=msg_usr
                                                          )

    if not empty_flag:
        sh = CommonUtil.build_import_sh_v2(site_name=site_name,
                                           db_type=db_type,
                                           query=query,
                                           hdfs_path=hdfs_path,
                                           )

        client = SSHUtil.get_ssh_client()
        try:
            SSHUtil.exec_command_async(client, sh, ignore_err=False)
            CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        finally:
            # Release the SSH connection even if the import command or the
            # post-import step raises.
            client.close()

        # Post-import check: verify data consistency.
        CommonUtil.check_import_sync_num(db_type=db_type,
                                         partition_dict=partition_dict,
                                         import_query=query,
                                         hive_tb_name=hive_tb,
                                         msg_usr=msg_usr
                                         )