Commit 13cc3af4 by chenyuanjie

adv_cps计算

parent a9a8d09b
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Jinja2" />
<option name="TEMPLATE_FOLDERS">
<list>
<option value="$MODULE_DIR$/py_gpt/app/templates" />
</list>
</option>
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/Amazon-Selection-Data.iml" filepath="$PROJECT_DIR$/.idea/Amazon-Selection-Data.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F
class StCpsLast30day(object):
    """Compute advertising-keyword CPS over the last 30 days and publish it.

    Pipeline (see :meth:`run`):
      1. Aggregate clicks/orders per keyword and bidding strategy from the
         Doris ``advertising_manager`` tables (cps = SUM(clicks)/SUM(orders)).
      2. Join the result with the ``ods_st_key`` keyword-id mapping.
      3. Truncate and reload the PostgreSQL cluster table ``st_cps_current_v2``.
    """

    # Site code -> numeric site id used by the advertising database.
    _SITE_ID_MAP = {
        "us": 4,
        "uk": 3,
        "de": 6,
    }

    def __init__(self, site_name):
        """
        :param site_name: site code; must be one of ``us`` / ``uk`` / ``de``
        :raises ValueError: when ``site_name`` is missing or unsupported
        """
        # Fail fast with a clear message instead of an opaque KeyError
        # (site_name comes straight from the command line).
        if site_name not in self._SITE_ID_MAP:
            raise ValueError(
                f"unsupported site_name {site_name!r}; "
                f"expected one of {sorted(self._SITE_ID_MAP)}"
            )
        self.site_name = site_name
        self.site_id = self._SITE_ID_MAP[site_name]
        # Create the Spark session (stopped at the end of run()).
        self.spark = SparkUtil.get_spark_session(app_name=f"st_cps_to_pg: {site_name}")
        # Placeholder DataFrames; the real data is produced in run().
        self.df_keyword_cps = self.spark.sql("select 1+1;")
        self.df_st_key = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def run(self):
        """Execute the full pipeline: compute CPS, join st_key, export to PG."""
        # CPS per keyword and bidding strategy over the last 30 days.
        # NOTE(review): cps = SUM(clicks) / SUM(orders); when orders is 0 the
        # database yields NULL for that strategy — confirm this is intended.
        sql_cps = f"""
            SELECT
                keyword,
                GROUP_CONCAT( CONCAT( strategy, ': ', cps ), '\n' ) AS strategy_cps,
                MAX(CASE WHEN strategy = 'down only' THEN cps END) AS down_only_val,
                MAX(CASE WHEN strategy = 'up and down' THEN cps END) AS up_and_down_val,
                MAX(CASE WHEN strategy = 'fixed' THEN cps END) AS fixed_val
            FROM (
                SELECT
                    keywordText AS keyword,
                    CASE WHEN strategy = 'legacyForSales' THEN 'down only'
                         WHEN strategy = 'autoForSales' THEN 'up and down'
                         WHEN strategy = 'manual' THEN 'fixed'
                         ELSE strategy END AS strategy,
                    ROUND( SUM( clicks ) / SUM( orders ), 2 ) AS cps
                FROM (
                    SELECT keywordText, clicks, orders, strategy
                    FROM (
                        SELECT campaignId, keywordText, SUM(clicks) AS clicks, SUM(attributedConversions7d) AS orders
                        FROM advertising_manager.keyword_report
                        WHERE startDate >= CURDATE() - INTERVAL 30 DAY
                        AND site = {self.site_id}
                        GROUP BY campaignId, keywordId, keywordText
                    ) a INNER JOIN advertising_manager.adv_campaign b ON a.campaignId = b.campaignId
                    WHERE b.strategy IN ('manual', 'autoForSales', 'legacyForSales')
                ) t1
                GROUP BY keywordText, strategy
            ) t2
            GROUP BY keyword
        """
        # Pull from Doris (connection tag 'adv'); pre-partition on the join key.
        self.df_keyword_cps = DorisHelper.spark_import_with_sql(self.spark, sql_cps, 'adv').repartition(40, 'keyword').cache()
        print("广告词cps计算结果如下:")
        self.df_keyword_cps.show(10, True)
        # Load the keyword -> st_key mapping for this site.
        sql_key = f"""
            SELECT search_term AS keyword, cast(st_key as int) AS st_key FROM ods_st_key WHERE site_name = '{self.site_name}'
        """
        self.df_st_key = self.spark.sql(sql_key).repartition(40, 'keyword').cache()
        print("st_key数据如下:")
        self.df_st_key.show(10, True)
        # Inner join: keep only keywords that have an st_key.
        self.df_save = self.df_keyword_cps.join(
            self.df_st_key, on=['keyword'], how='inner'
        )
        self.df_save = self.df_save.withColumn(
            "site_id", F.lit(self.site_id)
        ).withColumn(
            "created_at", F.current_timestamp()
        ).select(
            "site_id", "st_key", "keyword", "created_at", "strategy_cps", "down_only_val", "up_and_down_val", "fixed_val"
        ).cache()
        # Clear the previous snapshot.
        # NOTE(review): TRUNCATE + append is not atomic — readers may observe
        # an empty table between the two steps; confirm this is acceptable.
        pg_engine = DBUtil.get_db_engine('postgresql_cluster', self.site_name)
        sql_clean = """
            TRUNCATE TABLE st_cps_current_v2;
        """
        DBUtil.engine_exec_sql(pg_engine, sql_clean)
        # Write the fresh snapshot to the PostgreSQL cluster over JDBC.
        pg_conn = DBUtil.get_connection_info('postgresql_cluster', self.site_name)
        self.df_save.write \
            .format("jdbc") \
            .option("url", pg_conn["url"]) \
            .option("dbtable", "st_cps_current_v2") \
            .option("user", pg_conn["username"]) \
            .option("password", pg_conn["pwd"]) \
            .mode("append") \
            .save()
        self.spark.stop()
        print("success")
if __name__ == '__main__':
    # Entry point: the site code (us/uk/de) is taken from the first CLI arg.
    target_site = CommonUtil.get_sys_arg(1, None)
    StCpsLast30day(target_site).run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment