import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F


class StCpsLast30day(object):

    def __init__(self, site_name):
        site_id_map = {
            "us": 4,
            "uk": 3,
            "de": 6,
        }
        if site_name not in site_id_map:
            raise ValueError(f"unsupported site_name: {site_name}")
        self.site_name = site_name
        self.site_id = site_id_map[site_name]
        # create the Spark session
        self.spark = SparkUtil.get_spark_session(app_name=f"st_cps_to_pg: {site_name}")
        # placeholder DataFrames, overwritten in run()
        self.df_keyword_cps = self.spark.sql("select 1+1")
        self.df_st_key = self.spark.sql("select 1+1")
        self.df_save = self.spark.sql("select 1+1")

    def run(self):
        # compute CPS (clicks per order) for each ad keyword, broken down by bidding strategy
        sql_cps = f"""
        SELECT
            keyword,
            GROUP_CONCAT(CONCAT(strategy, ': ', cps), '\n') AS strategy_cps,
            MAX(CASE WHEN strategy = 'down only' THEN cps END)   AS down_only_val,
            MAX(CASE WHEN strategy = 'up and down' THEN cps END) AS up_and_down_val,
            MAX(CASE WHEN strategy = 'fixed' THEN cps END)       AS fixed_val
        FROM (
            SELECT
                keywordText AS keyword,
                CASE
                    WHEN strategy = 'legacyForSales' THEN 'down only'
                    WHEN strategy = 'autoForSales' THEN 'up and down'
                    WHEN strategy = 'manual' THEN 'fixed'
                    ELSE strategy
                END AS strategy,
                ROUND(SUM(clicks) / SUM(orders), 2) AS cps
            FROM (
                SELECT keywordText, clicks, orders, strategy
                FROM (
                    SELECT
                        campaignId,
                        keywordText,
                        SUM(clicks) AS clicks,
                        SUM(attributedConversions7d) AS orders
                    FROM advertising_manager.keyword_report
                    WHERE startDate >= CURDATE() - INTERVAL 30 DAY
                      AND site = {self.site_id}
                    GROUP BY campaignId, keywordId, keywordText
                ) a
                INNER JOIN advertising_manager.adv_campaign b ON a.campaignId = b.campaignId
                WHERE b.strategy IN ('manual', 'autoForSales', 'legacyForSales')
            ) t1
            GROUP BY keywordText, strategy
        ) t2
        GROUP BY keyword
        """
        self.df_keyword_cps = DorisHelper.spark_import_with_sql(self.spark, sql_cps, 'adv') \
            .repartition(40, 'keyword').cache()
        print("keyword CPS results:")
        self.df_keyword_cps.show(10, True)

        # read the search-term key mapping
        sql_key = f"""
        SELECT search_term AS keyword, CAST(st_key AS INT) AS st_key
        FROM ods_st_key
        WHERE site_name = '{self.site_name}'
        """
        self.df_st_key = self.spark.sql(sql_key).repartition(40, 'keyword').cache()
        print("st_key data:")
        self.df_st_key.show(10, True)

        # attach each keyword's st_key to its CPS metrics
        self.df_save = self.df_keyword_cps.join(
            self.df_st_key, on=['keyword'], how='inner'
        )
        self.df_save = self.df_save.withColumn(
            "site_id", F.lit(self.site_id)
        ).withColumn(
            "created_at", F.current_timestamp()
        ).select(
            "site_id", "st_key", "keyword", "created_at",
            "strategy_cps", "down_only_val", "up_and_down_val", "fixed_val"
        ).cache()

        # purge historical data
        pg_engine = DBUtil.get_db_engine('postgresql_cluster', self.site_name)
        sql_clean = "TRUNCATE TABLE st_cps_current_v2;"
        DBUtil.engine_exec_sql(pg_engine, sql_clean)

        # write out to the PostgreSQL cluster
        pg_conn = DBUtil.get_connection_info('postgresql_cluster', self.site_name)
        self.df_save.write \
            .format("jdbc") \
            .option("url", pg_conn["url"]) \
            .option("dbtable", "st_cps_current_v2") \
            .option("user", pg_conn["username"]) \
            .option("password", pg_conn["pwd"]) \
            .mode("append") \
            .save()
        self.spark.stop()
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = StCpsLast30day(site_name)
    obj.run()
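# ---------------------------------------------------------------------------
# Usage sketch. CommonUtil.get_sys_arg is assumed to read positional
# command-line arguments (which matches its use above); the script file name
# here is illustrative:
#
#   spark-submit st_cps_last_30day.py us
#
# The single argument selects the site ("us", "uk", or "de"). The target PG
# table is assumed to look roughly like the following — the column names come
# from the select() above, but the types are guesses, not a confirmed schema:
#
#   CREATE TABLE IF NOT EXISTS st_cps_current_v2 (
#       site_id         int,
#       st_key          int,
#       keyword         text,
#       created_at      timestamp,
#       strategy_cps    text,
#       down_only_val   numeric,
#       up_and_down_val numeric,
#       fixed_val       numeric
#   );
# ---------------------------------------------------------------------------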