"""
# author: 方星钧(ffman)
# description: Compute each ASIN's BS category info at the dim_asin_bs_category dimension, based on the ods_bs_category and ods_bs_category_asin_detail tables
# table_read_name: ods_bs_category,ods_bs_category_asin_detail
# table_save_name: dim_asin_bs_category
# table_save_level: dim
# version: 1.0
# created_date: 2022-11-03
# updated_date: 2022-11-03
"""

import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql.window import Window


class DimBsAsinInfo(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Initialize the self.spark object and placeholder DataFrames
        self.db_save = 'dim_asin_bs_category'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_save_list = []
        self.df_cate = self.spark.sql("select 1+1;")
        self.df_cate_detail = self.spark.sql("select 1+1;")
        self.df_cate_asin_detail = self.spark.sql("select 1+1;")
        self.pattern_1 = ''  # regex pattern for the first-level category
        self.pattern_current = ''  # regex pattern for the current category
        self.node_1_list = []  # list of first-level category names
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=20)
        self.u_cate_1_en_name = self.spark.udf.register("u_cate_1_en_name", self.udf_cate_1_en_name, StringType())
        self.u_cate_current_en_name = self.spark.udf.register("u_cate_current_en_name", self.udf_cate_current_en_name,
                                                              StringType())

    @staticmethod
    def udf_cate_1_en_name(best_sellers_rank, site_name, cate_1_pattern):
        if site_name in ['us', 'uk']:
            pattern_1 = f"See Top 100 in ({cate_1_pattern})"
        elif site_name in ['de']:
            pattern_1 = f"Siehe Top 100 in ({cate_1_pattern})"
        elif site_name in ['es']:
            pattern_1 = f"Ver el Top 100 en ({cate_1_pattern})"
        elif site_name in ['fr']:
            pattern_1 = f"Voir les 100 premiers en ({cate_1_pattern})"
        elif site_name in ['it']:
            pattern_1 = f"Visualizza i Top 100 nella categoria ({cate_1_pattern})"
        else:
            pattern_1 = ''
        # Prefer the localized "See Top 100 in ..." marker; fall back to matching
        # any first-level category name directly.
        matches = re.findall(pattern_1, best_sellers_rank)
        result_pattern = matches[-1] if matches else ''
        if result_pattern == '':
            matches = re.findall(cate_1_pattern, best_sellers_rank)
            result_pattern = matches[-1] if matches else ''
        return result_pattern
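
    # Hypothetical example (made-up rank string) of what udf_cate_1_en_name returns:
    #   udf_cate_1_en_name("#2,540 in Home & Kitchen (See Top 100 in Home & Kitchen)",
    #                      "us", "Home & Kitchen|Toys & Games")
    #   -> "Home & Kitchen", captured by the "See Top 100 in (...)" pattern.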

    @staticmethod
    def udf_cate_current_en_name(best_sellers_rank, site_name):
        if site_name in ['us', 'uk', 'de', 'it']:
            pattern_current = f"(\d+ in )"
        elif site_name in ['es', 'fr']:
            # pattern_current = f"\d+[.,]{0,}\d{0,} en "
            pattern_current = f"(\d+ en )"
        else:
            pattern_current = ''
        matches = re.findall(pattern_current, best_sellers_rank)
        result_pattern = matches[-1] if matches else ''
        if not result_pattern:
            return ''
        # Slice out the category name that follows the last "<rank> in " marker,
        # stopping before a trailing " (" segment when one follows the marker.
        r_id = best_sellers_rank.rfind(result_pattern)
        r_id_2 = best_sellers_rank.rfind(" (")
        if r_id_2 != -1 and r_id_2 > r_id:
            return best_sellers_rank[(r_id + len(result_pattern)):r_id_2]
        if ")" in best_sellers_rank[(r_id + len(result_pattern)):]:
            return ''
        return best_sellers_rank[r_id + len(result_pattern):]
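
    # Hypothetical example (made-up rank string) of what udf_cate_current_en_name returns:
    #   udf_cate_current_en_name(
    #       "#2,540 in Home & Kitchen (See Top 100 in Home & Kitchen) #156 in Vases", "us")
    #   -> "Vases": "156 in " is the last "<rank> in " match and no " (" follows it.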

    def read_data(self):
        sql = f"select id as bs_id, en_name, nodes_num, one_category_id, and_en_name from ods_bs_category where site_name='{self.site_name}';"
        self.df_cate = self.spark.sql(sqlQuery=sql).cache()
        self.df_cate.show(10, truncate=False)
        # sql = f"select * from ods_{self.site_name}_bs_category_detail;"
        # self.df_cate_detail = self.spark.sql(sqlQuery=sql)
        sql = f"select asin, best_sellers_rank, site_name from ods_bs_category_asin_detail " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
        print("sql:", sql)
        self.df_cate_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_cate_asin_detail.show(10, truncate=False)

    def handle_data(self):
        self.handle_cate_1()
        self.handle_cate_current()
        self.handle_cate_by_en_name()
        self.df_save = self.df_cate_asin_detail
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        # self.df_save.show(10, truncate=False)

    def handle_cate_1(self):
        """
        匹配一级分类
        """
        # 1. 通过see top 100 in 进行匹配一级分类
        df_cate_1 = self.df_cate.filter("nodes_num==2").toPandas()
        pattern1_list = df_cate_1.en_name.to_numpy()
        # pattern_list = [f"{cate}" for cate in list(df_cate_1.en_name.to_numpy())]
        pattern = "|".join(pattern1_list)
        self.df_cate_asin_detail = self.df_cate_asin_detail.withColumn(
            "bs_cate_1_pattern",
            F.lit(pattern)
        )
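        # The pattern is attached as a literal column because the registered UDF
        # receives every argument as a column, not as a plain Python value.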
        self.df_cate_asin_detail = self.df_cate_asin_detail.withColumn(
            "bs_cate_1_en_name",
            self.u_cate_1_en_name("best_sellers_rank", "site_name", "bs_cate_1_pattern")
        )
        self.df_cate_asin_detail = self.df_cate_asin_detail.drop("bs_cate_1_pattern")
        # self.df_cate_asin_detail.show(10)
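        # Diagnostic: rows with no first-level category match vs. total rows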
        print(self.df_cate_asin_detail.filter("bs_cate_1_en_name =''").count(), self.df_cate_asin_detail.count())

    def handle_cate_current(self):
        """
        当前分类:进行正则匹配的规则
        """
        # df_cate_asin['rank_cuurent'] = df_cate_asin.best_sellers_rank.apply(
        #     lambda x: re.findall('\d+[.,]{0,}\d{0,} in ', x)[-1] if re.findall('\d+[.,]{0,}\d{0,} in ', x) else np.nan)
        self.df_cate_asin_detail = self.df_cate_asin_detail.withColumn(
            "bs_cate_current_en_name",
            self.u_cate_current_en_name('best_sellers_rank', 'site_name')
        )
        # self.df_cate_asin_detail.show(10)
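        # Diagnostic: rows with no current-category match vs. total rows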
        print(self.df_cate_asin_detail.filter("bs_cate_current_en_name =''").count(), self.df_cate_asin_detail.count())

    def handle_cate_by_en_name(self):
        print("1. 通过en_name匹配bs_cate_1_id和bs_cate_current_id")
        df_cate_1 = self.df_cate.filter("nodes_num=2").select("bs_id", "en_name")
        df_cate_1 = df_cate_1.withColumnRenamed("en_name", "bs_cate_1_en_name").withColumnRenamed("bs_id",
                                                                                                  "bs_cate_1_id")
        df_cate_current = self.df_cate.filter("nodes_num>2").select("bs_id", "en_name")
        df_cate_current = df_cate_current.withColumnRenamed("en_name", "bs_cate_current_en_name").withColumnRenamed(
            "bs_id", "bs_cate_current_id")
        self.df_cate_asin_detail = self.df_cate_asin_detail.join(
            df_cate_1, on=['bs_cate_1_en_name'], how='left'
        )
        # print("self.df_cate_asin_detail1:", self.df_cate_asin_detail.count())
        self.df_cate_asin_detail = self.df_cate_asin_detail.join(
            df_cate_current, on=['bs_cate_current_en_name'], how='left'
        )
        print("2. 当bs_cate_current_id是null的时候, 通过当前分类的别名去匹配bs_cate_current_id")
        df_current_id_not_null = self.df_cate_asin_detail.filter("bs_cate_current_id is not null")
        df_current_id_null = self.df_cate_asin_detail.filter("bs_cate_current_id is null").drop("bs_cate_current_id")
        df_cate_and_en_name = self.df_cate.filter("nodes_num>2").select("and_en_name", "bs_id") \
            .withColumnRenamed("and_en_name", "bs_cate_current_en_name") \
            .withColumnRenamed("bs_id", "bs_cate_current_id")
        df_current_id_null = df_current_id_null.join(
            df_cate_and_en_name, on="bs_cate_current_en_name", how='left'
        )
        self.df_cate_asin_detail = df_current_id_not_null.unionByName(
            df_current_id_null, allowMissingColumns=True
        )
        # 3. Handle rows whose first-level category is null
        print("3.1 Deduplicate by asin and bs_cate_1_id, keeping null values last")
        # print("3.2 When bs_cate_1_id is null, match it via bs_cate_current_id")  # no first-level category -> skip matching
        window = Window.partitionBy(['asin', 'bs_cate_1_id']).orderBy(
            self.df_cate_asin_detail.bs_cate_current_id.asc_nulls_last(),
        )
        self.df_cate_asin_detail = self.df_cate_asin_detail.withColumn(
            "asin_top", F.row_number().over(window=window)
        )
        self.df_cate_asin_detail = self.df_cate_asin_detail.filter("asin_top=1")
        self.df_cate_asin_detail = self.df_cate_asin_detail.drop("asin_top")

        # Temporarily disabled: deriving the first-level category from the current category
        # df_1_id_not_null = self.df_cate_asin_detail.filter("bs_cate_1_id is not null")
        # df_1_id_null = self.df_cate_asin_detail.filter("bs_cate_1_id is null").drop("bs_cate_1_id")
        # # print("df_1_id_null:", df_1_id_null.show(10))
        # df_cate_1_id = self.df_cate.filter("nodes_num>2").select("bs_id", "one_category_id") \
        #     .withColumnRenamed("bs_id", "bs_cate_current_id") \
        #     .withColumnRenamed("one_category_id", "bs_cate_1_id")
        # df_1_id_null = df_1_id_null.join(
        #     df_cate_1_id, on="bs_cate_current_id", how='left'
        # )
        # self.df_cate_asin_detail = df_1_id_not_null.unionByName(
        #     df_1_id_null, allowMissingColumns=True
        # )
        # # print("df_1_id_not_null, df_1_id_null:", df_1_id_not_null.count(), df_1_id_null.count())
        # # print("df_1_id_null:", df_1_id_null.show(10))
        # # print("self.df_cate_asin_detail:", self.df_cate_asin_detail.count(), self.df_cate_asin_detail.show(10))

        print("4. 重命名字段名称, 和hive表的字段名称保持一致")

        self.df_cate_asin_detail = self.df_cate_asin_detail. \
            withColumnRenamed("best_sellers_rank", "asin_bs_sellers_rank"). \
            withColumnRenamed("bs_cate_1_id", "asin_bs_cate_1_id"). \
            withColumnRenamed("bs_cate_current_id", "asin_bs_cate_current_id"). \
            withColumnRenamed("bs_cate_1_en_name", "asin_bs_cate_1_en_name"). \
            withColumnRenamed("bs_cate_current_en_name", "asin_bs_cate_current_en_name")


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
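    # Example invocation (hypothetical argument values), e.g. via spark-submit:
    #   spark-submit dim_bs_asin_info.py us month 2022-11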
    handle_obj = DimBsAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()