# dwt_theme_bsr_orders_new.py
import os
import sys
import re
import ast

import numpy as np

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, FloatType, StructType, StructField


class DwtThemeBsOrders(Templates):
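    """Aggregate monthly BSR orders by theme: match theme keywords from ods_theme against
    ASIN titles from dwt_flow_asin, then pivot orders and ASIN counts per theme and month."""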

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_theme_bs_orders'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames; overwritten in read_data() / handle_data*
        self.df_theme = self.spark.sql("select 1+1;")
        self.df_flow = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name']
        self.partitions_num = 10
        # Register the title/theme matching user-defined function (UDF)
        self.u_theme_pattern = F.udf(self.udf_theme_pattern, StringType())

    @staticmethod
    def udf_theme_pattern(title, theme_list_str):
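        """Return a comma-separated set of themes whose space-padded form occurs in the
        (space-padded, lowercased) title, or None when no theme matches."""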
        found_themes = [theme.strip() for theme in ast.literal_eval(theme_list_str) if theme in title]
        if found_themes:
            return ','.join(set(found_themes))
        else:
            return None

    def read_data(self):
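        """Read the monthly ASIN flow data (asin, title, bsr_orders, month) from dwt_flow_asin
        and the theme dimension table from ods_theme for the current site."""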
        # sql = f"select asin, title as asin_title, bsr_orders, dt as date_info from selection_off_line.dwt_asin_month where site='{self.site_name}' and " \
        #       f"dt in ('2022_7', '2022_8', '2022_9', '2022_10', '2022_11', '2022_12') " \
        #       f"union all " \
        #       f"select asin, asin_title, bsr_orders, date_info from dwt_flow_asin where site_name='{self.site_name}' and " \
        #       f"date_type='month' and date_info in ('2023-01', '2023-02', '2023-03', '2023-04', '2023-05', '2023-06');"
        sql = f"select asin, asin_title, bsr_orders, date_info from dwt_flow_asin where site_name='{self.site_name}' and " \
              f"date_type='month' and date_info >= '2023-01' and date_info <= '2023-12';"
              # f"date_type='month' and date_info >= '2023-01' and date_info <= '2023-01' limit 1000000;"

        print("sql:", sql)
        self.df_flow = self.spark.sql(sql).cache()
        self.df_flow.show(10, truncate=False)

        sql = f"select id as theme_id, theme_type_en, theme_en, theme_en_lower, theme_ch from ods_theme where site_name='{self.site_name}'"
        print("sql:", sql)
        self.df_theme = self.spark.sql(sql).cache()
        self.df_theme.show(10, truncate=False)

    def handle_data_new2(self):
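        """Regex-based matching: extract the first theme found in each title with one combined
        pattern, join it to the theme table, then pivot bsr_orders and ASIN counts by month."""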
        # Collect the distinct lowercase theme names
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme['theme_en_lower']))

        # Build a single regex pattern that matches any theme as a whole word
        theme_pattern = '|'.join([f"\\b{re.escape(theme)}\\b" for theme in theme_list])
        print(f"theme_pattern: {len(theme_list), theme_pattern[:100]}")
        # Lowercase the titles
        self.df_flow = self.df_flow.withColumn("asin_title_lower", F.lower(F.col("asin_title")))

        # Use the regex to find a theme in each title (regexp_extract returns only the first match)
        self.df_flow = self.df_flow.withColumn("matched_theme", F.regexp_extract(F.col("asin_title_lower"), theme_pattern, 0))

        # Keep only rows where a theme was matched
        self.df_flow = self.df_flow.filter(F.col("matched_theme") != "")

        # Join the matched theme back to the theme dimension table
        self.df_flow = self.df_flow.join(self.df_theme, self.df_flow['matched_theme'] == self.df_theme['theme_en_lower'], 'inner')

        # Drop duplicates per ASIN, month and theme
        self.df_flow = self.df_flow.dropDuplicates(['asin', 'date_info', 'theme_ch'])

        # Aggregate bsr_orders and ASIN counts, grouped by theme and pivoted by month
        # self.df_save = self.df_flow.groupBy("theme_en_lower", "date_info").agg(
        #     F.sum("bsr_orders").alias("total_bsr_orders"),
        #     F.count("asin").alias("total_asins")
        # )
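        # With two unaliased aggregations, pivot yields a pair of columns per month
        # (named like "2023-01_sum(bsr_orders)" and "2023-01_count(asin)")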
        self.df_save = self.df_flow.groupBy("theme_ch").pivot("date_info").agg(
            F.sum("bsr_orders"), F.count("asin")
        )

        # Add the site name column
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))

        # Cache the final result
        self.df_save = self.df_save.cache()

        # Show the final result and export it to CSV
        self.df_save.show(50, truncate=False)
        df = self.df_save.toPandas()
        df.to_csv("/root/theme_new.csv", index=False)

    def handle_data_new1(self):
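        """Join-based matching: pad and lowercase titles, join them to the theme list on a
        whole-word containment condition, then aggregate bsr_orders and ASIN counts per theme and month."""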
        # Convert the distinct theme list into a single-column DataFrame
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme['theme_en_lower']))
        df_themes = self.spark.createDataFrame([(theme,) for theme in theme_list], ['theme'])

        # Pad the titles with spaces and lowercase them so whole words can be matched
        self.df_flow = self.df_flow.withColumn("asin_title_lower",
                                               F.lower(F.concat(F.lit(" "), F.col("asin_title"), F.lit(" "))))

        # Join the flow DataFrame to the theme list on a whole-word containment condition:
        # a theme matches when its space-padded form appears in the padded title.
        # The inner join keeps only titles that contain at least one theme.
        self.df_flow = self.df_flow.join(
            df_themes,
            F.col("asin_title_lower").contains(F.concat(F.lit(" "), F.col("theme"), F.lit(" "))),
            'inner'
        )

        # Join back to the theme table to pick up the remaining theme columns
        self.df_flow = self.df_flow.join(self.df_theme, self.df_flow['theme'] == self.df_theme['theme_en_lower'],
                                         'inner')

        # Drop duplicates per ASIN, month and theme
        self.df_flow = self.df_flow.dropDuplicates(['asin', 'date_info', 'theme'])

        # Group by theme and month and aggregate
        self.df_save = self.df_flow.groupBy("theme", "date_info").agg(
            F.sum("bsr_orders").alias("total_bsr_orders"),
            F.count("asin").alias("total_asins")
        )

        # Add the site name column
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))

        # Cache the final result
        self.df_save = self.df_save.cache()

        # Show the final result
        self.df_save.show(50, truncate=False)

    def handle_data(self):
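        """UDF-based matching: find every theme contained in each padded title via
        udf_theme_pattern, explode the matches into rows, then pivot bsr_orders and ASIN counts by month."""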
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme.theme_en_lower))
        self.theme_list_str = str([f" {theme} " for theme in theme_list])
        print("self.theme_list_str:", self.theme_list_str)

        # Lowercase the title
        self.df_flow = self.df_flow.withColumn("asin_title_lower", F.lower(self.df_flow["asin_title"]))
        # Filter out null titles
        self.df_flow = self.df_flow.filter("asin_title_lower is not null")
        # Filter out literal 'none' / 'null' / 'nan' titles
        self.df_flow = self.df_flow.filter("asin_title_lower not in ('none', 'null', 'nan')")

        self.df_flow = self.df_flow.withColumn("asin_title_lower", F.concat(F.lit(" "), "asin_title_lower", F.lit(" ")))  # 标题两头加空字符串用来匹配整个词
        self.df_flow = self.df_flow.withColumn("theme_en_lower", self.u_theme_pattern('asin_title_lower', F.lit(self.theme_list_str)))
        # Split the comma-separated theme string into an array column
        self.df_flow = self.df_flow.withColumn("theme_en_lower", F.split(self.df_flow["theme_en_lower"], ","))
        # Explode the array into one row per matched theme
        self.df_flow = self.df_flow.withColumn("theme_en_lower", F.explode(self.df_flow["theme_en_lower"]))
        self.df_flow = self.df_flow.join(
            self.df_theme, on=['theme_en_lower'], how='left'  # changing this to 'inner' would drop inaccurate matches
        )
        # self.df_flow.show(50, truncate=False)

        # self.df_flow = self.df_flow.filter("bsr_orders >0")
        # self.df_theme = self.df_theme.drop_duplicates(['asin', 'theme_ch'])
        self.df_flow = self.df_flow.drop_duplicates(['asin', 'date_info', 'theme_ch'])
        self.df_save = self.df_flow
        # self.df_save = self.df_flow.join(
        #     self.df_theme, on='asin', how='inner'
        # )
        # self.df_save.show(30, truncate=False)
        # pivot_df1 = self.df_asin_title.groupBy("asin").pivot("theme_type_en_counts").agg(
        #     F.expr("IFNULL(count(*), 0) AS value"))
        self.df_save = self.df_save.groupBy("theme_ch").pivot("date_info").agg(
            F.sum("bsr_orders"), F.count("asin")
        )
        # self.df_save.show(50, truncate=False)
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        self.df_save = self.df_save.cache()
        self.df_save.show(50, truncate=False)
        df = self.df_save.toPandas()
        df.to_csv("/root/theme_new_2023.csv", index=False)


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site name
    handle_obj = DwtThemeBsOrders(site_name=site_name)
    handle_obj.run()
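
# Example invocation (assumed; adjust the submit command and site argument to the actual deployment):
#   spark-submit dwt_theme_bsr_orders_new.py us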