Commit ba46b2f2 by chenyuanjie

ABA周增长

parent af0b00c2
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
class DwtStDetailWeek(object):
    """Weekly ABA (Amazon Brand Analytics) search-term growth report.

    Reads the weekly search-term reports for the target week plus the
    1/2/3/4/12-weeks-ago snapshots, derives keyword flags and rank-change
    metrics, and writes one partition of ``dim_st_detail_week`` per
    (site_name, date_type, date_info).
    """

    def __init__(self, site_name, date_type, date_info):
        """
        :param site_name: site code, e.g. 'us'
        :param date_type: date granularity; this job expects 'week'
        :param date_info: target year-week string, e.g. '2023-32'
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = 'dim_st_detail_week'
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # The 12 most recent distinct year-weeks strictly before the target
        # week, newest first. date_info comes from a trusted scheduler
        # argument, so direct interpolation into the SQL is acceptable here.
        rows = self.spark.sql(f"""SELECT DISTINCT year_week FROM dim_date_20_to_30 WHERE year_week < '{date_info}' ORDER BY year_week DESC LIMIT 12""").collect()

        def week_before(idx):
            # Safe positional access: None when history is shorter than idx + 1 weeks.
            return rows[idx]['year_week'] if len(rows) > idx else None

        self.date_info_last_week = week_before(0)
        self.date_info_2_week_ago = week_before(1)
        self.date_info_3_week_ago = week_before(2)
        self.date_info_4_week_ago = week_before(3)
        self.date_info_12_week_ago = week_before(11)
        # Month containing day 1 (week_day = 1) of the target week; used to
        # join the monthly ABA analytics table in read_data().
        year_month = self.spark.sql(f"""SELECT year_month FROM dim_date_20_to_30 WHERE year_week = '{date_info}' and week_day = 1""").collect()
        self.year_month = year_month[0]['year_month'] if len(year_month) > 0 else None
        # Lightweight placeholder frame; every dataframe attribute below is
        # reassigned by read_data()/handle_*() before it is used.
        placeholder_df = self.spark.sql("select 1+1;")
        self.df_st_detail = placeholder_df
        self.df_st_detail_last_week = placeholder_df
        self.df_st_detail_last_4_week = placeholder_df
        self.df_st_detail_4_week_ago = placeholder_df
        self.df_st_detail_12_week_ago = placeholder_df
        self.df_st_rank = placeholder_df
        self.df_st_key = placeholder_df
        self.df_st_month = placeholder_df
        self.df_st_detail_week = placeholder_df
        self.df_st_detail_1_week_ago = placeholder_df
        self.df_st_detail_2_week_ago = placeholder_df
        self.df_st_detail_3_week_ago = placeholder_df
        self.df_save = placeholder_df
        # Per-week report columns of ods_brand_analytics (top-3 clicked ASINs).
        self.cols = ['rank', 'asin1', 'product_title1', 'click_share1', 'conversion_share1', 'brand1', 'category1',
                     'asin2', 'product_title2', 'click_share2', 'conversion_share2', 'brand2', 'category2',
                     'asin3', 'product_title3', 'click_share3', 'conversion_share3', 'brand3', 'category3', 'quantity_being_sold']
        # Special-character list, populated from match_character_dict in read_data().
        self.sp_symbols = []
def st_word_count(self, sp_symbols):
    """Build a Spark UDF that counts the words of a search term.

    A term is split on single spaces. Tokens that exactly match one of
    *sp_symbols* (the special-character dictionary) are excluded from the
    word count; if at least one such token is present, a single extra
    count of 1 is added regardless of how many matched.

    :param sp_symbols: list of special-character tokens, e.g. ['?', '-', '&']
    :return: a pyspark UDF returning IntegerType
    """
    # Hoisted once; the closure reuses it on every invocation.
    symbol_set = set(sp_symbols)

    def count_words(term):
        tokens = term.split(" ")
        found_symbols = symbol_set.intersection(tokens)
        plain_words = sum(1 for t in tokens if t not in found_symbols)
        # Any number of special-character tokens contributes exactly 1.
        return plain_words + (1 if found_symbols else 0)

    return F.udf(count_words, IntegerType())
def read_data(self):
    """Load every upstream dataset needed for the weekly report.

    Populates: df_st_detail (current week), df_st_detail_last_week /
    _last_4_week / _4_week_ago / _12_week_ago (historical slices with
    suffixed columns), df_st_rank (rank -> volume/rate), df_st_key,
    df_st_month (monthly category/market-cycle data), sp_symbols, and
    the previous three weeks' own results (df_st_detail_{1,2,3}_week_ago).
    """
    print("读取ods_brand_analytics搜索词周报告")
    select_cols = ",".join(['search_term', 'date_info', 'updated_time'] + self.cols)
    # One scan covering the target week, the trailing 4 weeks and the
    # 12-weeks-ago snapshot.
    sql = f"""
