# dim_asin_bs_info.py
# chenyuanjie committed
import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# 导入udf公共方法
from yswg_utils.common_udf import udf_parse_bs_category
# from ..yswg_utils.common_udf import udf_parse_bs_category


class DimBsAsinInfo(Templates):
    """Build the ``dim_asin_bs_info`` dataset for one site and one period.

    The job reads the per-asin category node ids from ``dim_asin_detail`` and
    the best-sellers detail rows from ``ods_bs_category_asin_detail``, parses
    the level-1 / current category ids and ranks out of the detail rows with a
    UDF, and left-joins the result onto the asin list. Category ids missing
    after parsing are filled from the node ids.

    The Spark session, partition handling and ``year_week_tuple`` come from the
    ``Templates`` base class, which is not visible in this file.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        """Store the run parameters and prepare Spark, the UDF and the patterns.

        Args:
            site_name: Site code; must be a key of ``pattern1_dict`` /
                ``pattern_current_dict`` (us, uk, de, es, fr, it).
            date_type: Period type passed on the command line
                (week / 4_week / month / quarter, plus month_week in the SQL).
            date_info: Period value, e.g. ``'2022-1'``.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Target table name; also used in the Spark application name.
        self.db_save = 'dim_asin_bs_info'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames so the attributes exist before read_data() runs.
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_node_id = self.spark.sql("select 1+1;")
        self.df_bs_asin_detail = self.spark.sql("select 1+1;")
        self.df_bs_category = self.spark.sql("select 1+1;")
        # Return type of the parsing UDF: a struct with two string category ids
        # and two integer ranks.
        schema = StructType([
            StructField('asin_bs_cate_1_id', StringType(), True),
            StructField('asin_bs_cate_current_id', StringType(), True),
            StructField('asin_bs_cate_1_rank', IntegerType(), True),
            StructField('asin_bs_cate_current_rank', IntegerType(), True),
        ])
        # The bare name resolves to the function imported at module level from
        # yswg_utils.common_udf, not to the static method of the same name below.
        self.u_parse_bs_category = F.udf(udf_parse_bs_category, schema)
        # Per-site marker text (lower-cased) whose presence in the rank string
        # means the first rank found belongs to the level-1 category.
        self.pattern1_dict = {
            "us": "See Top 100 in ".lower(),
            "uk": "See Top 100 in ".lower(),
            "de": "Siehe Top 100 in ".lower(),
            "es": "Ver el Top 100 en ".lower(),
            "fr": "Voir les 100 premiers en ".lower(),
            "it": "Visualizza i Top 100 nella categoria ".lower(),
        }
        # Per-site regex that captures each rank number in the rank string.
        # Raw strings keep the exact same pattern text without relying on
        # invalid string escape sequences.
        self.pattern_current_dict = {
            "us": r"#(\d+) ",
            "uk": r"(\d+) in ",
            "de": r"(\d+) in ",
            "es": r"(\d+) en ",
            "fr": r"(\d+) en ",
            "it": r"(\d+) in ",
        }
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=20)
        # Presumably populates self.year_week_tuple, which read_data() uses -- confirm in Templates.
        self.get_year_week_tuple()

    @staticmethod
    def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_sellers_href, cate_current_pattern, cate_1_pattern):
        """Parse level-1 / current category ids and ranks for one asin.

        Note: ``__init__`` registers the module-level function of the same name
        imported from ``yswg_utils.common_udf``; this static method is a local
        variant that is not wired into the UDF.

        Args:
            asin_bs_sellers_rank_lower: Lower-cased best-sellers rank text; may be None.
            last_herf: Fallback best-sellers link, used when
                ``all_best_sellers_href`` is empty.
            all_best_sellers_href: Preferred link field; several links are
                separated by ``&&&&``.
            cate_current_pattern: Regex with one group capturing a rank number.
            cate_1_pattern: Marker text telling that a level-1 rank is present.

        Returns:
            Tuple ``(cate_1_id, cate_current_id, cate_1_rank, cate_current_rank)``;
            any element is None when it cannot be parsed.
        """
        empty_markers = ('', 'none', 'null')
        href_pattern = 'bestsellers/(.*)/ref'

        # 1. Decide which field carries the category links.
        if str(all_best_sellers_href).lower() not in empty_markers:
            bs_href = all_best_sellers_href
        elif str(last_herf).lower() not in empty_markers:
            bs_href = last_herf
        else:
            bs_href = ''
        # str.split always returns at least one element, so href_list is never empty.
        href_list = bs_href.replace("?tf=1", "").split("&&&&")

        # 2.1 Extract the category ids.
        if len(href_list) == 1:
            found = re.findall(href_pattern, href_list[0])
            if found:
                path_parts = found[0].split("/")
                cate_1_id = path_parts[0]
                # A single-segment path carries no separate current category.
                cate_current_id = path_parts[-1] if len(path_parts) > 1 else None
            else:
                cate_1_id, cate_current_id = None, None
        else:
            first_found = re.findall(href_pattern, href_list[0])
            last_found = re.findall(href_pattern, href_list[-1])
            # Guard against links that do not match: the ids stay None instead
            # of failing on a membership test against None.
            cate_1_id = first_found[0].split("/")[0] if first_found else None
            cate_current_id = last_found[0].split("/")[-1] if last_found else None

        # 2.2 Extract the ranks. Thousands separators and the " 100 " of the
        # "top 100" wording are removed so they are not read as ranks.
        rank_text = asin_bs_sellers_rank_lower or ''
        cleaned_text = rank_text.replace(",", "").replace(" 100 ", "")
        rank_list = [int(rank) for rank in re.findall(cate_current_pattern, cleaned_text)]
        if not rank_list:
            cate_1_rank, cate_current_rank = None, None
        elif cate_1_pattern in rank_text:
            # Level-1 marker present: first number is the level-1 rank, the last
            # one (if there is more than one) is the current-category rank.
            cate_1_rank = rank_list[0]
            cate_current_rank = rank_list[-1] if len(rank_list) > 1 else None
        else:
            # No level-1 marker: the first number is the current-category rank.
            cate_1_rank, cate_current_rank = None, rank_list[0]

        return cate_1_id, cate_current_id, cate_1_rank, cate_current_rank

    def read_data(self):
        """Load the asin node ids and the best-sellers detail rows into cached DataFrames.

        Sets ``self.df_asin_node_id`` and ``self.df_bs_asin_detail`` and prints a
        10-row preview of each.
        """
        # 1. Node ids of every asin for this site / period.
        sql = (
            f"select asin, category_id as asin_bs_cate_current_id_node, category_first_id as asin_bs_cate_1_id_node "
            f"from dim_asin_detail "
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info ='{self.date_info}';"
        )
        print(f"1. 读取dim_asin_detail表node_id数据: sql -- {sql}")
        self.df_asin_node_id = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_node_id.show(10, truncate=False)

        # 2. Best-sellers detail rows for the matching period.
        select_clause = (
            "select asin, best_sellers_rank as asin_bs_sellers_rank, last_herf, all_best_sellers_href, "
            "date_type, date_info, created_at from ods_bs_category_asin_detail "
        )
        # Use the instance attributes rather than module-level globals, so the
        # method also works when the class is imported instead of run as a script.
        # NOTE(review): date_info values are compared as plain strings; that is
        # chronological only if the values are zero-padded -- confirm.
        use_period_partition = self.date_type in ('month', 'month_week') and (
            (self.site_name == 'us' and self.date_info >= '2023-10')
            or (self.site_name in ('uk', 'de') and self.date_info >= '2024-05')
        )
        if use_period_partition:
            # From these periods on, the detail table is read by the job's own
            # date_type / date_info partition.
            sql = f"{select_clause}" \
                  f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
        else:
            # Otherwise read weekly partitions: everything up to 2022-42 for early
            # monthly runs, else exactly the weeks in year_week_tuple.
            if max(self.year_week_tuple) <= '2022-42' and self.date_type == 'month':
                params = " date_info <= '2022-42'"
            else:
                params = f" date_info in {self.year_week_tuple}"
            sql = f"{select_clause}" \
                  f"where site_name='{self.site_name}' and date_type='week' and {params};"
        print(f"2. 读取ods_bs_category_asin_detail对应周期的详情表: sql -- {sql}")

        self.df_bs_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_bs_asin_detail.show(10, truncate=False)

    def handle_df_asin_node_id(self):
        """No-op: dim_asin_detail is already de-duplicated, so there is nothing to do."""

    def handle_df_bs_asin_detail(self):
        """Parse the detail rows and join them onto the asin list.

        Keeps the most recently created detail row per asin, runs the parsing UDF
        on it, left-joins the result onto ``self.df_asin_node_id`` and stores the
        outcome in ``self.df_save``.
        """
        # Keep only the newest row (by created_at) of each asin.
        newest_first = Window.partitionBy(['asin']).orderBy(F.col("created_at").desc())
        detail = self.df_bs_asin_detail.withColumn("row_number", F.row_number().over(window=newest_first))
        detail = detail.filter("row_number=1").drop("row_number", "date_info")

        # The rank patterns are written for lower-case text.
        detail = detail.withColumn("asin_bs_sellers_rank_lower", F.lower("asin_bs_sellers_rank"))

        # Parse category ids and ranks into one struct column ...
        cate_current_pattern = self.pattern_current_dict[self.site_name]
        cate_1_pattern = self.pattern1_dict[self.site_name]
        detail = detail.withColumn(
            'asin_bs_cate_ranks',
            self.u_parse_bs_category('asin_bs_sellers_rank_lower', 'last_herf', 'all_best_sellers_href',
                                     F.lit(cate_current_pattern), F.lit(cate_1_pattern))
        )
        # ... then flatten the struct into four columns, in the schema's field order.
        parsed_fields = (
            'asin_bs_cate_1_id',
            'asin_bs_cate_current_id',
            'asin_bs_cate_1_rank',
            'asin_bs_cate_current_rank',
        )
        for field_name in parsed_fields:
            detail = detail.withColumn(field_name, F.col('asin_bs_cate_ranks').getField(field_name))
        self.df_bs_asin_detail = detail.drop('asin_bs_cate_ranks')
        self.df_bs_asin_detail.show(10, truncate=False)

        # Every asin of dim_asin_detail is kept, with or without detail data.
        self.df_save = self.df_asin_node_id.join(
            self.df_bs_asin_detail, 'asin', how='left'
        )

        # Fill missing level-1 / current category ids from the node ids.
        self.df_save = self.df_save.withColumn(
            "asin_bs_cate_1_id",
            F.coalesce(F.col("asin_bs_cate_1_id"), F.col("asin_bs_cate_1_id_node"))
        ).withColumn(
            "asin_bs_cate_current_id",
            F.coalesce(F.col("asin_bs_cate_current_id"), F.col("asin_bs_cate_current_id_node"))
        )

        self.df_save = self.df_save.drop("asin_bs_sellers_rank_lower", "asin_bs_cate_1_id_node", "asin_bs_cate_current_id_node")
        self.df_save.show(20)

    def handle_data(self):
        """Run the transformation steps and stamp the partition columns on ``self.df_save``."""
        self.handle_df_asin_node_id()
        self.handle_df_bs_asin_detail()
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))


if __name__ == '__main__':
    # Positional command-line arguments:
    #   1 -> site
    #   2 -> period type: week/4_week/month/quarter
    #   3 -> period value: year-week / year-month / year-quarter, e.g. 2022-1
    # These names are module-level globals; keep their spelling unchanged, since
    # other code in this file may read them.
    site_name, date_type, date_info = sys.argv[1], sys.argv[2], sys.argv[3]
    handle_obj = DimBsAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()