import ast
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType
from pyspark.sql import functions as F


class DwtAbaAnalyticsTemp(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_aba_analytics_temp'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames, replaced in read_data()/handle_data()
        self.df_st = self.spark.sql("select 1+1;")
        self.df_st_dup = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.u_words_pattern = F.udf(self.udf_words_pattern, StringType())
        # Partition parameters
        self.partitions_by = ['site_name']
        self.partitions_num = 20

    @staticmethod
    def udf_words_pattern(search_term, words_list_str):
        """Return a ';'-joined set of theme words found in search_term, or None."""
        words_list = []
        # words_list_str is str(list) built in read_data(); ast.literal_eval is a
        # safer drop-in for the original eval()
        for words in ast.literal_eval(words_list_str):
            if "@@" in words:
                # Color themes are tagged with "@@": every "and"-separated part
                # must appear in the search term for the theme to match
                words = words.replace("@@", "")
                w_list = words.split("and")
                num = sum(1 for w in w_list if w in search_term)
                if num == len(w_list):
                    words_list.append(words.strip())
            else:
                # Plain themes: whole-word substring match (both sides are space-padded)
                if words in search_term:
                    words_list.append(words.strip())
        if words_list:
            return ';'.join(set(words_list))
        return None

    def read_data(self):
        sql = f"select search_term, rank, bsr_orders, date_info from dwt_aba_st_analytics where site_name='{self.site_name}' and date_type='month' and date_info between '2022-07' and '2023-06';"
        self.df_st = self.spark.sql(sql).cache()
        self.df_st_dup = self.df_st.drop_duplicates(['search_term']).select('search_term')
        # print(self.df_st.count(), self.df_st_dup.count())  # 21536435 4703181
        self.df_st.show()

        sql = f"select market_type, search_words, search_words_ch from ods_theme_aba where site_name='{self.site_name}';"
        self.df_words = self.spark.sql(sql).cache()
        self.df_words = self.df_words.withColumn("search_words_lower", F.lower("search_words"))
        self.df_words.show()

        df_words = self.df_words.toPandas()
        # self.df_words = pd.read_excel(f"/mnt/data/img_data/aba_data_2023-07-25.xlsx", sheet_name='Sheet2')
        df_words['search_words_lower'] = df_words['search_words'].str.lower()
        # Tag color themes ('颜色' = color) with an "@@" suffix so the UDF treats them specially
        df_words_1 = df_words.loc[df_words.market_type == '颜色'].copy()
        df_words_1['search_words_lower'] = df_words_1['search_words_lower'] + "@@"
        df_words_2 = df_words.loc[df_words.market_type != '颜色']
        words_list1 = list(set(df_words_1.search_words_lower))
        words_list2 = list(set(df_words_2.search_words_lower))
        words_list1.extend(words_list2)
        # words_list = list(set(df_words.search_words_lower))
        # Space-pad every theme word so it can only match as a whole word
        self.words_list_str = str([f" {words} " for words in words_list1])
        # print("self.words_list_str:", self.words_list_str)
        # # Group by 'search_words_ch' and merge the multiple 'search_words' values into one row
        # df_result = self.df_words.groupby(['market_type', 'search_words_ch'])['search_words'].apply(';'.join).reset_index()
        # self.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
        # self.df_words = self.spark.createDataFrame(self.df_words)

    def handle_data(self):
        self.df_st_dup = self.df_st_dup.withColumn("search_term_lower", F.lower(self.df_st_dup["search_term"]))
        # Pad both ends of the search term with spaces so theme words match whole words only
        self.df_st_dup = self.df_st_dup.withColumn(
            "search_term_lower", F.concat(F.lit(" "), "search_term_lower", F.lit(" ")))
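        # Illustration (assumed example values): after lower-casing and padding,
        # "Dark Grey Tablecloth" becomes " dark grey tablecloth ", which contains
        # the padded theme word " dark grey "; "darkest greyhound" does not, so
        # the substring checks in udf_words_pattern fire only on whole words.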
        # self.df_st_dup.filter("search_term_lower like '% dark grey %'").show()
        # self.df_st_dup.show()
        # Match theme words against each search term
        self.df_st_dup = self.df_st_dup.withColumn(
            "words_en_lower", self.u_words_pattern('search_term_lower', F.lit(self.words_list_str)))
        # Split the ';'-joined matches into an array column
        self.df_st_dup = self.df_st_dup.withColumn("words_en_lower", F.split(self.df_st_dup["words_en_lower"], ";"))
        # Explode the array into one row per matched theme word
        self.df_st_dup = self.df_st_dup.withColumn("search_words_lower", F.explode(self.df_st_dup["words_en_lower"]))
        # self.df_st_dup.show(30, truncate=False)
        self.df_st_dup = self.df_st_dup.join(
            self.df_words, on=['search_words_lower'], how='left'  # change to 'inner' to avoid keeping inaccurate pattern-match results
        )
        # self.df_st_dup.show(30, truncate=False)
        # self.df_st_dup.filter("search_term_lower like '% dark grey %'").show()
        # quit()
        # self.df_st_dup.filter("search_term_lower='mini electric chopper'").show(30, truncate=False)
        # df_st_dup = self.df_st_dup.toPandas()
        # df_st_dup = df_st_dup.merge(
        #     self.df_words, on=['search_words_lower'], how='left'
        # )
        # print(df_st_dup.head(20))
        self.df_st = self.df_st.join(
            self.df_st_dup, on='search_term', how='left'
        )
        self.df_st.show(30, truncate=False)
        self.df_st.filter("search_term_lower like '% dark grey %'").show()
        # Pivot by month: per theme, the best (minimum) rank, the number of
        # matching search terms, and the total BSR orders
        self.df_save = self.df_st.groupBy(['search_words_ch']).pivot("date_info").agg(
            F.min("rank"), F.count("search_term"), F.sum("bsr_orders")
        )
        # self.df_save.show(30, truncate=False)
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        df_save = self.df_save.toPandas()
        df_save.to_csv("/root/theme.csv", index=False)


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    handle_obj = DwtAbaAnalyticsTemp(site_name=site_name)
    handle_obj.run()
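
# A minimal local sketch of udf_words_pattern, using assumed theme words:
# " dark grey " as a plain theme and " black and white@@ " as a color theme,
# built the same way read_data() builds words_list_str. Not wired into the
# Spark job; purely illustrative and safe to delete.
def _demo_udf_words_pattern():
    words_list_str = str([" dark grey ", " black and white@@ "])
    # Plain theme word: whole-word substring match via the space padding.
    assert DwtAbaAnalyticsTemp.udf_words_pattern(
        " dark grey tablecloth ", words_list_str) == "dark grey"
    # "@@" color theme: every "and"-separated part must appear in the term.
    assert DwtAbaAnalyticsTemp.udf_words_pattern(
        " black cat and white dog ", words_list_str) == "black and white"
    # No theme word present -> None.
    assert DwtAbaAnalyticsTemp.udf_words_pattern(
        " mini electric chopper ", words_list_str) is None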