select
{select_cols}
from ods_brand_analytics
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info <= '{self.date_info}'
and (date_info >= '{self.date_info_4_week_ago}' or date_info = '{self.date_info_12_week_ago}')
and search_term is not null;
"""
    self.df_st_detail = self.spark.sql(sqlQuery=sql)
    # Deduplicate: keep the most recently updated row per (search_term, week)
    # in case a week contains duplicated search terms.
    window = Window.partitionBy(['search_term', 'date_info']).orderBy(self.df_st_detail.updated_time.desc_nulls_last())
    self.df_st_detail = self.df_st_detail.withColumn(
        'dt_rank', F.row_number().over(window=window)
    ).filter('dt_rank=1').drop('dt_rank', 'updated_time').cache()
    # Last week's rows, report columns suffixed with `_last_week`.
    self.df_st_detail_last_week = self.df_st_detail.filter(f"date_info = '{self.date_info_last_week}'")
    for col in self.cols:
        self.df_st_detail_last_week = self.df_st_detail_last_week.withColumnRenamed(
            col, f'{col}_last_week'
        )
    self.df_st_detail_last_week.cache()
    print("1周前数据如下:")
    self.df_st_detail_last_week.show(10, True)
    # "Last 4 weeks" slice: weeks strictly after 4-weeks-ago and up to the
    # target week (i.e. the current week plus the previous three).
    self.df_st_detail_last_4_week = self.df_st_detail.filter(f"date_info > '{self.date_info_4_week_ago}'")
    for col in self.cols:
        self.df_st_detail_last_4_week = self.df_st_detail_last_4_week.withColumnRenamed(
            col, f'{col}_last_4_week'
        )
    self.df_st_detail_last_4_week.cache()
    print("近4周数据如下:")
    self.df_st_detail_last_4_week.show(10, True)
    # Exactly the 4-weeks-ago week, suffixed `_4_week_ago`.
    self.df_st_detail_4_week_ago = self.df_st_detail.filter(f"date_info = '{self.date_info_4_week_ago}'")
    for col in self.cols:
        self.df_st_detail_4_week_ago = self.df_st_detail_4_week_ago.withColumnRenamed(
            col, f'{col}_4_week_ago'
        )
    self.df_st_detail_4_week_ago.cache()
    print("4周前数据如下:")
    self.df_st_detail_4_week_ago.show(10, True)
    # Exactly the 12-weeks-ago week, suffixed `_12_week_ago`.
    self.df_st_detail_12_week_ago = self.df_st_detail.filter(f"date_info = '{self.date_info_12_week_ago}'")
    for col in self.cols:
        self.df_st_detail_12_week_ago = self.df_st_detail_12_week_ago.withColumnRenamed(
            col, f'{col}_12_week_ago'
        )
    self.df_st_detail_12_week_ago.cache()
    print("12周前数据如下:")
    self.df_st_detail_12_week_ago.show(10, True)
    # Narrow df_st_detail down to the target week only.
    self.df_st_detail = self.df_st_detail.filter(f"date_info = '{self.date_info}'").cache()
    print("本周数据如下:")
    self.df_st_detail.show(10, True)
    # Rank -> search volume / search rate mapping; keep the newest
    # date_info per rank.
    sql = f"""
