# adv_sp_customer_search_report.py 3.53 KB
import os
import sys
import pymysql

sys.path.append(os.path.dirname(sys.path[0]))
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from utils.common_util import CommonUtil


def get_db_engine():
    """Open and return a pymysql connection to the Doris ``adv`` database.

    Despite the function name, the returned object is a plain
    ``pymysql`` connection (created with ``pymysql.connect``), not a
    SQLAlchemy engine. The connection parameters are hard-coded below.

    Returns:
        The connection object produced by ``pymysql.connect``.
    """
    # Doris connection parameters
    return pymysql.connect(
        host="192.168.10.218",
        port=9030,
        user="root",
        password="",
        database="adv",
    )


def exec_sql(connection):
    """Rebuild ``sp_customer_search_report`` via a staging table and swap it in.

    The work is done in three steps:

    1. Load: truncate ``sp_customer_search_report_copy`` and refill it from
       ``us_customer_search_report`` joined with ``user_campaign_id`` and
       ``us_keyword``.
    2. Swap: exchange the names of the staging table and the live table
       with three ``ALTER TABLE ... RENAME`` statements.
    3. Analyze: run ``ANALYZE TABLE`` on the live table.

    Every statement is sent with its own ``cursor.execute`` call, so an
    error is raised by the call that issued the failing statement instead of
    depending on how the driver/server handle several ``;``-separated
    statements in one query string.

    Args:
        connection: An open DB connection exposing ``cursor()`` and
            ``close()`` (e.g. the object returned by ``get_db_engine``).
            It is always closed before this function returns or raises.

    Raises:
        SystemExit: With status 1 when the load step fails, after printing
            the error and sending a notification through
            ``CommonUtil.send_wx_msg``.
        Exception: Any error raised by the swap or analyze steps propagates
            to the caller unchanged.
    """
    separator = "----------------------------------------------------"

    # Data-load SQL: empty the staging table, then repopulate it.
    load_statements = (
        "TRUNCATE table sp_customer_search_report_copy",
        """
    INSERT INTO sp_customer_search_report_copy 
    SELECT
        startDate,
        site,
        userName,
        a.keywordId AS 'keywordId',
        keywordText,
        matchType,
        impressions,
        clicks,
        cost,
        updated_at,
        created_at,
        account,
        attributedConversions7d,
        attributedSales7d,
        customerSearchText,
        a.campaignId AS 'campaignId',
        id,
        c.adGroupId AS 'adGroupId',
        adGroupName 
    FROM
        (
        SELECT
            startDate,
            site,
            keywordId,
            keywordText,
            matchType,
            impressions,
            clicks,
            cost,
            updated_at,
            created_at,
            account,
            attributedConversions7d,
            attributedSales7d,
            customerSearchText,
            campaignId,
            id,
            adGroupId,
            adGroupName 
        FROM
            mysql_adv.advertising_manager_us.us_customer_search_report 
        ) a
        LEFT JOIN ( SELECT userName, campaignId FROM mysql_adv.advertising_manager_us.user_campaign_id GROUP BY userName, campaignId ) b ON a.campaignId = b.campaignId
        LEFT JOIN ( SELECT keywordId, adGroupId FROM mysql_adv.advertising_manager_us.us_keyword GROUP BY keywordId, adGroupId ) c ON a.keywordId = c.keywordId
    """,
    )

    # Table-swap SQL: copy -> tmp, live -> copy, tmp -> live.
    swap_statements = (
        "ALTER TABLE sp_customer_search_report_copy RENAME sp_customer_search_report_tmp",
        "ALTER TABLE sp_customer_search_report RENAME sp_customer_search_report_copy",
        "ALTER TABLE sp_customer_search_report_tmp RENAME sp_customer_search_report",
    )

    # Query-plan optimisation SQL: refresh statistics on the live table.
    analyze_statements = (
        "ANALYZE TABLE sp_customer_search_report",
    )

    # Log everything that is about to run, one group per separator.
    for group in (load_statements, swap_statements, analyze_statements):
        print(separator)
        for statement in group:
            print(statement)

    cursor = connection.cursor()
    try:
        # Run the data load; on failure notify and stop before the swap so
        # the live table is never replaced by a partially loaded copy.
        try:
            for statement in load_statements:
                cursor.execute(statement)
            print("SQL执行成功")
        except Exception as e:
            print(f"SQL执行失败,报错信息:{e}")
            CommonUtil.send_wx_msg(["chenyuanjie"], "【adv:sp_customer_search_report导入失败】", "报错信息请查看日志")
            # Non-zero status so the calling scheduler/shell sees the failure.
            sys.exit(1)

        # Load succeeded: swap the table names.
        for statement in swap_statements:
            cursor.execute(statement)
        print("交换表名成功")

        # Refresh statistics for the query planner.
        for statement in analyze_statements:
            cursor.execute(statement)
        print("ANALYZE成功")
    finally:
        # Release the cursor and connection on every path, including the
        # sys.exit() above and any error raised by the swap/analyze steps.
        try:
            cursor.close()
        finally:
            connection.close()


if __name__ == "__main__":
    # Script entry point: connect, run the refresh, then report success.

    # Open the Doris connection (a pymysql connection, despite the
    # "engine" wording in the helper name and log message).
    doris_conn = get_db_engine()
    print("----------------------------------------------------")
    print("成功获取Doris的engine")

    # Run the load / swap / analyze SQL on that connection.
    exec_sql(doris_conn)

    # Completion notice; reached only when exec_sql returned normally.
    CommonUtil.send_wx_msg(
        ["chenyuanjie"],
        "【adv:sp_customer_search_report导入成功】",
        "悉知",
    )