# dwt_st_asin_reverse.py 9.06 KB
import os
import sys
import random
import string


sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil,DateTypes
from utils.spark_util import SparkUtil
from datetime import datetime
from utils.templates import Templates


class JudgeCount(Templates):
    """Sanity check on the row count of ``dwt_st_asin_reverse`` for one partition.

    The partition is identified by ``site_name`` / ``date_type`` / ``date_info``.
    A Spark object is created in the constructor through
    ``self.create_spark_object`` (presumably supplied by ``Templates`` --
    confirm against the base class).
    """

    def __init__(self, site_name='us', date_type="month", date_info='2024-01'):
        """Store the partition keys and create the Spark object.

        Args:
            site_name: Site code used in the partition filter.
            date_type: Date granularity used in the partition filter.
            date_info: Date value used in the partition filter.
        """
        super().__init__()
        self.site_name, self.date_type, self.date_info = site_name, date_type, date_info
        self.db_save = 'judge_count'
        # Initialise the self.spark object; the app name carries the partition keys.
        spark_app_name = f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}"
        self.spark = self.create_spark_object(app_name=spark_app_name)

    def judge_count(self, count=1000):
        """Fail when the partition holds fewer than ``count`` rows.

        At most ``count`` rows are selected (``limit``), so the check only
        establishes a lower bound on the partition size.

        Args:
            count: Minimum number of rows expected in the partition.

        Raises:
            Exception: If fewer than ``count`` rows are returned.
        """
        query = (
            "select * from dwt_st_asin_reverse "
            f"where site_name='{self.site_name}' "
            f"and date_type='{self.date_type}' "
            f"and date_info='{self.date_info}' "
            f"limit {count};"
        )
        print(f"sql: {query}")
        sample_df = self.spark.sql(query).cache()
        sample_df.show(10)
        row_total = sample_df.count()
        # Guard clause: bail out as soon as the lower bound is violated.
        if row_total < count:
            raise Exception(f"{self.site_name}站点: 反查数量不对, 抛出异常, {row_total}")
        print(f"{self.site_name}站点: 反查数据量正常, {row_total}")

    def run(self):
        """Entry point; currently does nothing because the count check is disabled."""
        # Disabled check: self.judge_count()
        return None


if __name__ == '__main__':
    # Export driver for the Hive table dwt_st_asin_reverse.
    #
    # Positional arguments: site_name, date_type, date_info. The last argument
    # is additionally read as a flag ("month_append" forces the Hive partition
    # date_type to "month").
    #
    # Steps:
    #   1. (re)create the PG target table for the requested date_type,
    #   2. run the export shell command over SSH,
    #   3. build indexes and, for 4_week / month_week, swap the copy table in,
    #   4. record completion in selection.workflow_progress.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Fetch the last argument.
    last_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)

    # Only these date types bind an export table and a workflow statement below.
    # Anything else used to die later with a NameError, after the Spark session
    # and DB engine had already been created, so reject it explicitly up front.
    is_four_week = date_type in ('4_week', DateTypes.month_week.name)
    is_month = date_type == DateTypes.month.name
    if not (is_four_week or is_month):
        supported_date_types = ('4_week', DateTypes.month_week.name, DateTypes.month.name)
        raise ValueError(
            f"unsupported date_type {date_type!r}; expected one of {supported_date_types}")

    # Captured before the export so the default workflow date reflects start time.
    cur_date = datetime.now().date()
    handle_obj = JudgeCount(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()  # Verify the reverse-lookup row count.
    print(f"执行参数为{sys.argv}")
    db_type = 'postgresql_cluster'

    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info, principal='fangxingjun',
                                   priority=3, export_tools_type=1, belonging_to_process='反查搜索词')

    # Get the database connection.
    engine = DBUtil.get_db_engine(db_type, site_name)
    suffix = str(date_info).replace("-", "_")

    export_cols = [
        "st_key",
        "search_term",
        "st_ao_val",
        "st_type",
        "st_rank",
        "st_search_num",
        "st_search_rate",
        "st_search_sum",
        "st_adv_counts",
        "st_quantity_being_sold",
        "asin",
        "asin_st_zr_orders",
        "asin_st_zr_orders_sum",
        "asin_st_zr_flow",
        "asin_st_sp_orders",
        "asin_st_sp_orders_sum",
        "asin_st_sp_flow",
        "st_asin_zr_page",
        "st_asin_zr_page_row",
        "st_asin_zr_page_rank",
        "st_asin_zr_updated_at",
        "st_asin_sp_page",
        "st_asin_sp_page_rank",
        "st_asin_sp_page_row",
        "st_asin_sp_updated_at",
        "st_asin_sb1_page",
        "st_asin_sb1_updated_at",
        "st_asin_sb2_page",
        "st_asin_sb2_updated_at",
        "st_asin_sb3_page",
        "st_asin_sb3_updated_at",
        "st_asin_ac_page",
        "st_asin_ac_updated_at",
        "st_asin_bs_page",
        "st_asin_bs_updated_at",
        "st_asin_er_page",
        "st_asin_er_updated_at",
        "st_asin_tr_page",
        "st_asin_tr_updated_at",
        "st_zr_current_page_asin_counts",
        "st_sp_current_page_asin_counts",
        "st_brand_label"
    ]

    if is_four_week:
        # 4_week logic: export into a freshly created copy table, which is
        # swapped with the live table once the export and indexes are done.
        export_tb_target = f"{site_name}_last_4_week_st"
        export_tb_copy = f"{site_name}_last_4_week_st_copy1"
        export_table = export_tb_copy
        sql = f"""
                       drop table if exists {export_tb_copy};
                       create table if not exists {export_tb_copy}
                      (
                          like {export_tb_target} including comments including defaults
                      );
                      truncate table {export_tb_copy};
                      SELECT create_distributed_table('{export_tb_copy}', 'asin');
                      """
    else:
        # Month logic: the monthly table itself is dropped and recreated.
        export_table = f"{site_name}_st_month_{suffix}"
        export_master_table = export_table
        # Base structure table the monthly table is cloned from.
        # NOTE(review): hard-coded to the "us" base table for every site -- confirm.
        export_tb_before = "us_st_month_2023_base"
        sql = f"""
                drop table if exists {export_master_table};
                create table if not exists public.{export_master_table}
               (
                   like public.{export_tb_before} including comments including defaults
               );
               truncate table public.{export_master_table};
               SELECT create_distributed_table('{export_master_table}', 'asin');
               """
    # Execute the SQL statement.
    DBUtil.engine_exec_sql(engine, sql)
    print("导出的字段:", export_cols)

    # Close the PG connection before the export on BOTH paths. Previously only
    # the month path disposed it, so the 4_week path leaked this engine when
    # a new one was acquired after the export.
    engine.dispose()

    # "month_append" forces the Hive partition date_type to "month".
    partition_dict = {
        "site_name": site_name,
        "date_type": "month" if last_flag == "month_append" else date_type,
        "date_info": date_info
    }

    # Build the shell command that runs the sqoop export.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_st_asin_reverse",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Close the SSH client even when the export command fails.
        client.close()

    # Re-acquire the database connection.
    engine = DBUtil.get_db_engine(db_type, site_name)


    # Post-export steps (build indexes, swap table names).
    # A random salt is embedded in each index name.
    salt_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
    sql_index_asin = f"""
            CREATE INDEX {site_name}_st_asin_{salt_id}_key ON public.{export_table} USING hash (asin);


    """
    # Execute the SQL statement.
    DBUtil.engine_exec_sql(engine, sql_index_asin)

    sql_index_zr_flow = f"""
        CREATE INDEX {site_name}_st_zr_{salt_id}_key ON public.{export_table} USING btree (asin_st_zr_flow);
    """
    DBUtil.engine_exec_sql(engine, sql_index_zr_flow)

    sql_index_search_term = f"""
             CREATE INDEX {site_name}_search_term_{salt_id}_key ON public.{export_table} USING btree (search_term);
        """
    DBUtil.engine_exec_sql(engine, sql_index_search_term)

    if is_four_week:
        # Swap table names with three renames: live -> _back, copy -> live,
        # _back -> copy. (DBUtil.exchange_tb was used here previously.)
        sql_1 = f"""alter table {export_tb_target} rename to {export_tb_target}_back; """
        DBUtil.engine_exec_sql(engine, sql_1)
        sql_2 = f"""alter table {export_tb_copy} rename to {export_tb_target};"""
        DBUtil.engine_exec_sql(engine, sql_2)
        sql_3 = f"""alter table {export_tb_target}_back rename to {export_tb_copy};"""
        DBUtil.engine_exec_sql(engine, sql_3)

    # Insert an export-finished record into the export workflow table so that
    # listeners can tell whether all export scripts have completed.
    if is_four_week:
        # 4_week / month_week exports are recorded under date_type '30_day'.
        sql_date_type = '30_day'
        if date_type == '4_week':
            # For 4_week the recorded date is derived from date_info rather
            # than from the start-of-run date.
            cur_date = CommonUtil.get_calDay_by_dateInfo(SparkUtil.get_spark_session("get_4_week_date"), date_type, date_info)
        # NOTE(review): table_name is hard-coded to 'us_last_4_week_st'
        # regardless of site_name -- confirm this is intended.
        update_workflow_sql = f"""
            REPLACE INTO selection.workflow_progress (site_name, page, table_name, date_type, date_info, status, status_val, is_end, over_date, calculate_responsible, exhibition_responsible, remark) 
            VALUES('{site_name}', '反查搜索词', 'us_last_4_week_st', '{sql_date_type}', '{cur_date}', '导出pg集群完成', 6, '是', CURRENT_TIME,'fangxingjun@yswg.com.cn', 'wangfeng@yswg.com.cn', '反查搜索词')
        """
    else:
        # Month export: the us site can insert the record and execute directly.
        update_workflow_sql = f"""                
            REPLACE INTO selection.workflow_progress (site_name, page, table_name, date_type, date_info, status, status_val, is_end, over_date, calculate_responsible, exhibition_responsible, remark) 
            VALUES('{site_name}', '反查搜索词', '{export_table}', '{date_type}', '{date_info}', '导出pg集群完成', 6, '是', CURRENT_TIME,'fangxingjun@yswg.com.cn', 'wangfeng@yswg.com.cn', '反查搜索词')
        """
    print(update_workflow_sql)
    # The workflow table is always written through the 'us' MySQL connection.
    mysql_engine = DBUtil.get_db_engine('mysql', 'us')
    DBUtil.engine_exec_sql(mysql_engine, update_workflow_sql)
    # Close the connections.
    mysql_engine.dispose()
    engine.dispose()
    CommonUtil.modify_export_workflow_status(update_workflow_sql, site_name, date_type, date_info)