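"""Compute last-30-day CPS (clicks per sale) for advertising keywords.

Reads keyword reports and campaign bidding strategies from Doris, aggregates
CPS per keyword and strategy with Spark, joins the result against the st_key
mapping in Hive, and full-refreshes the st_cps_current_v2 table in the
PostgreSQL cluster.
"""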
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable

from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F


class StCpsLast30day(object):
    def __init__(self, site_name):
        site_id_map = {
            "us": 4,
            "uk": 3,
            "de": 6,
        }
        if site_name not in site_id_map:
            raise ValueError(f"unsupported site_name: {site_name!r}; expected one of {sorted(site_id_map)}")
        self.site_name = site_name
        self.site_id = site_id_map[site_name]
        # create the Spark session
        self.spark = SparkUtil.get_spark_session(app_name=f"st_cps_to_pg: {site_name}")
        # DataFrames populated in run()
        self.df_keyword_cps = None
        self.df_st_key = None
        self.df_save = None

    def run(self):
        # compute CPS per keyword and bidding strategy over the last 30 days
        sql_cps = f"""
            SELECT
                keyword,
                GROUP_CONCAT( CONCAT( strategy, ': ', cps ), '\n' ) AS strategy_cps,
                MAX(CASE WHEN strategy = 'down only' THEN cps END) AS down_only_val,
                MAX(CASE WHEN strategy = 'up and down' THEN cps END) AS up_and_down_val,
                MAX(CASE WHEN strategy = 'fixed' THEN cps END) AS fixed_val
            FROM (
                SELECT
                    keywordText AS keyword,
                    CASE WHEN strategy = 'legacyForSales' THEN 'down only'
                         WHEN strategy = 'autoForSales' THEN 'up and down'
                         WHEN strategy = 'manual' THEN 'fixed'
                         ELSE strategy END AS strategy,
                    ROUND( SUM( clicks ) / SUM( orders ), 2 ) AS cps
                FROM (
                    SELECT keywordText, clicks, orders, strategy
                    FROM (
                        SELECT campaignId, keywordText, SUM(clicks) AS clicks, SUM(attributedConversions7d) AS orders
                        FROM advertising_manager.keyword_report
                        WHERE startDate >= CURDATE() - INTERVAL 30 DAY
                          AND site = {self.site_id}
                        GROUP BY campaignId, keywordText
                    ) a INNER JOIN advertising_manager.adv_campaign b ON a.campaignId = b.campaignId
                    WHERE b.strategy IN ('manual', 'autoForSales', 'legacyForSales')
                ) t1
                GROUP BY keywordText, strategy
            ) t2
            GROUP BY keyword
        """
        self.df_keyword_cps = DorisHelper.spark_import_with_sql(self.spark, sql_cps, 'adv').repartition(40, 'keyword').cache()
        print("Keyword CPS results:")
        self.df_keyword_cps.show(10, True)
        # read the st_key mapping
        sql_key = f"""
            SELECT search_term AS keyword, CAST(st_key AS int) AS st_key FROM ods_st_key WHERE site_name = '{self.site_name}'
        """
        self.df_st_key = self.spark.sql(sql_key).repartition(40, 'keyword').cache()
        print("st_key rows:")
        self.df_st_key.show(10, True)
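        # both inputs were hash-repartitioned on `keyword` with the same partition
        # count, so the inner join below should not need a further shuffle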
        # join the CPS metrics onto st_key
        self.df_save = self.df_keyword_cps.join(
            self.df_st_key, on=['keyword'], how='inner'
        )
        self.df_save = self.df_save.withColumn(
            "site_id", F.lit(self.site_id)
        ).withColumn(
            "created_at", F.current_timestamp()
        ).select(
            "site_id", "st_key", "keyword", "created_at", "strategy_cps", "down_only_val", "up_and_down_val", "fixed_val"
        ).cache()
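        # the select keeps exactly the columns written to st_cps_current_v2;
        # the JDBC append below maps them onto table columns by name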
        # clear out the previous snapshot
        pg_engine = DBUtil.get_db_engine('postgresql_cluster', self.site_name)
        sql_clean = "TRUNCATE TABLE st_cps_current_v2;"
        DBUtil.engine_exec_sql(pg_engine, sql_clean)
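        # note: TRUNCATE followed by an append is a full refresh rather than an
        # atomic swap, so readers may briefly see an empty st_cps_current_v2
        # while the write below is in flight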
        # write out to the PG cluster
        pg_conn = DBUtil.get_connection_info('postgresql_cluster', self.site_name)
        self.df_save.write \
            .format("jdbc") \
            .option("url", pg_conn["url"]) \
            .option("dbtable", "st_cps_current_v2") \
            .option("user", pg_conn["username"]) \
            .option("password", pg_conn["pwd"]) \
            .mode("append") \
            .save()
        self.spark.stop()
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = StCpsLast30day(site_name)
    obj.run()
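
# Example invocation (the file name here is assumed for illustration):
#   spark-submit st_cps_last_30day.py us
# where the first positional argument is the site name: us, uk, or de.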