import os import sys import random import string sys.path.append(os.path.dirname(sys.path[0])) from utils.db_util import DBUtil from utils.ssh_util import SSHUtil from utils.common_util import CommonUtil,DateTypes from utils.spark_util import SparkUtil from datetime import datetime from utils.templates import Templates class JudgeCount(Templates): def __init__(self, site_name='us', date_type="month", date_info='2024-01'): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = 'judge_count' # 初始化self.spark对 self.spark = self.create_spark_object( app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}") def judge_count(self, count=1000): sql = f"select * from dwt_st_asin_reverse where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' limit {count};" print(f"sql: {sql}") df = self.spark.sql(sql).cache() df.show(10) df_count = df.count() if df_count < count: raise Exception(f"{self.site_name}站点: 反查数量不对, 抛出异常, {df_count}") else: print(f"{self.site_name}站点: 反查数据量正常, {df_count}") def run(self): # self.judge_count() pass if __name__ == '__main__': site_name = CommonUtil.get_sys_arg(1, None) date_type = CommonUtil.get_sys_arg(2, None) date_info = CommonUtil.get_sys_arg(3, None) # 获取最后一个参数 last_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None) cur_date = datetime.now().date() handle_obj = JudgeCount(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run() # 检验反查数量 sql_date_type = date_type print(f"执行参数为{sys.argv}") db_type = 'postgresql_cluster' CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info, principal='fangxingjun', priority=3, export_tools_type=1, belonging_to_process='反查搜索词') # 获取数据库连接 engine = DBUtil.get_db_engine(db_type, site_name) suffix = str(date_info).replace("-", "_") export_cols = [ "st_key", "search_term", "st_ao_val", "st_type", "st_rank", "st_search_num", "st_search_rate", "st_search_sum", "st_adv_counts", "st_quantity_being_sold", "asin", "asin_st_zr_orders", "asin_st_zr_orders_sum", "asin_st_zr_flow", "asin_st_sp_orders", "asin_st_sp_orders_sum", "asin_st_sp_flow", "st_asin_zr_page", "st_asin_zr_page_row", "st_asin_zr_page_rank", "st_asin_zr_updated_at", "st_asin_sp_page", "st_asin_sp_page_rank", "st_asin_sp_page_row", "st_asin_sp_updated_at", "st_asin_sb1_page", "st_asin_sb1_updated_at", "st_asin_sb2_page", "st_asin_sb2_updated_at", "st_asin_sb3_page", "st_asin_sb3_updated_at", "st_asin_ac_page", "st_asin_ac_updated_at", "st_asin_bs_page", "st_asin_bs_updated_at", "st_asin_er_page", "st_asin_er_updated_at", "st_asin_tr_page", "st_asin_tr_updated_at", "st_zr_current_page_asin_counts", "st_sp_current_page_asin_counts", "st_brand_label" ] # 4_week逻辑 if date_type == "4_week" or date_type == DateTypes.month_week.name: export_tb_target = f"{site_name}_last_4_week_st" export_tb_copy = f"{site_name}_last_4_week_st_copy1" export_table = export_tb_copy sql = f""" drop table if exists {export_tb_copy}; create table if not exists {export_tb_copy} ( like {export_tb_target} including comments including defaults ); truncate table {export_tb_copy}; SELECT create_distributed_table('{export_tb_copy}', 'asin'); """ # 执行SQL语句 DBUtil.engine_exec_sql(engine, sql) print("导出的字段:", export_cols) if date_type == DateTypes.month.name: # 处理导出表 export_table = f"{site_name}_st_month_{suffix}" year_month_before = CommonUtil.get_month_offset(date_info, -1).replace("-", "_") export_master_table = export_table # export_tb_before = f"us_st_month_{year_month_before}" # 基础数据结构表 export_tb_before = f"us_st_month_2023_base" sql = f""" drop table if exists {export_master_table}; create table if not exists public.{export_master_table} ( like public.{export_tb_before} including comments including defaults ); truncate table public.{export_master_table}; SELECT create_distributed_table('{export_master_table}', 'asin'); """ # 执行SQL语句 DBUtil.engine_exec_sql(engine, sql) print("导出的字段:", export_cols) # 关闭与pg的链接 engine.dispose() if last_flag == "month_append": partition_dict = { "site_name": site_name, "date_type": "month", "date_info": date_info } else: partition_dict = { "site_name": site_name, "date_type": date_type, "date_info": date_info } # 导出执行sqoop的sh编写 sh = CommonUtil.build_export_sh( site_name=site_name, db_type=db_type, hive_tb="dwt_st_asin_reverse", export_tb=export_table, col=export_cols, partition_dict=partition_dict ) client = SSHUtil.get_ssh_client() SSHUtil.exec_command_async(client, sh, ignore_err=False) client.close() # 重新获取数据库连接 engine = DBUtil.get_db_engine(db_type, site_name) # 导出后执行(构建索引、切换表名) salt_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) sql_index_asin = f""" CREATE INDEX {site_name}_st_asin_{salt_id}_key ON public.{export_table} USING hash (asin); """ # 执行SQL语句 DBUtil.engine_exec_sql(engine, sql_index_asin) sql_index_zr_flow = f""" CREATE INDEX {site_name}_st_zr_{salt_id}_key ON public.{export_table} USING btree (asin_st_zr_flow); """ DBUtil.engine_exec_sql(engine, sql_index_zr_flow) sql_index_search_term = f""" CREATE INDEX {site_name}_search_term_{salt_id}_key ON public.{export_table} USING btree (search_term); """ DBUtil.engine_exec_sql(engine, sql_index_search_term) if date_type == '4_week' or date_type == DateTypes.month_week.name: # 交换表名 #DBUtil.exchange_tb(engine, export_tb_copy, export_tb_target, cp_index_flag=False) sql_1 = f"""alter table {export_tb_target} rename to {export_tb_target}_back; """ DBUtil.engine_exec_sql(engine, sql_1) sql_2 = f"""alter table {export_tb_copy} rename to {export_tb_target};""" DBUtil.engine_exec_sql(engine, sql_2) sql_3 = f"""alter table {export_tb_target}_back rename to {export_tb_copy};""" DBUtil.engine_exec_sql(engine, sql_3) # 往导出流程表插入导出完成数据,方便监听导出脚本是否全部完成 if date_type == '4_week' or date_type == DateTypes.month.name: # us站点可以直接插入数据执行 update_workflow_sql = f""" REPLACE INTO selection.workflow_progress (site_name, page, table_name, date_type, date_info, status, status_val, is_end, over_date, calculate_responsible, exhibition_responsible, remark) VALUES('{site_name}', '反查搜索词', '{export_table}', '{date_type}', '{date_info}', '导出pg集群完成', 6, '是', CURRENT_TIME,'fangxingjun@yswg.com.cn', 'wangfeng@yswg.com.cn', '反查搜索词') """ if date_type == DateTypes.month_week.name or date_type == '4_week': sql_date_type = '30_day' if date_type == '4_week': cur_date = CommonUtil.get_calDay_by_dateInfo(SparkUtil.get_spark_session("get_4_week_date"), date_type, date_info) update_workflow_sql = f""" REPLACE INTO selection.workflow_progress (site_name, page, table_name, date_type, date_info, status, status_val, is_end, over_date, calculate_responsible, exhibition_responsible, remark) VALUES('{site_name}', '反查搜索词', 'us_last_4_week_st', '{sql_date_type}', '{cur_date}', '导出pg集群完成', 6, '是', CURRENT_TIME,'fangxingjun@yswg.com.cn', 'wangfeng@yswg.com.cn', '反查搜索词') """ print(update_workflow_sql) mysql_engine = DBUtil.get_db_engine('mysql', 'us') DBUtil.engine_exec_sql(mysql_engine, update_workflow_sql) # 关闭链接 mysql_engine.dispose() engine.dispose() CommonUtil.modify_export_workflow_status(update_workflow_sql, site_name, date_type, date_info)