dwt_aba_analytics_temp.py
import ast
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
from pyspark.sql.types import StringType
from pyspark.sql import functions as F


class DwtAbaAnalyticsTemp(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_aba_analytics_temp'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # placeholder DataFrames, replaced in read_data()/handle_data()
        self.df_st = self.spark.sql("select 1+1;")
        self.df_st_dup = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_words = self.spark.sql("select 1+1;")
        self.words_list_str = ''
        self.u_words_pattern = F.udf(self.udf_words_pattern, StringType())
        # partitioning parameters
        self.partitions_by = ['site_name']
        self.partitions_num = 20

    @staticmethod
    def udf_words_pattern(search_term, words_list_str):
        """Return the theme words (';'-joined) found inside search_term, or None.

        words_list_str is a stringified Python list of space-padded words.
        Words tagged with "@@" (color themes) are split on "and", and every
        part must appear in search_term for the word to count as a match.
        """
        words_list = []
        for words in ast.literal_eval(words_list_str):  # safer than eval() for a list literal
            if "@@" in words:
                words = words.replace("@@", "")
                # require every "and"-separated part to be present
                w_list = words.split("and")
                num = 0
                for w in w_list:
                    if w in search_term:
                        num += 1
                if num == len(w_list):
                    words_list.append(words.strip())
            else:
                if words in search_term:
                    words_list.append(words.strip())
        if words_list:
            return ';'.join(set(words_list))
        return None
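    # A worked example of the matching logic above (hypothetical values):
    #   udf_words_pattern(" black and white mug ",
    #                     "[' black and white@@ ', ' mug ', ' grey ']")
    # "@@" marks a color theme: both " black " and " white " occur in the
    # padded search term, so "black and white" matches; " mug " matches as a
    # whole word; " grey " does not. Result: "black and white;mug"
    # (set order may vary).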

    def read_data(self):
        # monthly ABA search-term metrics; note the date window is hardcoded
        # here rather than derived from self.date_info
        sql = (f"select search_term, rank, bsr_orders, date_info from dwt_aba_st_analytics "
               f"where site_name='{self.site_name}' and date_type='month' "
               f"and date_info between '2022-07' and '2023-06';")
        self.df_st = self.spark.sql(sql).cache()
        self.df_st_dup = self.df_st.drop_duplicates(['search_term']).select('search_term')
        # print(self.df_st.count(), self.df_st_dup.count())  # 21536435 4703181
        self.df_st.show()

        sql = f"select market_type, search_words, search_words_ch from ods_theme_aba where site_name='{self.site_name}';"
        self.df_words = self.spark.sql(sql).cache()
        self.df_words = self.df_words.withColumn("search_words_lower", F.lower("search_words"))
        self.df_words.show()

        # search_words_lower is already computed in Spark above and carried into pandas
        df_words = self.df_words.toPandas()

        # tag color themes (market_type == '颜色', i.e. "color") with a trailing
        # "@@" marker so the UDF requires every "and"-separated part to match
        df_words_1 = df_words.loc[df_words.market_type == '颜色'].copy()  # .copy() avoids SettingWithCopyWarning
        df_words_1['search_words_lower'] = df_words_1['search_words_lower'].apply(lambda x: x + "@@")
        df_words_2 = df_words.loc[df_words.market_type != '颜色']
        words_list1 = list(set(df_words_1.search_words_lower))
        words_list2 = list(set(df_words_2.search_words_lower))
        words_list1.extend(words_list2)

        # pad each word with spaces so only whole-word matches hit,
        # then serialize the list so it can be passed to the UDF as a literal
        self.words_list_str = str([f" {words} " for words in words_list1])
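        # For illustration, with hypothetical theme rows the serialized list
        # looks like:
        #   "[' black and white@@ ', ' dark grey@@ ', ' mug ', ' phone case ']"
        # It is wrapped in F.lit() in handle_data() because a UDF argument must
        # be a column, so the list travels as one constant string column.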

    def handle_data(self):
        self.df_st_dup = self.df_st_dup.withColumn("search_term_lower", F.lower(self.df_st_dup["search_term"]))
        # pad both ends with a space so the UDF only matches whole words
        self.df_st_dup = self.df_st_dup.withColumn(
            "search_term_lower", F.concat(F.lit(" "), "search_term_lower", F.lit(" ")))
        # match each search term against the serialized theme-word list
        self.df_st_dup = self.df_st_dup.withColumn(
            "words_en_lower", self.u_words_pattern('search_term_lower', F.lit(self.words_list_str)))
        # split the ';'-joined matches into an array, then explode to one row per match
        self.df_st_dup = self.df_st_dup.withColumn("words_en_lower", F.split(self.df_st_dup["words_en_lower"], ";"))
        self.df_st_dup = self.df_st_dup.withColumn("search_words_lower", F.explode(self.df_st_dup["words_en_lower"]))
        self.df_st_dup = self.df_st_dup.join(
            self.df_words, on=['search_words_lower'], how='left'  # consider an inner join to drop unmatched rows
        )
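        # For example (hypothetical row): a match string "black and white;mug"
        # becomes the array ["black and white", "mug"], and explode() then
        # yields two rows, one per matched theme word, ready to join against
        # df_words on search_words_lower.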

        # join the theme matches back onto the full monthly metrics
        self.df_st = self.df_st.join(
            self.df_st_dup, on='search_term', how='left'
        )
        self.df_st.show(30, truncate=False)
        # self.df_st.filter("search_term_lower like '% dark grey %'").show()  # debug: spot-check one theme

        # pivot by month: per theme word, the best rank, the number of matched
        # search terms, and the total BSR orders for each date_info
        self.df_save = self.df_st.groupBy(['search_words_ch']).pivot("date_info").agg(
            F.min("rank"),
            F.count("search_term"),
            F.sum("bsr_orders")
        )
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        df_save = self.df_save.toPandas()
        df_save.to_csv("/root/theme.csv", index=False)
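        # Note: pivoting with multiple unaliased aggregations makes Spark
        # generate column names like "2022-07_min(rank)"; aliasing them (e.g.
        # F.min("rank").alias("min_rank")) would give cleaner CSV headers.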


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site name (e.g. 'us')
    handle_obj = DwtAbaAnalyticsTemp(site_name=site_name)
    handle_obj.run()
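
# Usage sketch (assuming the Templates base class wires read_data()/handle_data()
# into run(), and that spark-submit can resolve the utils package):
#   spark-submit dwt_aba_analytics_temp.py us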