# ods_self_asin_detail.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DbTypes

if __name__ == '__main__':
    # Sqoop-import one (site_name, date_type, date_info) partition of self-ASIN
    # detail data from PostgreSQL into the Hive table ods_self_asin_detail.
    #
    # CLI args: 1=site_name, 2=date_type, 3=date_info (YYYY-MM-DD).
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # NOTE(review): asserts are stripped under `python -O`; kept to match the
    # project's other import scripts. date_type is also a partition key, so it
    # is validated like the other two arguments.
    assert site_name is not None, "sitename 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"
    # Source tables are sharded by year (e.g. {site}_self_asin_detail_2024).
    year = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y")
    hive_tb = "ods_self_asin_detail"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info,
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # Free-form Sqoop query; \\$ renders the literal `$CONDITIONS` placeholder
    # Sqoop requires (doubled backslash avoids the invalid-escape SyntaxWarning
    # of `\$` on Python 3.12+ while producing identical output).
    query = f"""
        select
        asin,
        img_url,
        title,
        title_len,
        price,
        rating,
        total_comments,
        buy_box_seller_type,
        page_inventory,
        category,
        volume,
        weight,
        rank,
        launch_time,
        video_url,
        add_url,
        material,
        created_at,
        img_num,
        img_type,
        qa_num,
        brand,
        ac_name,
        node_id,
        sp_num,
        mpn,
        online_time,
        describe,
        one_star,
        two_star,
        three_star,
        four_star,
        five_star,
        low_star,
        asin_type,
        is_coupon,
        search_category,
        weight_str,
        account_name,
        other_seller_name,
        account_id
        from {site_name}_self_asin_detail_{year}
        where 1 = 1
        and site = '{site_name}'
        and bsr_date_info = '{date_info}'
        and date_info >= '{date_info}'
        and \\$CONDITIONS
"""
    print("sql ======================================================")
    print(query)
    db_type = DbTypes.postgresql.name
    # Verify the source schema matches the Hive table before importing;
    # msg_usr receives the failure notification.
    empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
                                                                   site_name=site_name,
                                                                   query=query,
                                                                   hive_tb_name=hive_tb,
                                                                   msg_usr=['wujicang']
                                                                   )
    assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"

    if not empty_flag:
        sh = CommonUtil.build_import_sh(site_name=site_name,
                                        db_type=db_type,
                                        query=query,
                                        hdfs_path=hdfs_path)
        # Drop any stale data at the target path before (re-)importing.
        HdfsUtils.delete_hdfs_file(hdfs_path)
        # NOTE(review): the SSH client is never closed; exec_command_async may
        # rely on the connection staying open, so this is flagged, not changed.
        client = SSHUtil.get_ssh_client()
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
        # Post-import housekeeping (e.g. registering the new Hive partition).
        CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)