1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import sys
import pymysql
sys.path.append(os.path.dirname(sys.path[0]))
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from utils.common_util import CommonUtil
def get_db_engine():
    """Open a PyMySQL connection to the Doris ``adv`` database.

    Despite the name, this returns the object produced by
    ``pymysql.connect`` rather than a SQLAlchemy engine.

    Returns:
        The connection returned by ``pymysql.connect``.
    """
    # Doris connection parameters.
    return pymysql.connect(
        host="192.168.10.218",
        port=9030,
        user="root",
        password="",
        database="adv",
    )
def exec_sql(connection):
    """Rebuild ``sp_customer_search_report`` and swap it into place.

    The work happens in three steps:

    1. Load: truncate ``sp_customer_search_report_copy`` and refill it from
       the ``mysql_adv.advertising_manager_us`` source tables.
    2. Swap: exchange the refreshed copy with the live table through three
       ``ALTER TABLE ... RENAME`` statements.
    3. Analyze: run ``ANALYZE TABLE`` on the live table.

    Every statement is sent with its own ``cursor.execute`` call. Packing
    several ``;``-separated statements into one call depends on the driver's
    multi-statement client flag, and a failure in any statement after the
    first may not be raised by that call. That could let a failed INSERT go
    unnoticed and swap an empty table into production.

    Args:
        connection: An open DB-API connection; only ``cursor()`` and
            ``close()`` are used. It is closed before this function returns
            or raises.

    Raises:
        SystemExit: With status 1 when the load step fails, after the error
            has been printed and a failure notification has been sent.
        Exception: Any driver error raised by the swap or analyze step
            propagates unchanged.
    """
    separator = "----------------------------------------------------"

    # Step 1: refill the staging table.
    insert_sql = """
        INSERT INTO sp_customer_search_report_copy
        SELECT
            startDate,
            site,
            userName,
            a.keywordId AS 'keywordId',
            keywordText,
            matchType,
            impressions,
            clicks,
            cost,
            updated_at,
            created_at,
            account,
            attributedConversions7d,
            attributedSales7d,
            customerSearchText,
            a.campaignId AS 'campaignId',
            id,
            c.adGroupId AS 'adGroupId',
            adGroupName
        FROM
            (
                SELECT
                    startDate,
                    site,
                    keywordId,
                    keywordText,
                    matchType,
                    impressions,
                    clicks,
                    cost,
                    updated_at,
                    created_at,
                    account,
                    attributedConversions7d,
                    attributedSales7d,
                    customerSearchText,
                    campaignId,
                    id,
                    adGroupId,
                    adGroupName
                FROM
                    mysql_adv.advertising_manager_us.us_customer_search_report
            ) a
            LEFT JOIN ( SELECT userName, campaignId FROM mysql_adv.advertising_manager_us.user_campaign_id GROUP BY userName, campaignId ) b ON a.campaignId = b.campaignId
            LEFT JOIN ( SELECT keywordId, adGroupId FROM mysql_adv.advertising_manager_us.us_keyword GROUP BY keywordId, adGroupId ) c ON a.keywordId = c.keywordId
    """.strip()
    load_statements = (
        "TRUNCATE table sp_customer_search_report_copy",
        insert_sql,
    )

    # Step 2: swap the staging table with the live table via a temp name.
    swap_statements = (
        "ALTER TABLE sp_customer_search_report_copy RENAME sp_customer_search_report_tmp",
        "ALTER TABLE sp_customer_search_report RENAME sp_customer_search_report_copy",
        "ALTER TABLE sp_customer_search_report_tmp RENAME sp_customer_search_report",
    )

    # Step 3: refresh the statistics used by the query planner.
    analyze_statements = (
        "ANALYZE TABLE sp_customer_search_report",
    )

    # Log the full plan up front so the job log shows what was attempted.
    for step in (load_statements, swap_statements, analyze_statements):
        print(separator)
        print(";\n".join(step) + ";")

    cursor = connection.cursor()
    try:
        # Load the data; any failing statement aborts before the swap.
        try:
            for statement in load_statements:
                cursor.execute(statement)
        except Exception as e:
            print(f"SQL执行失败,报错信息:{e}")
            CommonUtil.send_wx_msg(["chenyuanjie"], "【adv:sp_customer_search_report导入失败】", "报错信息请查看日志")
            # Non-zero status so a scheduler sees the run as failed.
            sys.exit(1)
        print("SQL执行成功")

        # Load succeeded: swap the table names.
        for statement in swap_statements:
            cursor.execute(statement)
        print("交换表名成功")

        # Refresh optimizer statistics.
        for statement in analyze_statements:
            cursor.execute(statement)
        print("ANALYZE成功")
    finally:
        # Release the cursor and connection on every path, including exit.
        try:
            cursor.close()
        finally:
            connection.close()
if __name__ == '__main__':
    # Open the Doris connection before anything else.
    doris_connection = get_db_engine()
    print(
        "----------------------------------------------------",
        "成功获取Doris的engine",
        sep="\n",
    )
    # Run the load / swap / analyze sequence on that connection.
    exec_sql(doris_connection)
    # Send the completion notification.
    CommonUtil.send_wx_msg(
        ["chenyuanjie"],
        "【adv:sp_customer_search_report导入成功】",
        "悉知",
    )