"""Import one partition of self-ASIN detail rows into the ``ods_self_asin_detail`` Hive table.

Command-line arguments (read through ``CommonUtil.get_sys_arg``):
    1. site_name -- required; used in the source table name and the ``site`` filter.
    2. date_type -- used only as a partition value.
    3. date_info -- required; parsed with the ``%Y-%m-%d`` format to derive the
       year suffix of the source table, and used in the date filters.
"""
import os
import sys

# Make the parent of the script directory importable so the ``utils`` package resolves.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DbTypes

# Target Hive table of the import.
HIVE_TB = "ods_self_asin_detail"

# Columns selected from the source table, in query order.
SELECT_COLUMNS = (
    "asin",
    "img_url",
    "title",
    "title_len",
    "price",
    "rating",
    "total_comments",
    "buy_box_seller_type",
    "page_inventory",
    "category",
    "volume",
    "weight",
    "rank",
    "launch_time",
    "video_url",
    "add_url",
    "material",
    "created_at",
    "img_num",
    "img_type",
    "qa_num",
    "brand",
    "ac_name",
    "node_id",
    "sp_num",
    "mpn",
    "online_time",
    "describe",
    "one_star",
    "two_star",
    "three_star",
    "four_star",
    "five_star",
    "low_star",
    "asin_type",
    "is_coupon",
    "search_category",
    "weight_str",
    "account_name",
    "other_seller_name",
    "account_id",
)


def main() -> None:
    """Validate the arguments, build the import query and run the import.

    Raises:
        ValueError: if ``site_name`` or ``date_info`` was not supplied.
        RuntimeError: if the pre-import schema check reports a failure.
    """
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    # Explicit raises instead of ``assert``: asserts are stripped under
    # ``python -O``, which would let the job continue with missing arguments.
    if site_name is None:
        raise ValueError("sitename 不能为空!")
    if date_info is None:
        raise ValueError("date_info 不能为空!")
    # NOTE(review): date_type is not validated; a missing value ends up in the
    # partition dict as None -- confirm whether that is intended.

    year = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y")

    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info,
    }
    hdfs_path = CommonUtil.build_hdfs_path(HIVE_TB, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    column_list = ", ".join(SELECT_COLUMNS)
    # The backslash before ``$CONDITIONS`` must reach the generated command
    # verbatim (presumably so the shell does not expand the import tool's
    # placeholder -- TODO confirm), so it is written as an escaped backslash
    # rather than the invalid escape sequence ``\$``.
    query = (
        f" select {column_list}"
        f" from {site_name}_self_asin_detail_{year}"
        f" where 1 = 1"
        f" and site = '{site_name}'"
        f" and bsr_date_info = '{date_info}'"
        f" and date_info >= '{date_info}'"
        f" and \\$CONDITIONS "
    )
    print("sql ======================================================")
    print(query)

    db_type = DbTypes.postgresql.name
    empty_flag, check_flag = CommonUtil.check_schema_before_import(
        db_type=db_type,
        site_name=site_name,
        query=query,
        hive_tb_name=HIVE_TB,
        msg_usr=['wujicang'],
    )
    # Must not be skippable under ``-O``: a failed check has to stop the job
    # before the existing HDFS data is deleted.
    if not check_flag:
        raise RuntimeError(f"导入hive表{HIVE_TB}表结构检查失败!请检查query是否异常!!")

    if empty_flag:
        # The check flagged the source as empty; skip the import.
        return

    sh = CommonUtil.build_import_sh(
        site_name=site_name,
        db_type=db_type,
        query=query,
        hdfs_path=hdfs_path,
    )
    # Delete any existing data at the target path before importing.
    HdfsUtils.delete_hdfs_file(hdfs_path)
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=HIVE_TB)


if __name__ == '__main__':
    main()