"""Rebuild the Doris table ``sp_customer_search_report`` and swap it into place.

The script loads fresh data into ``sp_customer_search_report_copy``, swaps
that table with ``sp_customer_search_report`` through three renames, runs
``ANALYZE TABLE`` on the result and reports the outcome via
``CommonUtil.send_wx_msg``.
"""
import os
import sys
import pymysql

# Add the parent of the script directory to the import path; presumably this
# is what lets the project-local ``utils`` package below resolve when the file
# is run directly.
sys.path.append(os.path.dirname(sys.path[0]))

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from utils.common_util import CommonUtil


def get_db_engine():
    """Open and return a PyMySQL connection to the Doris ``adv`` database.

    Despite the name, the returned object is a raw ``pymysql`` connection,
    not a SQLAlchemy engine.

    Returns:
        The connection created by ``pymysql.connect``.
    """
    # Doris connection parameters.
    # NOTE(review): host and credentials are hard-coded (root, empty
    # password) -- consider moving them to configuration.
    con = {
        "host": "192.168.10.218",
        "port": 9030,
        "user": "root",
        "password": "",
        "database": "adv"
    }
    connection = pymysql.connect(**con)
    return connection


def _drain_result_sets(cursor):
    """Consume every result set still pending on *cursor*.

    ``cursor.execute`` only reads the result of the first statement of a
    multi-statement script. Stepping through the remaining result sets here
    makes an error raised by a later statement surface immediately, at the
    call site, instead of during the next ``execute``/``close`` call.
    """
    while cursor.nextset():
        pass


def exec_sql(connection):
    """Load, swap and analyze ``sp_customer_search_report`` over *connection*.

    Steps, in order:

    1. Truncate ``sp_customer_search_report_copy`` and refill it from the
       ``mysql_adv.advertising_manager_us`` source tables.
    2. Swap ``sp_customer_search_report_copy`` and
       ``sp_customer_search_report`` via three renames.
    3. Run ``ANALYZE TABLE`` on ``sp_customer_search_report``.

    If step 1 fails, a failure notification is sent through
    ``CommonUtil.send_wx_msg`` and the process exits with status 1, so the
    swap never runs against a partially loaded table. Failures in steps 2
    and 3 propagate to the caller. The cursor and the connection are closed
    on every path.

    Args:
        connection: An open PyMySQL connection; it is closed before this
            function returns or raises.

    Raises:
        SystemExit: With status 1 when the data load (step 1) fails.
    """
    cursor = connection.cursor()
    try:
        # SQL that computes the data
        sql1 = """
        TRUNCATE table sp_customer_search_report_copy;
        INSERT INTO sp_customer_search_report_copy
        SELECT
            startDate, site, userName, a.keywordId AS 'keywordId', keywordText, matchType,
            impressions, clicks, cost, updated_at, created_at, account,
            attributedConversions7d, attributedSales7d, customerSearchText,
            a.campaignId AS 'campaignId', id, c.adGroupId AS 'adGroupId', adGroupName
        FROM (
            SELECT
                startDate, site, keywordId, keywordText, matchType, impressions, clicks, cost,
                updated_at, created_at, account, attributedConversions7d, attributedSales7d,
                customerSearchText, campaignId, id, adGroupId, adGroupName
            FROM mysql_adv.advertising_manager_us.us_customer_search_report
        ) a
        LEFT JOIN (
            SELECT userName, campaignId
            FROM mysql_adv.advertising_manager_us.user_campaign_id
            GROUP BY userName, campaignId
        ) b ON a.campaignId = b.campaignId
        LEFT JOIN (
            SELECT keywordId, adGroupId
            FROM mysql_adv.advertising_manager_us.us_keyword
            GROUP BY keywordId, adGroupId
        ) c ON a.keywordId = c.keywordId;
        """
        print("----------------------------------------------------")
        print(sql1)

        # SQL that swaps the table names
        sql2 = """
        ALTER TABLE sp_customer_search_report_copy RENAME sp_customer_search_report_tmp;
        ALTER TABLE sp_customer_search_report RENAME sp_customer_search_report_copy;
        ALTER TABLE sp_customer_search_report_tmp RENAME sp_customer_search_report;
        """
        print("----------------------------------------------------")
        print(sql2)

        # SQL that refreshes statistics for the query planner
        sql3 = """
        ANALYZE TABLE sp_customer_search_report;
        """
        print("----------------------------------------------------")
        print(sql3)

        # Run the data computation
        try:
            cursor.execute(sql1)
            # Surface a failure of the INSERT (second statement) here, inside
            # the handler, rather than on the next execute() call.
            _drain_result_sets(cursor)
            print("SQL执行成功")
        except Exception as e:
            print(f"SQL执行失败,报错信息:{e}")
            CommonUtil.send_wx_msg(["chenyuanjie"], "【adv:sp_customer_search_report导入失败】", "报错信息请查看日志")
            # Non-zero status so the caller/scheduler sees the failure.
            sys.exit(1)

        # Computation succeeded: swap the table names
        cursor.execute(sql2)
        # Make sure all three renames went through before reporting success.
        _drain_result_sets(cursor)
        print("交换表名成功")

        # Refresh statistics for the query planner
        cursor.execute(sql3)
        _drain_result_sets(cursor)
        print("ANALYZE成功")
    finally:
        # Release the cursor and the connection on success and failure alike.
        try:
            cursor.close()
        finally:
            # close() raises on an already-closed connection, which would
            # mask the original error after a dropped connection.
            if connection.open:
                connection.close()


if __name__ == '__main__':
    # Obtain the database connection (called "engine" in the log output)
    connection = get_db_engine()
    print("----------------------------------------------------")
    print("成功获取Doris的engine")

    # Run the SQL over that connection
    exec_sql(connection)

    # Completion notification
    CommonUtil.send_wx_msg(["chenyuanjie"], "【adv:sp_customer_search_report导入成功】", "悉知")