import ast
import os
import re
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


class DwtThemeBsOrders(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_theme_bs_orders'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames, overwritten in read_data()
        self.df_theme = self.spark.sql("select 1+1;")
        self.df_flow = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name']
        self.partitions_num = 10
        # Register the UDF
        self.u_theme_pattern = F.udf(self.udf_theme_pattern, StringType())

    @staticmethod
    def udf_theme_pattern(title, theme_list_str):
        # theme_list_str is a stringified Python list of space-padded themes;
        # ast.literal_eval parses it back safely. Because both the themes and
        # the title are space-padded, only whole-word occurrences match.
        found_themes = [theme.strip() for theme in ast.literal_eval(theme_list_str) if theme in title]
        if found_themes:
            return ','.join(set(found_themes))
        return None

    def read_data(self):
        # sql = f"select asin, title as asin_title, bsr_orders, dt as date_info from selection_off_line.dwt_asin_month where site='{self.site_name}' and " \
        #       f"dt in ('2022_7', '2022_8', '2022_9', '2022_10', '2022_11', '2022_12') " \
        #       f"union all " \
        #       f"select asin, asin_title, bsr_orders, date_info from dwt_flow_asin where site_name='{self.site_name}' and " \
        #       f"date_type='month' and date_info in ('2023-01', '2023-02', '2023-03', '2023-04', '2023-05', '2023-06');"
        sql = f"select asin, asin_title, bsr_orders, date_info from dwt_flow_asin where site_name='{self.site_name}' and " \
              f"date_type='month' and date_info >= '2023-01' and date_info <= '2023-12';"
        # f"date_type='month' and date_info >= '2023-01' and date_info <= '2023-01' limit 1000000;"
        print("sql:", sql)
        self.df_flow = self.spark.sql(sql).cache()
        self.df_flow.show(10, truncate=False)
        sql = f"select id as theme_id, theme_type_en, theme_en, theme_en_lower, theme_ch from ods_theme where site_name='{self.site_name}'"
        print("sql:", sql)
        self.df_theme = self.spark.sql(sql).cache()
        self.df_theme.show(10, truncate=False)

    def handle_data_new2(self):
        # Collect the distinct lowercase theme names to the driver
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme['theme_en_lower']))
        # Build one regex that matches any theme as a whole word;
        # re.escape guards against regex metacharacters in theme names
        theme_pattern = '|'.join([f"\\b{re.escape(theme)}\\b" for theme in theme_list])
        print(f"themes: {len(theme_list)}, pattern head: {theme_pattern[:100]}")
        # Lower-case the titles
        self.df_flow = self.df_flow.withColumn("asin_title_lower", F.lower(F.col("asin_title")))
        # Extract the matched theme from each title; note that regexp_extract
        # returns only the first match, so a title containing several themes
        # is counted under one theme only (the UDF path in handle_data finds all)
        self.df_flow = self.df_flow.withColumn(
            "matched_theme", F.regexp_extract(F.col("asin_title_lower"), theme_pattern, 0))
        # Keep only the rows where a theme was found
        self.df_flow = self.df_flow.filter(F.col("matched_theme") != "")
        # Join the matched theme back to the theme dimension table
        self.df_flow = self.df_flow.join(
            self.df_theme, self.df_flow['matched_theme'] == self.df_theme['theme_en_lower'], 'inner')
        # Drop duplicates per asin/month/theme
        self.df_flow = self.df_flow.dropDuplicates(['asin', 'date_info', 'theme_ch'])
        # Group and aggregate by theme and month
        # self.df_save = self.df_flow.groupBy("theme_en_lower", "date_info").agg(
        #     F.sum("bsr_orders").alias("total_bsr_orders"),
        #     F.count("asin").alias("total_asins")
        # )
        self.df_save = self.df_flow.groupBy("theme_ch").pivot("date_info").agg(
            F.sum("bsr_orders"),
            F.count("asin")
        )
        # Add the site_name column
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        # Cache the final result
        self.df_save = self.df_save.cache()
        # Show and export the final result
        self.df_save.show(50, truncate=False)
        df = self.df_save.toPandas()
        df.to_csv("/root/theme_new.csv", index=False)
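    # Minimal sketch of the whole-word regex behaviour relied on above
    # (values are hypothetical; `spark` is an existing SparkSession):
    #
    #   pattern = "|".join(f"\\b{re.escape(t)}\\b" for t in ["mug", "t-shirt"])
    #   df = spark.createDataFrame([("red coffee mug",), ("smug cat",)],
    #                              ["asin_title_lower"])
    #   df.withColumn("m", F.regexp_extract("asin_title_lower", pattern, 0)).show()
    #   # "red coffee mug" -> "mug"; "smug cat" -> "" (no whole-word hit)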
    def handle_data_new1(self):
        # Convert the theme list into a single-column DataFrame
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme['theme_en_lower']))
        df_themes = self.spark.createDataFrame([(theme,) for theme in theme_list], ['theme'])
        # Pad the titles with spaces and lower-case them so that themes
        # can be matched as whole words
        self.df_flow = self.df_flow.withColumn(
            "asin_title_lower", F.lower(F.concat(F.lit(" "), F.col("asin_title"), F.lit(" "))))
        # Keep only (title, theme) pairs where the padded title contains the
        # space-padded theme; a non-equi join, evaluated per pair
        self.df_flow = self.df_flow.join(
            df_themes,
            F.col("asin_title_lower").contains(F.concat(F.lit(" "), df_themes['theme'], F.lit(" "))),
            'inner')
        # Attach the remaining theme columns
        self.df_flow = self.df_flow.join(
            self.df_theme, self.df_flow['theme'] == self.df_theme['theme_en_lower'], 'inner')
        # Drop duplicates per asin/month/theme
        self.df_flow = self.df_flow.dropDuplicates(['asin', 'date_info', 'theme'])
        # Group and aggregate by theme and month
        self.df_save = self.df_flow.groupBy("theme", "date_info").agg(
            F.sum("bsr_orders").alias("total_bsr_orders"),
            F.count("asin").alias("total_asins")
        )
        # Add the site_name column
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        # Cache the final result
        self.df_save = self.df_save.cache()
        # Show the final result
        self.df_save.show(50, truncate=False)

    def handle_data(self):
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme.theme_en_lower))
        # Stringified list of space-padded themes, parsed back inside the UDF
        self.theme_list_str = str([f" {theme} " for theme in theme_list])
        print("self.theme_list_str:", self.theme_list_str)
        # Lower-case the titles
        self.df_flow = self.df_flow.withColumn("asin_title_lower", F.lower(self.df_flow["asin_title"]))
        # Filter out null titles
        self.df_flow = self.df_flow.filter("asin_title_lower is not null")
        # Filter out literal 'none'/'null'/'nan' strings
        self.df_flow = self.df_flow.filter("asin_title_lower not in ('none', 'null', 'nan')")
        # Pad both ends of the title with a space so whole words can be matched
        self.df_flow = self.df_flow.withColumn(
            "asin_title_lower", F.concat(F.lit(" "), "asin_title_lower", F.lit(" ")))
        self.df_flow = self.df_flow.withColumn(
            "theme_en_lower", self.u_theme_pattern('asin_title_lower', F.lit(self.theme_list_str)))
        # Split the comma-separated matches into an array
        self.df_flow = self.df_flow.withColumn("theme_en_lower", F.split(self.df_flow["theme_en_lower"], ","))
        # Explode the array into one row per matched theme
        self.df_flow = self.df_flow.withColumn("theme_en_lower", F.explode(self.df_flow["theme_en_lower"]))
        self.df_flow = self.df_flow.join(
            self.df_theme, on=['theme_en_lower'], how='left'  # TODO: use 'inner' so unmatched rows are dropped
        )
        # self.df_flow.show(50, truncate=False)
        # self.df_flow = self.df_flow.filter("bsr_orders > 0")
        # self.df_theme = self.df_theme.drop_duplicates(['asin', 'theme_ch'])
        self.df_flow = self.df_flow.drop_duplicates(['asin', 'date_info', 'theme_ch'])
        self.df_save = self.df_flow
        # self.df_save = self.df_flow.join(
        #     self.df_theme, on='asin', how='inner'
        # )
        # self.df_save.show(30, truncate=False)
        # pivot_df1 = self.df_asin_title.groupBy("asin").pivot("theme_type_en_counts").agg(
        #     F.expr("IFNULL(count(*), 0) AS value"))
        # Pivot by month: one (sum(bsr_orders), count(asin)) column pair per date_info
        self.df_save = self.df_save.groupBy("theme_ch").pivot("date_info").agg(
            F.sum("bsr_orders"),
            F.count("asin")
        )
        # self.df_save.show(50, truncate=False)
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        self.df_save = self.df_save.cache()
        self.df_save.show(50, truncate=False)
        df = self.df_save.toPandas()
        df.to_csv("/root/theme_new_2023.csv", index=False)
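# Illustrative check of the space-padding convention used by the UDF
# (hypothetical values, not part of the job): both the title and each theme
# carry surrounding spaces, so only whole-word occurrences match.
#
#   themes = str([" mug ", " t shirt "])
#   DwtThemeBsOrders.udf_theme_pattern(" red coffee mug ", themes)  # -> "mug"
#   DwtThemeBsOrders.udf_theme_pattern(" smug cat ", themes)        # -> None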
if __name__ == '__main__':
    site_name = sys.argv[1]  # argument 1: site
    handle_obj = DwtThemeBsOrders(site_name=site_name)
    handle_obj.run()
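# Usage sketch (assumes the inherited Templates.run() dispatches to
# read_data() and handle_data(); the file name below is illustrative):
#
#   spark-submit dwt_theme_bs_orders.py us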