"""Import one monthly ``{site_name}_st_month_{year}_{month}`` table into Hive.

Usage: ``python <script> <site_name> <year_month>`` where ``year_month`` is
``YYYY-MM`` (it is split on ``-`` to build the source table name).

The script builds a select query for the source table, asks ``CommonUtil`` to
check the schema, and, when the source is not reported empty, runs the import
shell command over SSH into the Hive table ``tmp_st_month_2209_2303``.
"""
import os
import sys

# The project root must be on sys.path before the project-local imports below.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil  # noqa: E402
from utils.common_util import CommonUtil  # noqa: E402

# Columns selected from the source table, in the order they are imported.
_COLUMNS = (
    "id",
    "search_term",
    "st_ao_val",
    "st_type",
    "st_rank",
    "st_rank_avg",
    "st_search_num",
    "st_search_rate",
    "st_search_sum",
    "st_adv_counts",
    "st_quantity_being_sold",
    "asin",
    "asin_st_zr_orders",
    "asin_st_zr_orders_sum",
    "asin_st_zr_flow",
    "asin_st_sp_orders",
    "asin_st_sp_orders_sum",
    "asin_st_sp_flow",
    "st_asin_zr_page",
    "st_asin_zr_page_row",
    "st_asin_zr_page_rank",
    "st_asin_zr_updated_at",
    "st_asin_sp_page",
    "st_asin_sp_page_rank",
    "st_asin_sp_page_row",
    "st_asin_sp_updated_at",
    "st_asin_sb1_page",
    "st_asin_sb1_updated_at",
    "st_asin_sb2_page",
    "st_asin_sb2_updated_at",
    "st_asin_sb3_page",
    "st_asin_sb3_updated_at",
    "st_asin_ac_page",
    "st_asin_ac_updated_at",
    "st_asin_bs_page",
    "st_asin_bs_updated_at",
    "st_asin_er_page",
    "st_asin_er_updated_at",
    "st_asin_tr_page",
    "st_asin_tr_updated_at",
    "created_at",
    "updated_at",
)


def main():
    """Run the import for the site and month given on the command line.

    Raises:
        ValueError: if ``site_name`` or ``year_month`` is missing, or if
            ``year_month`` does not split into exactly two parts on ``-``.
    """
    site_name = CommonUtil.get_sys_arg(1, None)
    year_month = CommonUtil.get_sys_arg(2, None)
    # Explicit raises instead of ``assert`` so validation survives ``python -O``.
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if year_month is None:
        raise ValueError("year_month 不能为空!")

    year, month = year_month.split("-")
    hive_tb = "tmp_st_month_2209_2303"

    partition_dict = {
        "site_name": site_name,
        "year_month": year_month,
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # ``\\$`` yields the literal text ``\$CONDITIONS`` without relying on the
    # invalid ``\$`` escape sequence (a SyntaxWarning on recent Python).
    # NOTE(review): ``$CONDITIONS`` is presumably the placeholder consumed by
    # the import tool behind ``build_import_sh_v2`` - confirm.
    query = (
        " select "
        + ", ".join(_COLUMNS)
        + f" from {site_name}_st_month_{year}_{month}"
        + " where 1 = 1 and \\$CONDITIONS "
    )
    print(query)

    db_type = "mysql"
    empty_flag, check_flag = CommonUtil.check_schema_before_import(
        db_type=db_type,
        site_name=site_name,
        query=query,
        hive_tb_name=hive_tb,
        msg_usr=['chenyuanjie'],
    )

    if not empty_flag:
        sh = CommonUtil.build_import_sh_v2(
            site_name=site_name,
            db_type=db_type,
            query=query,
            hdfs_path=hdfs_path,
            map_num=10,
            key="id",
        )

        client = SSHUtil.get_ssh_client()
        try:
            SSHUtil.exec_command_async(client, sh, ignore_err=False)
            CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        finally:
            # Always release the SSH client, even when the import fails.
            client.close()

        # Post-import check: verify data consistency.
        # NOTE(review): the source layout was collapsed, so whether this call
        # originally sat inside this branch could not be seen; it is kept here
        # because it is labelled a post-import check - confirm.
        CommonUtil.check_import_sync_num(
            db_type=db_type,
            partition_dict=partition_dict,
            import_query=query,
            hive_tb_name=hive_tb,
            msg_usr=['chenyuanjie'],
        )


if __name__ == '__main__':
    main()