select rank, search_num as search_volume, rate as st_search_rate, date_info from ods_rank_search_rate_repeat where site_name = '{self.site_name}';
"""
    self.df_st_rank = self.spark.sql(sql)
    window = Window.partitionBy(['rank']).orderBy(self.df_st_rank.date_info.desc())
    self.df_st_rank = self.df_st_rank.withColumn(
        'date_info_rank', F.row_number().over(window=window)
    ).filter('date_info_rank=1').drop('date_info_rank', 'date_info').cache()
    print("搜索词排名+搜索量+转化率如下:")
    self.df_st_rank.show(10, True)
    # search_term -> st_key mapping.
    sql = f"""
select st_key, search_term from ods_st_key where site_name = '{self.site_name}';
"""
    self.df_st_key = self.spark.sql(sql).cache()
    print("搜索词key如下:")
    self.df_st_key.show(10, True)
    # Monthly category / market-cycle data; keep the latest month per term
    # up to the month containing this week.
    sql = f"""
select search_term, st_bsr_cate_1_id_new as category_id, st_bsr_cate_current_id_new as category_current_id, market_cycle_type, date_info from dwt_aba_st_analytics where site_name = '{self.site_name}' and date_type = 'month' and date_info <= '{self.year_month}';
"""
    self.df_st_month = self.spark.sql(sql)
    window = Window.partitionBy(['search_term']).orderBy(self.df_st_month.date_info.desc())
    self.df_st_month = self.df_st_month.withColumn(
        'date_info_rank', F.row_number().over(window=window)
    ).filter('date_info_rank=1').drop('date_info_rank', 'date_info').cache()
    print("分类、市场周期月数据如下:")
    self.df_st_month.show(10, True)
    # Special-character dictionary table: match_character_dict.
    # NOTE(review): the variable is named pg_sql but the connection info is
    # fetched for "mysql"/"us" — so this actually reads from MySQL; confirm
    # which backend hosts match_character_dict.
    pg_sql = f"""
select character_name from match_character_dict where match_type = '特殊字符'
"""
    conn_info = DBUtil.get_connection_info("mysql", "us")
    chart_dict_df = SparkUtil.read_jdbc_query(
        session=self.spark,
        url=conn_info["url"],
        pwd=conn_info["pwd"],
        username=conn_info["username"],
        query=pg_sql
    )
    dict_df = chart_dict_df.toPandas()
    # Extract the special-character list for st_word_count().
    self.sp_symbols = dict_df["character_name"].values.tolist()
    # Previous three weeks' own results, used to expose consecutive
    # growth/decline to downstream consumers.
    sql = f"""
