# ods_asin_detail.py
"""
   @Author      : HuangJian
   @Description : asin详情表-周表
   @SourceTable : us_asin_detail_2023_18
   @SinkTable   : ods_asin_detail
   @CreateTime  : 2022/05/18 14:55
   @UpdateTime  : 2022/05/18 14:55
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils

if __name__ == '__main__':
    # ---- CLI arguments -------------------------------------------------
    # 1: site code (e.g. 'us'), 2: partition granularity ('week' / 'month' /
    # 'month_week'), 3: partition value (e.g. '2023-18').
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"

    hive_table = "ods_asin_detail"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }

    # Target HDFS path for this hive partition (printed for the job log).
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # Split e.g. '2023-18' into year (d1) and week/month number (d2).
    d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)

    if date_type == DateTypes.week.name:
        # pg partition suffixes are zero-padded: 01, 02, 03, ...
        d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
        # Projected columns are the same for every week; only the source db
        # connection differs by site/week.
        date_col = "launch_time,created_time as created_at,updated_time as updated_at"
        new_col = ',describe'
        # us weeks 2023-26 .. 2023-33 live on the old pg cluster; everything
        # else (other sites, earlier us weeks, us >= 2023-34) is on pg14.
        # NOTE: week ids like '2023-26' compare correctly as strings because
        # the week part is zero-padded.
        if site_name == 'us' and '2023-26' <= date_info < '2023-34':
            db_type = 'postgresql'
        else:
            db_type = 'postgresql_14'

        print(f"同步连接的db_type:{db_type}")

        # Columns added to the source table over time — only select them for
        # weeks where they already exist.
        if date_info >= '2023-18':
            new_col += ',weight_str'                      # added week 2023-18
        if date_info >= '2023-21':
            new_col += ',package_quantity,pattern_name'   # added week 2023-21
        if date_info >= '2023-49':
            new_col += ',follow_sellers'                  # added week 2023-49
        if date_info >= '2023-51':
            new_col += ',product_description,buy_sales'   # added week 2023-51
        if date_info >= '2024-02':
            new_col += ',image_view'                      # added week 2024-02

        import_table = f"{site_name}_asin_detail_{d1}_{d2}"
    elif date_type in (DateTypes.month.name, DateTypes.month_week.name):
        # Monthly tables always live on pg14 and carry the full column set.
        db_type = 'postgresql_14'
        date_col = "launch_time, created_time as created_at, updated_time as updated_at"
        new_col = "describe, weight_str, package_quantity, pattern_name, follow_sellers, product_description, buy_sales, image_view, spider_int, " \
                  "lob_asin_json, seller_json, customer_reviews_json, product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, " \
                  "sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, together_asin_json, min_match_asin_json, " \
                  "variat_num, current_asin, img_list, variat_list, parent_asin, bundles_this_asins_json, video_m3u8_url"
        d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
        import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
    else:
        # Fail fast with a clear message instead of an obscure NameError when
        # sql_query is built below.
        raise ValueError(f"unsupported date_type: {date_type}")

    # Free-form import query; \$CONDITIONS is the sqoop split placeholder and
    # must reach the shell escaped, hence the literal backslash.
    sql_query = f"""
            select 
                id,
                asin,
                img_url,
                title,
                title_len,
                price,
                rating,
                total_comments,
                buy_box_seller_type,
                page_inventory,
                category,
                volume,
                weight,
                rank,
                {date_col},
                category_state,
                img_num,
                img_type,
                activity_type,
                one_two_val,
                three_four_val,
                five_six_val,
                eight_val,
                qa_num,
                one_star,
                two_star,
                three_star,
                four_star,
                five_star,
                low_star,
                together_asin,
                brand,
                ac_name,
                material,
                node_id,
                data_type,
                sp_num,
                {new_col} 
            from {import_table} 
            where 1=1 
            and \$CONDITIONS
        """

    # Validate source schema against the hive table before importing.
    CommonUtil.check_schema_before_import(db_type=db_type,
                                          site_name=site_name,
                                          query=sql_query,
                                          hive_tb_name=hive_table,
                                          msg_usr=['chenyuanjie'],
                                          partition_dict=partition_dict)

    # Build the sqoop import shell script (50 mappers, split by primary key).
    import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                           db_type=db_type,
                                           query=sql_query,
                                           hdfs_path=hdfs_path,
                                           map_num=50,
                                           key='id')
    # Drop any stale HDFS data for this partition before re-importing.
    HdfsUtils.delete_hdfs_file(hdfs_path)
    # Run the import over SSH.
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
    # Build lzo indexes and repair hive metadata for the new partition.
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    client.close()

    # Post-import row-count consistency check (skipped for month_week, whose
    # source rows do not map 1:1 onto the partition).
    if date_type != 'month_week':
        CommonUtil.check_import_sync_num(db_type=db_type,
                                         partition_dict=partition_dict,
                                         import_query=sql_query,
                                         hive_tb_name=hive_table,
                                         msg_usr=['chenyuanjie'])

    # Threshold warnings on key fields.
    CommonUtil.check_fields_and_warning(hive_tb_name=hive_table, partition_dict=partition_dict)