dwd_buyer_st_pg.py 3.47 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from utils.db_util import DBUtil
from sqlalchemy import text
from datetime import datetime


def _week_day_range(engine, year_week):
    """Return the first and last day of ``year_week`` as ``YYYYMMDD`` strings.

    Looks up the rows with ``week_day`` 1 and 7 for ``year_week`` in the MySQL
    table ``date_20_to_30`` reachable through ``engine``.

    Raises:
        ValueError: if the table has no row for one of those two days.
    """
    # Bind the CLI-supplied value instead of interpolating it into the SQL text.
    day_sql = text(
        "select date from date_20_to_30 "
        "where year_week = :year_week and week_day = :week_day"
    )
    days = []
    with engine.connect() as connection:
        for week_day in (1, 7):
            day = connection.execute(
                day_sql, {"year_week": year_week, "week_day": week_day}
            ).scalar()
            if day is None:
                raise ValueError(
                    f"date_20_to_30 has no row for year_week={year_week!r}, week_day={week_day}"
                )
            days.append(str(day).replace("-", ""))
    return days[0], days[1]


def _month_day_range(year_month):
    """Return the first and last day of a ``YYYY-MM`` month as ``YYYYMMDD`` strings.

    Computed locally; gives the same dates as MySQL's
    ``STR_TO_DATE('<YYYY-MM>-01', '%Y-%m-%d')`` / ``LAST_DAY(...)`` pair.

    Raises:
        ValueError: if ``year_month`` is not ``YYYY-MM`` with a month in 1..12.
    """
    import calendar

    year_part, month_part = year_month.split("-")[:2]
    year, month = int(year_part), int(month_part)
    last_day = calendar.monthrange(year, month)[1]
    prefix = f"{year:04d}{month:02d}"
    return f"{prefix}01", f"{prefix}{last_day:02d}"


def _build_reset_sql(site_name, export_tb, export_master_tb, start_day, end_day):
    """Build SQL that ensures ``export_tb`` exists and clears the target date range.

    ``export_tb`` is created (if absent) with the structure and comments of
    ``export_master_tb``. Rows whose ``report_date`` lies in the range are then
    deleted, so the subsequent export repopulates them.
    """
    # 'us' is a site name, so the bound must depend on site_name; comparing
    # date_type ('week' / 'month_week') against 'us' could never match.
    # NOTE(review): non-us sites keep an exclusive lower bound, which leaves rows
    # dated start_day in place before they are re-exported -- confirm intended.
    lower_bound_op = ">=" if site_name == "us" else ">"
    return f"""
        create table if not exists {export_tb} (
            like {export_master_tb} including comments
        );
        delete from {export_tb} where report_date {lower_bound_op} '{start_day}' and report_date <= '{end_day}'
    """


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    # A trailing 'test' argument redirects the export to the test database.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)

    # Validate up front so bad arguments fail with a clear message before any
    # work-hours check, database, or SSH work happens.
    if not site_name or not date_info:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <date_type> <date_info> [test]")
    if date_type not in ('week', 'month_week'):
        sys.exit(f"unsupported date_type {date_type!r}; expected 'week' or 'month_week'")

    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                       principal='wangrui4', priority=2, export_tools_type=1, belonging_to_process='买家搜索词')
        db_type = 'postgresql_cluster'
        print("导出到PG-Cluster库中")

    # Target table is suffixed with the first '-'-separated part of date_info
    # (presumably the year) and cloned from the master table.
    year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
    export_master_tb = f"{site_name}_buyer_st"
    export_tb = f"{site_name}_buyer_st_{year_str}"

    # Database connection for the export target.
    pg_clu_engine = DBUtil.get_db_engine(db_type, site_name)

    if date_type == 'week':
        # Week boundaries come from the calendar table in the 'us' MySQL database.
        mysql_engine = DBUtil.get_db_engine('mysql', 'us')
        start_day, end_day = _week_day_range(mysql_engine, date_info)
    else:
        start_day, end_day = _month_day_range(date_info)
    print(f"{date_info}的月初是:{start_day},月末是:{end_day}")

    DBUtil.engine_exec_sql(
        pg_clu_engine,
        _build_reset_sql(site_name, export_tb, export_master_tb, start_day, end_day),
    )

    # Build the shell command that exports the Hive partition into export_tb.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwd_buyer_st",
        export_tb=export_tb,
        col=[
            "search_term",
            "report_date",
            "sales",
            "orders",
            "clicks"
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Close the SSH connection even if the export command fails.
        client.close()

    print("success")