select search_term, rank_change_last_1_week, rank_rate_last_1_week, date_info from dim_st_detail_week where site_name = '{self.site_name}' and date_type = 'week' and date_info > '{self.date_info_4_week_ago}' and date_info < '{self.date_info}';
"""
    self.df_st_detail_week = self.spark.sql(sql).cache()
    self.df_st_detail_1_week_ago = self.df_st_detail_week.filter(F.col('date_info') == self.date_info_last_week).withColumnRenamed(
        'rank_change_last_1_week', 'rank_change_1_week_ago'
    ).withColumnRenamed(
        'rank_rate_last_1_week', 'rank_rate_1_week_ago'
    ).select(
        'search_term', 'rank_change_1_week_ago', 'rank_rate_1_week_ago'
    ).cache()
    print("1周前排名变化数据如下:")
    self.df_st_detail_1_week_ago.show(10, True)
    self.df_st_detail_2_week_ago = self.df_st_detail_week.filter(F.col('date_info') == self.date_info_2_week_ago).withColumnRenamed(
        'rank_change_last_1_week', 'rank_change_2_week_ago'
    ).withColumnRenamed(
        'rank_rate_last_1_week', 'rank_rate_2_week_ago'
    ).select(
        'search_term', 'rank_change_2_week_ago', 'rank_rate_2_week_ago'
    ).cache()
    print("2周前排名变化数据如下:")
    self.df_st_detail_2_week_ago.show(10, True)
    self.df_st_detail_3_week_ago = self.df_st_detail_week.filter(F.col('date_info') == self.date_info_3_week_ago).withColumnRenamed(
        'rank_change_last_1_week', 'rank_change_3_week_ago'
    ).withColumnRenamed(
        'rank_rate_last_1_week', 'rank_rate_3_week_ago'
    ).select(
        'search_term', 'rank_change_3_week_ago', 'rank_rate_3_week_ago'
    ).cache()
    print("3周前排名变化数据如下:")
    self.df_st_detail_3_week_ago.show(10, True)
    # The combined 3-week frame is no longer needed once sliced.
    self.df_st_detail_week.unpersist()
def handle_st_flag(self):
    """Derive the four keyword flags (hot / rising / new / high-return)
    and left-join them onto the current week's df_st_detail."""
    # Hot words: present in each of the last 4 weeks.
    # NOTE(review): the original comment said "appears in more than 80% of
    # the last 4 weeks", but the filter requires exactly 4 occurrences.
    df_hot_search_term = self.df_st_detail_last_4_week.groupBy('search_term').agg(
        F.count('date_info').alias('last_4_week_cnt')
    ).filter(
        'last_4_week_cnt = 4'
    ).withColumn(
        'is_search_text', F.lit(1)
    ).select('search_term', 'is_search_text').cache()
    print("热搜词如下:")
    df_hot_search_term.show(10, True)
    # Rising words: rank improved by >= 50% versus last week (inner join,
    # so terms absent last week simply get no flag row here).
    df_rising_search_term = self.df_st_detail.join(
        self.df_st_detail_last_week, 'search_term', 'inner'
    ).withColumn(
        "is_ascending_text", (((F.col('rank_last_week') - F.col('rank')) / F.col('rank_last_week')) >= 0.5).cast('int')
    ).select('search_term', 'is_ascending_text').cache()
    print("上升词如下:")
    df_rising_search_term.show(10, True)
    # New words: present this week, absent last week (anti join).
    df_first_search_term = self.df_st_detail.join(
        self.df_st_detail_last_week, 'search_term', 'anti'
    ).withColumn(
        'is_first_text', F.lit(1)
    ).select('search_term', 'is_first_text').cache()
    print("新增词如下:")
    df_first_search_term.show(10, True)
    # High-return words: in every one of the last 4 weeks, total click share
    # exceeded total conversion share. (After the dedup in read_data each
    # (search_term, date_info) group is a single row, so F.sum is effectively
    # the per-week top-3 sum.)
    df_high_return_search_term = self.df_st_detail_last_4_week.groupBy(['search_term', 'date_info']).agg(
        F.sum(F.col('click_share1_last_4_week') + F.col('click_share2_last_4_week') + F.col('click_share3_last_4_week')).alias('click_share_total_last_4_week'),
        F.sum(F.col('conversion_share1_last_4_week') + F.col('conversion_share2_last_4_week') + F.col('conversion_share3_last_4_week')).alias('conversion_share_total_last_4_week')
    ).filter(
        'click_share_total_last_4_week > conversion_share_total_last_4_week'
    ).groupBy('search_term').agg(
        F.count('date_info').alias('last_4_week_cnt')
    ).filter(
        'last_4_week_cnt = 4'
    ).withColumn(
        'is_high_return_text', F.lit(1)
    ).select('search_term', 'is_high_return_text').cache()
    print("高回报词如下:")
    df_high_return_search_term.show(10, True)
    # Attach all four flags; terms with no match default to 0.
    self.df_st_detail = self.df_st_detail.join(
        df_hot_search_term, 'search_term', 'left'
    ).join(
        df_rising_search_term, 'search_term', 'left'
    ).join(
        df_first_search_term, 'search_term', 'left'
    ).join(
        df_high_return_search_term, 'search_term', 'left'
    ).fillna({
        'is_search_text': 0,
        'is_ascending_text': 0,
        'is_first_text': 0,
        'is_high_return_text': 0
    }).cache()
    df_hot_search_term.unpersist()
    df_rising_search_term.unpersist()
    df_first_search_term.unpersist()
    df_high_return_search_term.unpersist()
