dwt_st_top_asin_info.py
"""
   @Author      : HuangJian
   @Description : 找到月关键词下zr类型的top10asin和ac类型asin
   @SourceTable :
                  1.ods_st_key
                  2.dim_st_asin_info
                  3.dwd_st_measure

   @SinkTable   : dwt_st_top_asin_info
   @CreateTime  : 2023/02/13 17:01
   @UpdateTime  : 2023/02/13 17:01
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # append parent directory to the module search path
from utils.templates import Templates
# window functions for partitioned ranking/deduplication
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.common_util import CommonUtil, DateTypes


class DwtStTopAsinInfo(Templates):
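    """For each search term, pick the top-10 zr-type ASINs and the most recent
    ac-type ASIN, then write the result to dwt_st_top_asin_info."""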
    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = f"dwt_st_top_asin_info"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}, {self.site_name}, {self.date_type}, {self.date_info}")
        self.partitions_num = 10
        self.reset_partitions(partitions_num=self.partitions_num)
        self.partitions_by = ['site_name', 'date_type', 'date_info']

        self.get_date_info_tuple()
        self.get_year_month_days_dict(year=int(self.year))
        # tuple of week partitions covered by this date_info (for week-based flows)
        self.complete_date_info_tuple = CommonUtil.transform_week_tuple(self.spark, self.date_type, self.date_info)
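        # placeholder DataFrames; each is overwritten with real data in read_data()/handle_data()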
        self.df_st_key = self.spark.sql(f"select 1+1;")
        self.df_st_asin_zr = self.spark.sql(f"select 1+1;")
        self.df_st_asin_ac = self.spark.sql(f"select 1+1;")
        self.df_st_zr = self.spark.sql(f"select 1+1;")
        self.df_st_ac = self.spark.sql(f"select 1+1;")
        self.df_zr_info = self.spark.sql(f"select 1+1;")
        self.df_ac_info = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.date_sql = self.date_sql_padding()

    def date_sql_padding(self):
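        """Build the date filter for the source queries: us month_week, us month
        partitions from 2023-10 on, and uk/de month partitions from 2024-05 on
        are read directly; all other cases fall back to the union of the
        underlying week partitions."""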
        if self.site_name == 'us':
            if self.date_type == DateTypes.month_week.name:
                date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
            elif self.date_type == DateTypes.month.name and self.date_info >= '2023-10':
                date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
            else:
                date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}"
        elif self.site_name in ['uk', 'de'] and self.date_type == DateTypes.month.name and self.date_info >= '2024-05':
            date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
        else:
            date_sql = f" and date_type='week' and date_info in {self.complete_date_info_tuple}"
        print(date_sql)
        return date_sql

    def read_data(self):
        print("1.1 读取st的key: ods_st_key表")
        sql = f"select search_term,cast(st_key as int) as search_term_id from ods_st_key where site_name = '{self.site_name}' "
        print("sql:", sql)
        self.df_st_key = self.spark.sql(sqlQuery=sql)

        print("1.3 读取关键词的zr类型数据: ods_search_term_zr")
        sql = f"""select 
            search_term,
            asin,
            page_row as zr_rank,
            'zr' as data_type,date_info from ods_search_term_zr 
            where
            site_name='{self.site_name}' {self.date_sql} and page_row <=10 ;"""
        print("sql:", sql)
        self.df_st_asin_zr = self.spark.sql(sqlQuery=sql).cache()

        print("1.4 读取关键词的ac类型数据: ods_search_term_ac")
        sql = f"""select search_term,
                         asin,
                         null as zr_rank,
                         'ac' as data_type
                  from (
                           select
                                  search_term,
                                  asin,
                                  row_number() over (partition by search_term order by date_info desc, updated_time asc) as ac_rank
                           from ods_search_term_ac
                           where site_name = '{self.site_name}' {self.date_sql} and page = 1) t1
                  where t1.ac_rank = 1;"""
        print("sql:", sql)
        self.df_st_asin_ac = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
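        # step 1: dedupe and re-rank the zr data; step 2: union with ac and attach search_term_id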
        self.handle_st_asin_duplicated()
        self.handle_st_asin_join()

        # fill in the audit timestamp columns
        self.df_save = self.df_save.withColumn("created_time",
                                               F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))
        self.df_save = self.df_save.withColumn("updated_time",
                                               F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))
        # fill in the partition columns
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))

    def handle_st_asin_duplicated(self):
        self.df_zr_info = self.df_st_asin_zr.select("search_term", "date_info")

        # deduplicate zr-type search terms: keep the max date_info per search term,
        # i.e. the most recent date on which the term appeared
        window_zr = Window.partitionBy(['search_term']).orderBy(
            self.df_zr_info.date_info.desc()
        )
        self.df_zr_info = self.df_zr_info \
            .withColumn("zr_rank_row", F.row_number().over(window=window_zr))
        self.df_zr_info = self.df_zr_info.filter("zr_rank_row=1")
        self.df_zr_info = self.df_zr_info.drop("zr_rank_row")

        # keep only the st-asin pairs of the most recent zr date
        self.df_st_asin_zr = self.df_zr_info.join(
            self.df_st_asin_zr, on=['search_term', 'date_info'], how="inner")
        self.df_st_asin_zr = self.df_st_asin_zr.drop("date_info")
        # within that date, deduplicate zr rows by search_term-asin-rank
        self.df_st_asin_zr = self.df_st_asin_zr.drop_duplicates(['search_term', 'asin', 'zr_rank'])

        # collapse remaining duplicates: keep the max zr_rank per search_term-asin pair
        self.df_st_asin_zr = self.df_st_asin_zr.groupby(['search_term', 'asin', 'data_type']).agg(
            F.max('zr_rank').alias('max_zr_rank')
        )

        print("zr columns after aggregation:", self.df_st_asin_zr.columns)

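        # re-rank each search term's ASINs by the aggregated zr_rank (ascending) and keep the top 10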
        window_zr_rank = Window.partitionBy(['search_term']).orderBy(
            self.df_st_asin_zr.max_zr_rank.asc()
        )
        self.df_st_asin_zr = self.df_st_asin_zr.withColumn('zr_rank', F.row_number().over(window=window_zr_rank))
        self.df_st_asin_zr = self.df_st_asin_zr.filter("zr_rank <= 10")
        self.df_st_asin_zr = self.df_st_asin_zr.drop('max_zr_rank')

    def handle_st_asin_join(self):
        # union the zr-type and ac-type rows
        df_st_asin_union = self.df_st_asin_zr.unionByName(self.df_st_asin_ac, allowMissingColumns=False)

        # join on search_term to attach search_term_id (inner join drops terms without a key)
        self.df_save = df_st_asin_union.join(
            self.df_st_key, on=['search_term'], how="inner"
        )


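# Example invocation (assuming the project's usual spark-submit entry point):
#   spark-submit dwt_st_top_asin_info.py us month 2023-10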
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwtStTopAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()