# dim_asin_stable_info_old.py (11.4 KB)
import os
import sys

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
from ..utils.templates import Templates
from pyspark.sql.types import StringType, FloatType, StructType, StructField
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from ..yswg_utils.common_udf import parse_weight_str


class DimAsinStableInfo(Templates):
    """Derive the latest image URL, title/themes and weight for every asin of one site.

    Weekly rows are read from ``ods_asin_detail`` and theme definitions from
    ``ods_theme``. For each attribute the row with the greatest ``date_info``
    per asin is kept.

    ``create_spark_object``, ``reset_partitions``, ``save_data_common`` and
    ``partitions_num`` are not defined in this class; they are expected to be
    provided by ``Templates``.
    """

    def __init__(self, site_name='us'):
        """Create the Spark session and initialise every working DataFrame.

        Args:
            site_name: Site code used to filter the source tables. ``'us'``
                selects ``'pounds'`` as the default weight unit, any other
                value selects ``'grams'``.
        """
        super().__init__()
        self.site_name = site_name
        self.db_save = 'dim_asin_stable_info'
        self.db_save_vertical = 'dim_asin_title_info_vertical'  # vertical theme table (one row per asin/theme)
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")

        # Placeholder DataFrames so every attribute exists before read_data()/handle_data() run.
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_theme = self.spark.sql("select 1+1;")
        self.df_asin_img_url = self.spark.sql("select 1+1;")
        self.df_asin_title = self.spark.sql("select 1+1;")
        self.df_asin_weight = self.spark.sql("select 1+1;")
        self.df_asin_weight_new = self.spark.sql("select 1+1;")
        self.df_asin_weight_old = self.spark.sql("select 1+1;")
        self.df_asin_volume = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_save_vertical = self.spark.sql("select 1+1;")
        self.df_save_wide = self.spark.sql("select 1+1;")
        self.theme_list_str = ''

        self.partitions_by = ['site_name']
        self.reset_partitions(100)

        # One window partition per asin, newest date_info first. A window without
        # partitionBy would number the rows of the whole DataFrame, so
        # "row_number == 1" would keep a single row overall instead of one per asin.
        self.window = Window.partitionBy('asin').orderBy(F.desc("date_info"))

        # parse_weight_str is wrapped as a UDF returning struct<weight: float, weight_type: string>.
        schema = StructType([
            StructField('weight', FloatType(), True),
            StructField('weight_type', StringType(), True),
        ])
        self.u_get_weight = F.udf(parse_weight_str, schema)
        self.weight_type = 'pounds' if site_name == 'us' else 'grams'

    def sort_by_latest(self, df):
        """Return ``df`` reduced to the newest row of every asin.

        Args:
            df: DataFrame that contains both an ``asin`` and a ``date_info`` column.

        Returns:
            DataFrame with the same columns as ``df`` holding, per asin, only
            the row that ranks first when ordered by ``date_info`` descending.
        """
        ranked = df.withColumn('row_number', F.row_number().over(self.window))
        # row_number 1 is the first row of each asin partition, i.e. the latest date_info.
        return ranked.filter(F.col('row_number') == 1).drop('row_number')

    def read_data(self):
        """Load and cache the weekly asin detail rows and the theme table for the site."""
        sql = f"select asin, img_url, title, weight, weight_str, volume, date_info " \
              f"from ods_asin_detail where site_name='{self.site_name}' and date_type='week';"
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sql).cache()
        self.df_asin_detail.show(10, truncate=False)

        sql = f"select id as theme_id, theme_type_en, theme_en, theme_en_lower, theme_ch from ods_theme where site_name='{self.site_name}'"
        print("sql:", sql)
        self.df_theme = self.spark.sql(sql).cache()
        self.df_theme.show(10, truncate=False)

    def handle_data(self):
        """Run every attribute pipeline in order: image URL, title, weight, volume."""
        self.handle_img_url()
        self.handle_title()
        self.handle_weight()
        self.handle_volume()

    def handle_img_url(self):
        """Build ``df_asin_img_url``: the latest amazon image URL per asin plus a nested path.

        Adds the columns ``asin_trun_1`` .. ``asin_trun_9`` and ``asin_img_path``
        (``/<trun_1>/<trun_2>/.../<trun_6>/``).
        """
        # date_info has to be selected here because sort_by_latest orders by it.
        df = self.df_asin_detail.select("asin", "img_url", "date_info").filter("img_url is not null")
        # Keep only URLs containing 'amazon'. The column is named img_url;
        # there is no asin_img_url column on this DataFrame.
        df = df.filter(F.col("img_url").contains('amazon'))
        df = self.sort_by_latest(df=df).drop("date_info")

        # asin_trun_i holds the first i characters of the asin. Taking
        # substring(asin, 1, 1) for every i would make all nine columns identical.
        # NOTE(review): prefix-of-length-i is the assumed intent - confirm the expected path layout.
        for i in range(1, 10):
            df = df.withColumn(f"asin_trun_{i}", F.substring(F.col("asin"), 1, i))

        path_parts = [F.lit("/")]
        for i in range(1, 7):
            path_parts.extend([F.col(f"asin_trun_{i}"), F.lit("/")])
        self.df_asin_img_url = df.withColumn("asin_img_path", F.concat(*path_parts))

    def handle_title(self):
        """Build the latest title per asin, match themes and save the vertical theme table."""
        # Drop missing titles and the literal placeholder strings.
        titles = self.df_asin_detail.select("asin", "title", "date_info").filter(
            "title is not null and title not in ('none', 'null', 'nan')")
        titles = titles.withColumn("title_lower", F.lower(F.col("title")))
        # Keep the title of the newest date_info; date_info is only needed for that step.
        self.df_asin_title = self.sort_by_latest(df=titles).drop("date_info")
        # Match theme data (fills df_save_vertical and df_save_wide).
        self.handle_title_theme()
        self.reset_partitions(partitions_num=100)
        # NOTE(review): partitions_by is ['site_name'] but df_save_vertical has no
        # site_name column in the code visible here - confirm save_data_common adds it.
        self.save_data_common(
            df_save=self.df_save_vertical,
            db_save=self.db_save_vertical,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )

    def handle_title_theme(self):
        """Match theme words against titles and build the vertical and wide theme tables.

        Sets ``df_save_vertical`` (one row per asin and matched theme, joined
        with the theme table) and ``df_save_wide`` (one row per asin with
        ``<theme_type_en>_counts`` and ``<theme_type_en>_ids`` pivot columns).
        """
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme.theme_en_lower))
        # Each theme is padded with spaces so that only whole words can match.
        self.theme_list_str = str([f" {theme} " for theme in theme_list])
        print("self.theme_list_str:", self.theme_list_str)
        # Un-exploded titles, used later as the left side of the wide table.
        df_asin_title = self.df_asin_title.cache()

        # Pad the lower-cased title with spaces as well. The source column is
        # title_lower; asin_title_lower does not exist before this statement.
        self.df_asin_title = self.df_asin_title.withColumn(
            "asin_title_lower", F.concat(F.lit(" "), F.col("title_lower"), F.lit(" ")))
        # NOTE(review): u_theme_pattern is not defined in this class - presumably
        # inherited from Templates; its output is treated as a comma-separated string below.
        self.df_asin_title = self.df_asin_title.withColumn(
            "theme_en_lower", self.u_theme_pattern('asin_title_lower', F.lit(self.theme_list_str)))
        # Split the comma-separated result into an array ...
        self.df_asin_title = self.df_asin_title.withColumn(
            "theme_en_lower", F.split(self.df_asin_title["theme_en_lower"], ","))
        # ... and explode the array into one row per theme.
        self.df_asin_title = self.df_asin_title.withColumn(
            "theme_en_lower", F.explode(self.df_asin_title["theme_en_lower"]))
        # NOTE(review): the original comment asked for an inner join to discard
        # inexact matches, but the code uses a left join; unmatched rows are only
        # removed for the wide table (see the null filter below).
        self.df_asin_title = self.df_asin_title.join(
            self.df_theme, on=['theme_en_lower'], how='left'
        )

        # 1. Vertical table
        self.df_save_vertical = self.df_asin_title.cache()
        print(self.df_save_vertical.columns)
        self.df_save_vertical.show(30, truncate=False)

        # 2. Wide table
        self.df_asin_title = self.df_asin_title.drop_duplicates(['asin', 'theme_type_en', 'theme_ch'])
        self.df_asin_title = self.df_asin_title.withColumn(
            "theme_type_en_counts", F.concat("theme_type_en", F.lit("_counts")))
        self.df_asin_title = self.df_asin_title.withColumn(
            "theme_type_en_ids", F.concat("theme_type_en", F.lit("_ids")))
        # concat() yields null when theme_type_en is null, so this removes rows without a matched theme.
        self.df_asin_title = self.df_asin_title.filter('theme_type_en_counts is not null')
        pivot_df1 = self.df_asin_title.groupBy("asin").pivot("theme_type_en_counts").agg(
            F.expr("IFNULL(count(*), 0) AS value"))
        pivot_df1 = pivot_df1.na.fill(0)
        pivot_df2 = self.df_asin_title.groupBy("asin").pivot("theme_type_en_ids").agg(
            F.concat_ws(",", F.collect_list("theme_id")))
        pivot_df1.show(30, truncate=False)
        pivot_df2.show(30, truncate=False)
        self.df_save_wide = df_asin_title.join(
            pivot_df1, on='asin', how='left'
        ).join(
            pivot_df2, on='asin', how='left'
        )
        print(self.df_save_wide.columns)

    def handle_weight(self):
        """Build ``df_asin_weight``: latest weight per asin with normalised value and unit.

        Rows with ``date_info >= '2023-18'`` are parsed from ``weight_str``
        (``handle_weight_new``); earlier rows keep their ``weight`` column and
        get the site default unit (``handle_weight_old``). The result has the
        columns ``asin``, ``asin_weight``, ``asin_weight_str`` and ``asin_weight_type``.
        """
        # date_info is kept through the union because the final de-duplication orders by it.
        weight_rows = self.df_asin_detail.select("asin", "weight", "weight_str", "date_info")
        self.df_asin_weight_new = weight_rows.filter("date_info >= '2023-18'").cache()
        self.df_asin_weight_old = weight_rows.filter("date_info < '2023-18'").cache()
        self.handle_weight_new()
        self.handle_weight_old()
        print("self.df_asin_weight_new.columns:", self.df_asin_weight_new.columns)
        print("self.df_asin_weight_old.columns:", self.df_asin_weight_old.columns)

        self.df_asin_weight = self.df_asin_weight_new.unionByName(self.df_asin_weight_old, allowMissingColumns=True)
        self.df_asin_weight = self.sort_by_latest(df=self.df_asin_weight).drop("date_info")
        # Turn the literal 'none' into null and cast everything else to float.
        self.df_asin_weight = self.df_asin_weight.withColumn(
            "weight",
            F.when(F.col("weight") == 'none', None).otherwise(F.col("weight").cast(FloatType())))
        # Clamp weights of at most 0.001 up to 0.001.
        self.df_asin_weight = self.df_asin_weight.withColumn(
            "weight", F.when(F.col("weight") <= 0.001, 0.001).otherwise(F.col("weight")))
        # Keep four decimal places.
        self.df_asin_weight = self.df_asin_weight.withColumn("weight", F.round(self.df_asin_weight["weight"], 4))

        self.df_asin_weight = self.df_asin_weight.withColumnRenamed(
            "weight_str", "asin_weight_str"
        ).withColumnRenamed(
            "weight", "asin_weight"
        ).withColumnRenamed(
            "weight_type", "asin_weight_type"
        )

    def handle_weight_new(self):
        """Parse ``weight`` and ``weight_type`` out of ``weight_str`` for rows from week 2023-18 on.

        Value normalisation ('none' handling, clamping, rounding) is done
        afterwards in ``handle_weight`` on the combined data.
        """
        print("开始处理重量数据: 2023-18周之后")
        # Cast to string and lower-case before parsing.
        df = self.df_asin_weight_new.withColumn(
            "weight_str", F.lower(F.col("weight_str").cast(StringType())))
        # Extract weight and weight_type from the weight string. The site is passed
        # as a literal because this DataFrame has no site_name column.
        # NOTE(review): assumes parse_weight_str takes (weight_str, site_name) - confirm.
        df = df.withColumn('weight_detail', self.u_get_weight('weight_str', F.lit(self.site_name)))
        self.df_asin_weight_new = (
            df.withColumn('weight', F.col('weight_detail').getField('weight'))
            .withColumn('weight_type', F.col('weight_detail').getField('weight_type'))
            .drop('weight_detail')
        )

    def handle_weight_old(self):
        """Tag rows before week 2023-18 with the site default unit and keep the newest per asin."""
        print("开始处理重量数据: 2023-18周之前")
        tagged = self.df_asin_weight_old.withColumn("weight_type", F.lit(self.weight_type))
        # Same per-asin / date_info-descending ranking as everywhere else, computed once.
        self.df_asin_weight_old = self.sort_by_latest(df=tagged)

    def handle_volume(self):
        """Placeholder: volume handling is not implemented."""
        pass