def handle_rank_rate(self):
    """Compute weekly search volume / estimated orders and the rank deltas
    and change rates versus 1, 4 and 12 weeks ago."""
    # Keep a handle on the *cached* rank frame so it can actually be
    # released below. Previously unpersist() was called on the derived
    # (never-cached) frame built here, which left the frame cached in
    # read_data() pinned in executor memory for the session's lifetime.
    df_st_rank_cached = self.df_st_rank
    # Weekly search volume (monthly figure / 4) and estimated orders.
    self.df_st_rank = self.df_st_rank.withColumn(
        'search_volume', F.round(F.col('search_volume') / 4)
    ).withColumn(
        'orders', F.round(F.col('search_volume') * F.col('st_search_rate'))
    ).select('rank', 'search_volume', 'orders')
    # Attach last-week / 4-weeks-ago / 12-weeks-ago ranks plus the
    # volume/orders mapping (joined on the current rank).
    self.df_st_detail = self.df_st_detail.join(
        self.df_st_detail_last_week.select('search_term', 'rank_last_week'), on='search_term', how='left'
    ).join(
        self.df_st_detail_4_week_ago.select('search_term', 'rank_4_week_ago'), on='search_term', how='left'
    ).join(
        self.df_st_detail_12_week_ago.select('search_term', 'rank_12_week_ago'), on='search_term', how='left'
    ).join(
        self.df_st_rank, on='rank', how='left'
    )
    # Rank change (positive = rank improved) and relative change rate.
    # All six columns stay NULL when the historical rank is missing.
    self.df_st_detail = self.df_st_detail.withColumn(
        'rank_change_last_week',
        F.when(F.col('rank_last_week').isNull(), None).otherwise(F.col('rank_last_week') - F.col('rank'))
    ).withColumn(
        'rank_change_4_week_ago',
        F.when(F.col('rank_4_week_ago').isNull(), None).otherwise(F.col('rank_4_week_ago') - F.col('rank'))
    ).withColumn(
        'rank_change_12_week_ago',
        F.when(F.col('rank_12_week_ago').isNull(), None).otherwise(F.col('rank_12_week_ago') - F.col('rank'))
    ).withColumn(
        'rank_change_rate_last_week',
        F.when(F.col('rank_last_week').isNull(), None).otherwise(
            F.round((F.col('rank_last_week') - F.col('rank')) / F.col('rank_last_week'), 4)
        )
    ).withColumn(
        'rank_change_rate_4_week_ago',
        F.when(F.col('rank_4_week_ago').isNull(), None).otherwise(
            F.round((F.col('rank_4_week_ago') - F.col('rank')) / F.col('rank_4_week_ago'), 4)
        )
    ).withColumn(
        'rank_change_rate_12_week_ago',
        F.when(F.col('rank_12_week_ago').isNull(), None).otherwise(
            F.round((F.col('rank_12_week_ago') - F.col('rank')) / F.col('rank_12_week_ago'), 4)
        )
    ).cache()
    # All intermediate cached frames are folded into df_st_detail now.
    self.df_st_detail_last_week.unpersist()
    self.df_st_detail_last_4_week.unpersist()
    self.df_st_detail_4_week_ago.unpersist()
    self.df_st_detail_12_week_ago.unpersist()
    df_st_rank_cached.unpersist()
def handle_other(self):
    """Add the ABA word count and the top-3 click/conversion share totals
    to df_st_detail."""
    # Build the word-count UDF once from the special-character dictionary.
    word_count_udf = self.st_word_count(self.sp_symbols)
    click_sum = F.col('click_share1') + F.col('click_share2') + F.col('click_share3')
    conversion_sum = F.col('conversion_share1') + F.col('conversion_share2') + F.col('conversion_share3')
    self.df_st_detail = (
        self.df_st_detail
        .withColumn('st_word_num', word_count_udf(F.col('search_term')))
        .withColumn('click_share_total', F.round(click_sum, 4))
        .withColumn('conversion_share_total', F.round(conversion_sum, 4))
        .cache()
    )
