"""
@Author      : HuangJian
@Description : Per-site ASIN weight parsing table, imported from PostgreSQL into Hive.
@SourceTable : {site_name}_asin_weight_{d1}_{week:02d}  (PostgreSQL)
@SinkTable   : ods_asin_weight
@CreateTime  : 2022/05/23 09:55
@UpdateTime  : 2022/05/23 09:55
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil

if __name__ == '__main__':
    # Positional CLI arguments: site_name, date_type, date_info.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    # Explicit checks instead of `assert`, so validation still runs under `python -O`.
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if date_type is None:
        raise ValueError("date_type 不能为空!")
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    # The source table is split per (d1, week), so only the week date type is supported.
    if date_type != DateTypes.week.name:
        raise ValueError("date_type类型不对,应为week")

    # Split date_info into two parts; d1 presumably the year and d2 the week number
    # (inferred from the table name below — confirm against CommonUtil.split_month_week_date).
    d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
    # Zero-pad d2 to two digits for the source table name (also correct if d2 is already padded).
    d2_str = f"{int(d2):02d}"

    # The source table lives in PostgreSQL.
    import_table = f"{site_name}_asin_weight_{d1}_{d2_str}"
    db_type = 'postgresql'

    # SSH client used to run shell commands remotely; always closed in `finally`.
    client = SSHUtil.get_ssh_client()
    try:
        # Run the weight-cleaning script for this site/d1/d2 before importing.
        clean_weight_sh = f"/opt/module/anaconda3/envs/pyspark/bin/python /opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/clean/clean_weight.py {site_name} {d1} {d2}"
        SSHUtil.exec_command_async(client, clean_weight_sh, ignore_err=False)

        hive_table = "ods_asin_weight"
        partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Build (and validate) the HDFS landing path for this partition.
        hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
        print(f"hdfs_path is {hdfs_path}")

        # `\\$` emits a literal backslash before $CONDITIONS so the shell does not expand it;
        # the placeholder is presumably filled in by the import tool (e.g. Sqoop) — confirm.
        sql_query = f"""
        select 
            id, 
            asin, 
            weight_str, 
            weight, 
            weight_type, 
            created_at, 
            updated_at 
        from {import_table}
        where 1=1 
        and \\$CONDITIONS
        """

        # Schema and data validation before import; skipped for fr/it/es
        # (NOTE(review): reason not visible here — confirm why these sites are exempt).
        if site_name not in ('fr', 'it', 'es'):
            CommonUtil.check_schema_before_import(db_type=db_type,
                                                  site_name=site_name,
                                                  query=sql_query,
                                                  hive_tb_name=hive_table,
                                                  msg_usr=['chenyuanjie'])

        # Build the import command.
        import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                               db_type=db_type,
                                               query=sql_query,
                                               hdfs_path=hdfs_path)
        # Delete existing HDFS data at the target path before importing.
        HdfsUtils.delete_hdfs_file(hdfs_path)
        SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
        # Create the LZO index and repair Hive metadata.
        CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    finally:
        # Close the SSH connection even if any step above fails.
        client.close()