"""Export ASIN state rows from Hive to PostgreSQL for one site.

Usage: run this script with the site name as the first command-line
argument. The script builds an export command for the Hive table
``tmp_asin_state_copy`` (columns ``asin`` and ``state``, partitioned by
``site_name``) targeting the table ``us_all_syn_st_asin``, and runs that
command through an SSH client.
"""
import os
import sys

# Make the parent of the script directory importable so that the
# project-local ``utils`` package resolves when run as a script.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


def main():
    """Validate the site name, build the export command and run it over SSH.

    Raises:
        ValueError: if no site name was supplied.
    """
    # presumably reads the first command-line argument, falling back to
    # None when it is absent -- confirm against CommonUtil.get_sys_arg.
    site_name = CommonUtil.get_sys_arg(1, None)
    if site_name is None:
        # Explicit raise instead of ``assert`` so the check still runs
        # under ``python -O``.
        raise ValueError("site_name 不能为空!")

    # ------------------------------------------------------------------
    # Import step (MySQL -> Hive) -- currently disabled, kept for reference.
    # ------------------------------------------------------------------
    # hive_tb = "tmp_asin_state"
    #
    # partition_dict = {
    #     "site_name": site_name
    # }
    #
    # hdfs_path = CommonUtil.build_hdfs_path(hive_tb , partition_dict)
    # print(f"hdfs_path is {hdfs_path}")
    #
    # query = f"""
    #         select
    #                asin,
    #                state,
    #                created_at,
    #                updated_at,
    #                3 as flag
    #         from us_all_syn_st_history_2022
    #         where 1 = 1
    #         and \$CONDITIONS
    #         """
    # print(query)
    # db_type = "mysql"
    # empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
    #                                                                site_name=site_name,
    #                                                                query=query,
    #                                                                hive_tb_name=hive_tb,
    #                                                                msg_usr=['chenyuanjie']
    #                                                                )
    #
    # if not empty_flag:
    #     sh = CommonUtil.build_import_sh_v2(site_name=site_name,
    #                                        db_type=db_type,
    #                                        query=query,
    #                                        hdfs_path=hdfs_path,
    #                                        map_num=15,
    #                                        key="state"
    #                                        )
    #
    #     client = SSHUtil.get_ssh_client()
    #     SSHUtil.exec_command_async(client, sh, ignore_err=False)
    #     CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
    #     client.close()
    #
    #     # Post-import check -- verify data consistency
    #     CommonUtil.check_import_sync_num(db_type=db_type,
    #                                      partition_dict=partition_dict,
    #                                      import_query=query,
    #                                      hive_tb_name=hive_tb,
    #                                      msg_usr=['chenyuanjie']
    #                                      )

    # Export to the PG database.
    db_type = "postgresql"
    export_tb = "us_all_syn_st_asin"

    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="tmp_asin_state_copy",
        export_tb=export_tb,
        col=[
            "asin",
            "state"
        ],
        partition_dict={
            "site_name": site_name
        }
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Always release the SSH connection, even when the remote
        # command fails and an exception propagates.
        client.close()


if __name__ == '__main__':
    main()