def save_data(self):
    """Assemble the final column layout and overwrite the target Hive
    partition (HDFS dir is cleared first, then written with mode='append').

    The select order below must match the downstream PG export column list.
    """
    # NOTE(review): length(asinN) is NULL for NULL asins, so rows missing
    # any of asin1/2/3 are dropped along with over-long values — confirm
    # this is intended. The inner join on st_key also drops terms without
    # a key.
    self.df_save = self.df_st_detail.filter(
        'length(asin1) <= 10 AND length(asin2) <= 10 AND length(asin3) <= 10'
    ).join(
        self.df_st_key, on='search_term', how='inner'
    ).join(
        self.df_st_month, on='search_term', how='left'
    ).join(
        self.df_st_detail_1_week_ago, on='search_term', how='left'
    ).join(
        self.df_st_detail_2_week_ago, on='search_term', how='left'
    ).join(
        self.df_st_detail_3_week_ago, on='search_term', how='left'
    ).withColumn(
        # Week number parsed from 'YYYY-WW' date_info.
        'week', F.lit(int(self.date_info.split('-')[-1]))
    ).select(
        'st_key',
        'search_term',
        'is_search_text',
        'is_ascending_text',
        'is_first_text',
        'is_high_return_text',
        'rank',
        'search_volume',
        'orders',
        F.col('rank_last_week').alias('rank_last_1_week'),
        F.col('rank_4_week_ago').alias('rank_last_4_week'),
        F.col('rank_12_week_ago').alias('rank_last_12_week'),
        F.col('rank_change_last_week').alias('rank_change_last_1_week'),
        F.col('rank_change_4_week_ago').alias('rank_change_last_4_week'),
        F.col('rank_change_12_week_ago').alias('rank_change_last_12_week'),
        F.col('rank_change_rate_last_week').alias('rank_rate_last_1_week'),
        F.col('rank_change_rate_4_week_ago').alias('rank_rate_last_4_week'),
        F.col('rank_change_rate_12_week_ago').alias('rank_rate_last_12_week'),
        'st_word_num',
        'asin1',
        'asin2',
        'asin3',
        'product_title1',
        'product_title2',
        'product_title3',
        'click_share1',
        'click_share2',
        'click_share3',
        'click_share_total',
        'conversion_share1',
        'conversion_share2',
        'conversion_share3',
        'conversion_share_total',
        'brand1',
        'brand2',
        'brand3',
        'category1',
        'category2',
        'category3',
        'quantity_being_sold',
        'category_id',
        'category_current_id',
        'market_cycle_type',
        'week',
        'rank_change_1_week_ago',
        'rank_change_2_week_ago',
        'rank_change_3_week_ago',
        'rank_rate_1_week_ago',
        'rank_rate_2_week_ago',
        'rank_rate_3_week_ago'
    ).withColumn(
        'site_name', F.lit(self.site_name)
    ).withColumn(
        'date_type', F.lit(self.date_type)
    ).withColumn(
        'date_info', F.lit(self.date_info)
    ).fillna({
        # -1 / '' are the downstream "missing" sentinels.
        'rank': -1,
        'search_volume': -1,
        'orders': -1,
        'rank_last_1_week': -1,
        'rank_last_4_week': -1,
        'rank_last_12_week': -1,
        'category_id': '',
        'category_current_id': ''
    })
    print(f"清除hdfs目录中:{self.hdfs_path}")
    # Delete the partition's files first so the 'append' write below acts
    # as an overwrite of this partition only.
    HdfsUtils.delete_file_in_folder(self.hdfs_path)
    print(f"当前存储的表名为:{self.hive_tb},分区为:{self.partitions_by}")
    self.df_save.repartition(5).write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=self.partitions_by)
    print("success")
