# adv_sp_customer_search_report.py
# Last committed by: chenyuanjie
import os
import sys
import pymysql

sys.path.append(os.path.dirname(sys.path[0]))
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from utils.common_util import CommonUtil


def get_db_engine():
    """Open and return a connection to the Doris ``adv`` database.

    Despite the name, the returned object is whatever ``pymysql.connect``
    produces (a PyMySQL connection), not a SQLAlchemy engine.

    Returns:
        The open PyMySQL connection; the caller is responsible for closing it.
    """
    # Doris connection parameters
    return pymysql.connect(
        host="192.168.10.218",
        port=9030,
        user="root",
        password="",
        database="adv",
    )


def _run_statements(cursor, statements):
    """Execute each SQL statement in *statements* with its own ``execute`` call."""
    for statement in statements:
        cursor.execute(statement)


def exec_sql(connection):
    """Rebuild ``sp_customer_search_report`` and swap it into place.

    The work happens in three phases, all on one cursor of *connection*:

    1. Load: truncate ``sp_customer_search_report_copy`` and refill it from
       ``mysql_adv.advertising_manager_us.us_customer_search_report`` joined
       with ``user_campaign_id`` (for ``userName``) and ``us_keyword``
       (for ``adGroupId``).
    2. Swap: rename the tables so the freshly loaded copy becomes
       ``sp_customer_search_report`` and the previous table becomes the copy.
    3. Analyze: run ``ANALYZE TABLE`` on the new ``sp_customer_search_report``.

    Every statement is sent in its own ``execute`` call. Sending several
    ``;``-separated statements in one call is either rejected by the driver or
    (when multi-statements are enabled) only reports the first statement's
    outcome, so a failed INSERT could go unnoticed and the swap would then
    publish an empty table.

    Args:
        connection: An open DB-API connection (as returned by
            ``get_db_engine``). It is always closed before this function
            returns or raises.

    Raises:
        SystemExit: With status 1 if the load phase fails (after printing the
            error and sending a WeChat notification), so that schedulers see
            the run as failed.
        Exception: Any error raised by the swap or analyze phase propagates
            unchanged.
    """
    # Phase 1: compute the data into the staging ("copy") table.
    load_statements = (
        "TRUNCATE table sp_customer_search_report_copy",
        """
    INSERT INTO sp_customer_search_report_copy 
    SELECT
        startDate,
        site,
        userName,
        a.keywordId AS 'keywordId',
        keywordText,
        matchType,
        impressions,
        clicks,
        cost,
        updated_at,
        created_at,
        account,
        attributedConversions7d,
        attributedSales7d,
        customerSearchText,
        a.campaignId AS 'campaignId',
        id,
        c.adGroupId AS 'adGroupId',
        adGroupName 
    FROM
        (
        SELECT
            startDate,
            site,
            keywordId,
            keywordText,
            matchType,
            impressions,
            clicks,
            cost,
            updated_at,
            created_at,
            account,
            attributedConversions7d,
            attributedSales7d,
            customerSearchText,
            campaignId,
            id,
            adGroupId,
            adGroupName 
        FROM
            mysql_adv.advertising_manager_us.us_customer_search_report 
        ) a
        LEFT JOIN ( SELECT userName, campaignId FROM mysql_adv.advertising_manager_us.user_campaign_id GROUP BY userName, campaignId ) b ON a.campaignId = b.campaignId
        LEFT JOIN ( SELECT keywordId, adGroupId FROM mysql_adv.advertising_manager_us.us_keyword GROUP BY keywordId, adGroupId ) c ON a.keywordId = c.keywordId
    """,
    )

    # Phase 2: swap table names (copy -> tmp, live -> copy, tmp -> live).
    swap_statements = (
        "ALTER TABLE sp_customer_search_report_copy RENAME sp_customer_search_report_tmp",
        "ALTER TABLE sp_customer_search_report RENAME sp_customer_search_report_copy",
        "ALTER TABLE sp_customer_search_report_tmp RENAME sp_customer_search_report",
    )

    # Phase 3: refresh statistics used by the query planner.
    analyze_statements = (
        "ANALYZE TABLE sp_customer_search_report",
    )

    # Log everything that is about to run, one group per separator line.
    for group in (load_statements, swap_statements, analyze_statements):
        print("----------------------------------------------------")
        for statement in group:
            print(statement)

    try:
        cursor = connection.cursor()
        try:
            # Run the data load; on failure notify and stop before the swap so
            # the live table is left untouched.
            try:
                _run_statements(cursor, load_statements)
            except Exception as e:
                print(f"SQL执行失败,报错信息:{e}")
                CommonUtil.send_wx_msg(["chenyuanjie"], "【adv:sp_customer_search_report导入失败】", "报错信息请查看日志")
                # Non-zero status: a bare sys.exit() would report success.
                sys.exit(1)
            else:
                print("SQL执行成功")

            # Load succeeded: swap the table names.
            _run_statements(cursor, swap_statements)
            print("交换表名成功")

            # Refresh optimizer statistics on the new live table.
            _run_statements(cursor, analyze_statements)
            print("ANALYZE成功")
        finally:
            cursor.close()
    finally:
        # Release the connection on every path, including sys.exit above.
        connection.close()


if __name__ == '__main__':
    # Open the Doris connection (get_db_engine returns a connection object).
    doris_connection = get_db_engine()
    print(
        "----------------------------------------------------",
        "成功获取Doris的engine",
        sep="\n",
    )

    # Run the SQL job on that connection.
    exec_sql(doris_connection)

    # Everything above finished: send the completion notice.
    CommonUtil.send_wx_msg(
        ["chenyuanjie"],
        "【adv:sp_customer_search_report导入成功】",
        "悉知",
    )