import os
import subprocess
import sys

# Make the parent directory importable so the `utils` package resolves.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_util import DBUtil, DbTypes
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

# Interpreter used to launch the sqoop export worker scripts.
PYTHON_BIN = "/opt/module/anaconda3/envs/pyspark/bin/python3.8"


def _partition_exported(engine, partition_name):
    """Return True if a PG-cluster partition named *partition_name* already exists.

    NOTE(review): the SQL is built by f-string interpolation. All current
    callers pass internally-generated constants (site/type/date suffixes),
    but if DBUtil.engine_exec_sql supports bind parameters this should be
    parameterized — confirm against the DBUtil API.
    """
    rows = DBUtil.engine_exec_sql(
        engine,
        f"select * from time_partitions where partition::varchar like '{partition_name}';",
    )
    return len(list(rows)) > 0


def export_detail(engine, type, site_name, day):
    """Export the monthly bsr/nsr asin_detail table for *day* if not yet exported.

    :param engine: PG-cluster SQLAlchemy engine (from DBUtil.get_db_engine)
    :param type: rank type, 'bsr' or 'nsr' (name kept for caller compatibility,
        although it shadows the builtin)
    :param site_name: site code, e.g. 'us'
    :param day: day string formatted '%Y-%m-%d'
    """
    # Detail tables are partitioned by month: suffix like '2023_02'.
    suffix = CommonUtil.reformat_date(day, "%Y-%m-%d", "%Y_%m")
    # Skip if the partition already exists; otherwise launch the export worker.
    if not _partition_exported(engine, f"{site_name}_{type}_asin_detail_{suffix}"):
        print(f"{type}:{site_name}:{day} asin_detail 导出中")
        # Argument list (no shell) avoids the injection/quoting pitfalls of
        # os.system; check=False keeps os.system's non-raising behavior.
        subprocess.run(
            [PYTHON_BIN, "/tmp/wjc_py/sqoop_export/dwt_bsr_nsr_asin_detail_new.py",
             site_name, day, type],
            check=False,
        )


if __name__ == '__main__':
    engine = DBUtil.get_db_engine(DbTypes.postgresql_cluster.name, "us")
    for rank_type in ['bsr', 'nsr']:
        for site_name in ['us']:
            # Each HDFS partition dir is named 'date_info=<day>'; extract the days.
            parts = HdfsUtils.read_list(
                f"/home/big_data_selection/dwd/dwd_{rank_type}_asin_rank/site_name={site_name}/date_type=last30day"
            )
            exist_days = [item[item.find("=") + 1:] for item in parts]
            # Process newest first.
            exist_days.reverse()
            for day in exist_days:
                # Days at or before 2023-02-01 are out of scope.
                if day <= '2023-02-01':
                    continue
                # Rank tables are partitioned by day: suffix like '2023_02_15'.
                suffix = CommonUtil.reformat_date(day, "%Y-%m-%d", "%Y_%m_%d")
                # Export the day's rank partition if it is not there yet.
                if not _partition_exported(engine, f"{site_name}_{rank_type}_asin_rank_{suffix}"):
                    print(f"{rank_type}:{site_name}:{day} asin_rank 导出中")
                    subprocess.run(
                        [PYTHON_BIN, "/tmp/wjc_py/sqoop_export/dwd_bsr_nsr_asin_rank_day.py",
                         site_name, day, rank_type],
                        check=False,
                    )
                # On the first day of each month also export the detail table.
                if day.endswith("-01"):
                    export_detail(engine, rank_type, site_name, day)
                # Record the export in the workflow bookkeeping table (MySQL).
                update_sql = f"""
insert ignore into workflow_everyday (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type,freeze_flag)
values ('{site_name}', '{day}', '导出PG数据库', 14, '{site_name}_{rank_type}_asin_rank', 'day', '{str(rank_type).upper()}榜单', '是',
'{str(rank_type).upper()}榜单对应的TOP100ASIN','postgresql_cluster','enable')
"""
                print(update_sql)
                DBUtil.exec_sql(DbTypes.mysql.name, "us", update_sql, dispose_flag=True)
                # Ensure pre-existing rows carry the current export-target flags.
                update_sql = f"""
update workflow_everyday
set export_db_type ='postgresql_cluster', freeze_flag='enable'
where table_name = '{site_name}_{rank_type}_asin_rank' and report_date in ('{day}');
"""
                DBUtil.exec_sql(DbTypes.mysql.name, "us", update_sql, dispose_flag=True)
                print(update_sql)
                print(f"{rank_type}:{site_name}:{day} success")