def run(self):
# 读取数据
self.read_data()
# 处理关键词标签
self.handle_st_flag()
# 计算排名及变化率
self.handle_rank_rate()
# 计算其他
self.handle_other()
# 数据落盘
self.save_data()
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site code, e.g. 'us'
    date_type = sys.argv[2]  # arg 2: date type: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: date value (year-week for this job), e.g. '2022-1'
    handle_obj = DwtStDetailWeek(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil, DbTypes
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Gate execution to the allowed working hours for this export process.
    CommonUtil.judge_is_work_hours(
        site_name=site_name, date_type=date_type, date_info=date_info,
        principal='chenyuanjie', priority=1, export_tools_type=1, belonging_to_process='ABA周增长'
    )
    db_type = DbTypes.postgresql_cluster.name
    print("导出到PG集群中")
    # date_info is assumed to be 'YYYY-WW' (year-week) — TODO confirm the
    # upstream scheduler always passes this format.
    year_str, week_str = date_info.split('-')
    year = int(year_str)
    week = int(week_str)
    # Partitioned export layout: one child table per week under a yearly
    # master table, e.g. us_aba_report_week_2023_32.
    export_master_tb = f"{site_name}_aba_report_week_{year}"
    export_tb = f"{export_master_tb}_{week}"
    next_week = week + 1
    engine = DBUtil.get_db_engine(db_type, site_name)
    with engine.connect() as connection:
        # Recreate the weekly child table from the master's definition.
        # NOTE(review): table names are interpolated directly into the DDL;
        # inputs come from the scheduler, not end users.
        sql = f"""
drop table if exists {export_tb};
create table if not exists {export_tb}
(
like {export_master_tb} including defaults including comments
);
"""
        print("================================执行sql================================")
        print(sql)
        connection.execute(sql)
    # Build the Hive -> PG export shell command; column order must match
    # the select order produced by DwtStDetailWeek.save_data().
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_st_detail_week",
        export_tb=export_tb,
        col=[
            'st_key',
            'search_term',
            'is_search_text',
            'is_ascending_text',
            'is_first_text',
            'is_high_return_text',
            'rank',
            'search_volume',
            'orders',
            'rank_last_1_week',
            'rank_last_4_week',
            'rank_last_12_week',
            'rank_change_last_1_week',
            'rank_change_last_4_week',
            'rank_change_last_12_week',
            'rank_rate_last_1_week',
            'rank_rate_last_4_week',
            'rank_rate_last_12_week',
            'st_word_num',
            'asin1',
            'asin2',
            'asin3',
            'product_title1',
            'product_title2',
            'product_title3',
            'click_share1',
            'click_share2',
            'click_share3',
            'click_share_total',
            'conversion_share1',
            'conversion_share2',
            'conversion_share3',
            'conversion_share_total',
            'brand1',
            'brand2',
            'brand3',
            'category1',
            'category2',
            'category3',
            'quantity_being_sold',
            'category_id',
            'category_current_id',
            'market_cycle_type',
            'week',
            'rank_change_1_week_ago',
            'rank_change_2_week_ago',
            'rank_change_3_week_ago',
            'rank_rate_1_week_ago',
            'rank_rate_2_week_ago',
            'rank_rate_3_week_ago'
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )
    # Run the export script on the remote host over SSH.
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()
    with engine.connect() as connection:
        # Populate the full-text search column for keyword queries.
        sql = f"""
update {export_tb} set keyword_tsv = to_tsvector('english_amazonword', search_term);
"""
        print("================================执行sql================================")
        print(sql)
        connection.execute(sql)
    # Attach the child table to the master as the [week, next_week)
    # partition and copy the master's indexes onto it.
    DBUtil.add_pg_part(
        engine,
        source_tb_name=export_tb,
        part_master_tb=export_master_tb,
        part_val={
            "from": [week],
            "to": [next_week]
        },
        cp_index_flag=True,
    )
    # Record completion in the workflow tracking table (MySQL).
    sql = f"""
REPLACE INTO selection.workflow_everyday
(site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
VALUES
('{site_name}', '{date_info}', '导出PG数据库', 14, '{site_name}_aba_report_week', 'week', 'ABA搜索词周报告', '是', 'ABA搜索词周报告表', 'postgresql_cluster');
"""
    DBUtil.engine_exec_sql(DBUtil.get_db_engine('mysql', 'us'), sql)
    print("success")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment