dwd_buyer_st_pg.py 3.48 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from utils.db_util import DBUtil
from sqlalchemy import text
from datetime import datetime


# Export the Hive table ``dwd_buyer_st`` for one site and period into the yearly
# PostgreSQL table ``{site_name}_buyer_st_{year}``.
#
# Usage: dwd_buyer_st_pg.py <site_name> <date_type> <date_info> [test]
#   site_name  site code (e.g. 'us'); also used as the table-name prefix
#   date_type  'week'       -> date_info is matched against date_20_to_30.year_week
#              'month_week' -> date_info is a month, 'YYYY-MM'
#   test       when given as the last argument, export to the test database
#              instead of the PG cluster
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The optional 'test' flag is read from the last command-line argument.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)

    # Fail fast with a clear message instead of an AttributeError on
    # date_info.split() or executing an empty SQL string further down.
    if not site_name or not date_type or not date_info:
        raise ValueError(
            "usage: dwd_buyer_st_pg.py <site_name> <date_type> <date_info> [test]"
        )
    if date_type not in ('week', 'month_week'):
        raise ValueError(
            f"unsupported date_type {date_type!r}; expected 'week' or 'month_week'"
        )

    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                       principal='chenyuanjie', priority=2, export_tools_type=1,
                                       belonging_to_process='买家搜索词')
        db_type = 'postgresql_cluster'
        print("导出到PG-Cluster库中")

    # The target table is partitioned by year: the first '-'-separated part of date_info.
    year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
    export_master_tb = f"{site_name}_buyer_st"
    export_tb = f"{site_name}_buyer_st_{year_str}"

    # MySQL is used only to resolve the period's first/last day; PG is the export target.
    mysql_engine = DBUtil.get_db_engine('mysql', 'us')
    pg_clu_engine = DBUtil.get_db_engine(db_type, site_name)

    # date_info comes from the command line, so it is bound as a parameter
    # rather than interpolated into the SQL text.
    if date_type == 'week':
        start_day_sql = text(
            "select date from date_20_to_30 where year_week = :date_info and week_day = 1"
        )
        end_day_sql = text(
            "select date from date_20_to_30 where year_week = :date_info and week_day = 7"
        )
    else:  # 'month_week': first and last calendar day of the month 'YYYY-MM'
        start_day_sql = text(
            "select STR_TO_DATE(CONCAT(:date_info, '-01'), '%Y-%m-%d') as date"
        )
        end_day_sql = text(
            "select LAST_DAY(STR_TO_DATE(CONCAT(:date_info, '-01'), '%Y-%m-%d')) as date"
        )

    params = {'date_info': date_info}
    with mysql_engine.connect() as connection:
        # Each query yields a single 'date' column; scalar() returns None if no row matched.
        start_day = connection.execute(start_day_sql, params).scalar()
        end_day = connection.execute(end_day_sql, params).scalar()
    if start_day is None or end_day is None:
        raise ValueError(f"could not resolve the date range for {date_type} {date_info!r}")
    # report_date values are compared as 'YYYYMMDD' strings.
    start_day = str(start_day).replace("-", "")
    end_day = str(end_day).replace("-", "")
    print(f"{date_info}的月初是:{start_day},月末是:{end_day}")

    # Create the yearly table from the master table if needed, then clear the period
    # about to be exported. 'us' is a site code, so the inclusive-start rule keys on
    # site_name (date_type is only ever 'week' or 'month_week').
    # NOTE(review): other sites use an exclusive lower bound (report_date > start_day),
    # so rows dated on the first day are not deleted -- confirm this is intended.
    start_op = '>=' if site_name == 'us' else '>'
    sql = f"""
        create table if not exists {export_tb} (
            like {export_master_tb} including comments
        );
        delete from {export_tb} where report_date {start_op} '{start_day}' and report_date <= '{end_day}'
    """
    DBUtil.engine_exec_sql(pg_clu_engine, sql)

    # Build the export command for this Hive partition and run it through an SSH client.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwd_buyer_st",
        export_tb=export_tb,
        col=[
            "search_term",
            "report_date",
            "sales",
            "orders",
            "clicks",
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info,
        },
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Release the SSH connection even if the remote command fails.
        client.close()

    print("success")