"""
@Author      : HuangJian
@Description : ASIN detail table - weekly table
@SourceTable : us_asin_detail_2023_18
@SinkTable   : ods_asin_detail
@CreateTime  : 2022/05/18 14:55
@UpdateTime  : 2022/05/18 14:55

Imports one partition (site_name / date_type / date_info) of the PostgreSQL
ASIN detail table into the Hive table ``ods_asin_detail``.

Usage: <script> <site_name> <date_type> <date_info>
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils

HIVE_TABLE = "ods_asin_detail"

# Users passed as ``msg_usr`` to the schema / row-count checks.
MSG_USERS = ['chenyuanjie']

# Timestamp columns; the source names are aliased to created_at / updated_at.
DATE_COLUMNS = "launch_time, created_time as created_at, updated_time as updated_at"

# Columns added to the weekly source tables over time:
# (first date_info that has them, column names).
# NOTE: product_json, product_detail_json, review_ai_text and
# review_label_json (2024-05) were left disabled in the original script
# and are therefore not selected for weekly tables.
WEEK_COLUMN_ADDITIONS = (
    ('2023-18', ('weight_str',)),
    ('2023-21', ('package_quantity', 'pattern_name')),
    ('2023-49', ('follow_sellers',)),
    ('2023-51', ('product_description', 'buy_sales')),
    ('2024-02', ('image_view',)),
)

# Extra columns selected from the monthly (month / month_week) source tables.
MONTH_EXTRA_COLUMNS = (
    "describe, weight_str, package_quantity, pattern_name, follow_sellers, product_description, buy_sales, image_view, spider_int, "
    "lob_asin_json, seller_json, customer_reviews_json, product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, "
    "sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, together_asin_json, min_match_asin_json, "
    "variat_num, current_asin, img_list, variat_list, parent_asin, bundles_this_asins_json, video_m3u8_url"
)


def _zero_pad(value) -> str:
    """Return *value* as a decimal string zero-padded to at least two digits.

    The PostgreSQL table names use two-digit week/month suffixes (01, 02, ...).
    Going through ``int`` avoids double padding when the value is already
    padded (e.g. ``'05'``).
    """
    return f"{int(value):02d}"


def _week_db_type(site_name: str, date_info: str) -> str:
    """Return the DB connection type that holds the weekly table.

    Only the ``us`` site for 2023-26 up to (not including) 2023-34 uses
    ``postgresql``; everything else uses ``postgresql_14``.
    The comparison is lexicographic, so it assumes date_info is a
    zero-padded ``YYYY-WW`` string -- TODO confirm against the scheduler.
    """
    if site_name == 'us' and '2023-26' <= date_info < '2023-34':
        return 'postgresql'
    return 'postgresql_14'


def _week_extra_columns(date_info: str) -> str:
    """Return the comma-separated extra columns available for *date_info*.

    The result has no leading comma because the SQL template already places
    a comma in front of the extra-column list.
    """
    columns = ['describe']
    for first_week, added in WEEK_COLUMN_ADDITIONS:
        if date_info >= first_week:
            columns.extend(added)
    return ", ".join(columns)


def _resolve_source(site_name: str, date_type: str, date_info: str) -> tuple:
    """Work out where to read from for the requested partition.

    Returns:
        ``(db_type, import_table, extra_columns)``.

    Raises:
        ValueError: if *date_type* is not week, month or month_week.
    """
    # Split date_info into its two components (year and week/month).
    d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)

    if date_type == DateTypes.week.name:
        db_type = _week_db_type(site_name, date_info)
        print(f"同步连接的db_type:{db_type}")
        import_table = f"{site_name}_asin_detail_{d1}_{_zero_pad(d2)}"
        return db_type, import_table, _week_extra_columns(date_info)

    if date_type in (DateTypes.month.name, DateTypes.month_week.name):
        import_table = f"{site_name}_asin_detail_month_{d1}_{_zero_pad(d2)}"
        return 'postgresql_14', import_table, MONTH_EXTRA_COLUMNS

    raise ValueError(f"unsupported date_type: {date_type!r}")


def _build_query(import_table: str, extra_columns: str) -> str:
    """Return the SELECT statement used both for validation and for the import.

    ``\\$CONDITIONS`` is emitted literally as backslash + ``$CONDITIONS``;
    presumably it is the placeholder the import tool replaces with its
    split predicate -- confirm against ``CommonUtil.build_import_sh``.
    """
    return f"""
    select
        id,
        asin,
        img_url,
        title,
        title_len,
        price,
        rating,
        total_comments,
        buy_box_seller_type,
        page_inventory,
        category,
        volume,
        weight,
        rank,
        {DATE_COLUMNS},
        category_state,
        img_num,
        img_type,
        activity_type,
        one_two_val,
        three_four_val,
        five_six_val,
        eight_val,
        qa_num,
        one_star,
        two_star,
        three_star,
        four_star,
        five_star,
        low_star,
        together_asin,
        brand,
        ac_name,
        material,
        node_id,
        data_type,
        sp_num,
        {extra_columns}
    from {import_table}
    where 1=1
    and \\$CONDITIONS
    """


def main() -> None:
    """Run the import for the partition given on the command line."""
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Explicit checks instead of ``assert`` so they survive ``python -O``.
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if date_type is None:
        raise ValueError("date_type 不能为空!")
    if date_info is None:
        raise ValueError("date_info 不能为空!")

    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }

    # Target HDFS path of the Hive partition.
    hdfs_path = CommonUtil.build_hdfs_path(HIVE_TABLE, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    db_type, import_table, extra_columns = _resolve_source(site_name, date_type, date_info)
    sql_query = _build_query(import_table, extra_columns)

    # Schema and data validation before importing.
    CommonUtil.check_schema_before_import(db_type=db_type,
                                          site_name=site_name,
                                          query=sql_query,
                                          hive_tb_name=HIVE_TABLE,
                                          msg_usr=MSG_USERS,
                                          partition_dict=partition_dict)

    # Build the import shell script.
    import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                           db_type=db_type,
                                           query=sql_query,
                                           hdfs_path=hdfs_path,
                                           map_num=50,
                                           key='id')

    # Remove any existing HDFS data for this partition before importing.
    HdfsUtils.delete_hdfs_file(hdfs_path)

    # SSH client used to execute the import command.
    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
        # Build the lzo index and repair the table metadata.
        CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=HIVE_TABLE)
    finally:
        # Always release the connection, even when the import fails.
        client.close()

    # Post-import check: row-count consistency (skipped for month_week).
    if date_type != DateTypes.month_week.name:
        CommonUtil.check_import_sync_num(db_type=db_type,
                                         partition_dict=partition_dict,
                                         import_query=sql_query,
                                         hive_tb_name=HIVE_TABLE,
                                         msg_usr=MSG_USERS)

    # Post-import check: threshold warnings on key fields.
    CommonUtil.check_fields_and_warning(hive_tb_name=HIVE_TABLE, partition_dict=partition_dict)


if __name__ == '__main__':
    main()