# st_cps_last30day.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录

from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F


class StCpsLast30day(object):
    """Compute last-30-day CPS (clicks per sale) for advertising keywords,
    broken down by bidding strategy, and refresh the ``st_cps_current_v2``
    table in the PostgreSQL cluster.

    Pipeline: read the keyword report + campaign strategy from Doris,
    pivot CPS per strategy, join with ``ods_st_key`` via Spark SQL, then
    truncate and bulk-insert the target PG table.
    """

    # Supported site codes and their numeric ids in the advertising database.
    SITE_ID_MAP = {
        "us": 4,
        "uk": 3,
        "de": 6,
    }

    def __init__(self, site_name):
        """
        :param site_name: site code, one of ``us`` / ``uk`` / ``de``
        :raises ValueError: if ``site_name`` is missing or unsupported
            (e.g. the CLI argument was omitted and ``None`` was passed in)
        """
        if site_name not in self.SITE_ID_MAP:
            raise ValueError(
                f"unsupported site_name: {site_name!r}; "
                f"expected one of {sorted(self.SITE_ID_MAP)}"
            )
        self.site_name = site_name
        self.site_id = self.SITE_ID_MAP[site_name]
        # Create the Spark session
        self.spark = SparkUtil.get_spark_session(app_name=f"st_cps_to_pg: {site_name}")
        # Placeholder DataFrames; populated in run(). "select 1+1" is just a
        # cheap dummy query to get typed DataFrame attributes up front.
        self.df_keyword_cps = self.spark.sql("select 1+1;")
        self.df_st_key = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def run(self):
        """Execute the full extract-join-load pipeline, then stop Spark."""
        # CPS per keyword and bidding strategy over the last 30 days.
        # cps = SUM(clicks) / SUM(7d attributed conversions); strategies are
        # remapped to display names and pivoted into one row per keyword
        # (one MAX(...) column per strategy plus a concatenated summary).
        # NOTE(review): when SUM(orders) is 0 the division yields NULL for
        # that strategy — confirm downstream treats NULL as "no sales".
        sql_cps = f"""
        SELECT
            keyword,
            GROUP_CONCAT( CONCAT( strategy, ': ', cps ), '\n' ) AS strategy_cps,
            MAX(CASE WHEN strategy = 'down only' THEN cps END) AS down_only_val, 
            MAX(CASE WHEN strategy = 'up and down' THEN cps END) AS up_and_down_val, 
            MAX(CASE WHEN strategy = 'fixed' THEN cps END) AS fixed_val 
        FROM (
            SELECT 
                keywordText AS keyword,
                CASE WHEN strategy = 'legacyForSales' THEN 'down only'
                     WHEN strategy = 'autoForSales' THEN 'up and down'
                     WHEN strategy = 'manual' THEN 'fixed'
                     ELSE strategy END AS strategy, 
                ROUND( SUM( clicks ) / SUM( orders ), 2 ) AS cps 
            FROM (
                SELECT keywordText, clicks, orders, strategy 
                FROM ( 
                    SELECT campaignId, keywordText, SUM(clicks) AS clicks, SUM(attributedConversions7d) AS orders 
                    FROM advertising_manager.keyword_report 
                    WHERE startDate >= CURDATE() - INTERVAL 30 DAY 
                      AND site = {self.site_id}
                    GROUP BY campaignId, keywordId, keywordText
                    ) a INNER JOIN advertising_manager.adv_campaign b ON a.campaignId = b.campaignId 
                WHERE b.strategy IN ('manual', 'autoForSales', 'legacyForSales')
                ) t1
            GROUP BY keywordText, strategy
            ) t2
        GROUP BY keyword
        """
        # Pull from Doris; repartition on the join key so the later join
        # with df_st_key (same partitioning) avoids a full shuffle.
        self.df_keyword_cps = DorisHelper.spark_import_with_sql(self.spark, sql_cps, 'adv').repartition(40, 'keyword').cache()
        print("广告词cps计算结果如下:")
        self.df_keyword_cps.show(10, True)

        # Map search terms to their numeric st_key for the current site.
        sql_key = f"""
        SELECT search_term AS keyword, cast(st_key as int) AS st_key FROM ods_st_key WHERE site_name = '{self.site_name}' 
        """
        self.df_st_key = self.spark.sql(sql_key).repartition(40, 'keyword').cache()
        print("st_key数据如下:")
        self.df_st_key.show(10, True)

        # Inner join: keep only keywords that have an st_key mapping.
        self.df_save = self.df_keyword_cps.join(
            self.df_st_key, on=['keyword'], how='inner'
        )
        self.df_save = self.df_save.withColumn(
            "site_id", F.lit(self.site_id)
        ).withColumn(
            "created_at", F.current_timestamp()
        ).select(
            "site_id", "st_key", "keyword", "created_at", "strategy_cps", "down_only_val", "up_and_down_val", "fixed_val"
        ).cache()

        # Clear previous snapshot before loading the new one.
        # NOTE(review): TRUNCATE happens before the JDBC write — if the write
        # fails, the table is left empty until the next successful run.
        pg_engine = DBUtil.get_db_engine('postgresql_cluster', self.site_name)
        sql_clean = f"""
        TRUNCATE TABLE st_cps_current_v2;
        """
        DBUtil.engine_exec_sql(pg_engine, sql_clean)

        # Bulk-append the fresh snapshot into the (now empty) PG table.
        pg_conn = DBUtil.get_connection_info('postgresql_cluster', self.site_name)
        self.df_save.write \
            .format("jdbc") \
            .option("url", pg_conn["url"]) \
            .option("dbtable", "st_cps_current_v2") \
            .option("user", pg_conn["username"]) \
            .option("password", pg_conn["pwd"]) \
            .mode("append") \
            .save()

        self.spark.stop()
        print("success")


if __name__ == '__main__':
    # First CLI argument selects the site; defaults to None when absent.
    arg_site = CommonUtil.get_sys_arg(1, None)
    StCpsLast30day(arg_site).run()