# dwt_top100.py
import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so utils.templates can be imported
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


class DwtTop100(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2024-01'):
        super(DwtTop100, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_top100'
        # Initialize the self.spark session object
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        print(f"Parameters: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames; each is overwritten by a real query in read_data*()
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_measure = self.spark.sql("select 1+1;")
        self.df_asin_bs_info = self.spark.sql("select 1+1;")
        self.df_bs_category = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_flow_asin = self.spark.sql("select 1+1;")

    def read_data(self):
        sql = f"""
            select asin, asin_type, bsr_orders, category_first_id, category_id, first_category_rank, current_category_rank, 
            asin_price, asin_rating, asin_buy_box_seller_type, asin_is_new, asin_total_comments, asin_launch_time, asin_launch_time_type, 
            asin_brand_name, is_brand_label, asin_bought_month as buy_data_bought_month 
            from dwt_flow_asin where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
        """
        print(f"1. 读取dwt_flow_asin数据: sql -- {sql}")
        self.df_flow_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_flow_asin.show(10, truncate=False)
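
    # --- Illustration only, not part of the original pipeline ---
    # A minimal sketch of a "top 100 per category" ranking using the Window/F
    # imports above, assuming bsr_orders is the ranking key and
    # category_first_id the partition key. The method name rank_top100 is
    # hypothetical; nothing in this class calls it.
    def rank_top100(self):
        window = Window.partitionBy('category_first_id').orderBy(F.desc('bsr_orders'))
        return self.df_flow_asin \
            .withColumn('category_rank', F.row_number().over(window)) \
            .filter(F.col('category_rank') <= 100)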

    def read_data_old(self):
        sql = f"""SELECT asin, asin_bsr_orders  from dwd_asin_measure where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}' 
                  and date_info = '{self.date_info}';"""  # and date_info>='2023-15' --  and asin_bsr_orders >0
        print(f"1. 读取dwd_asin_measure数据: sql -- {sql}")
        self.df_asin_measure = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_measure.show(10, truncate=False)
        sql = f"""SELECT asin, asin_bs_cate_1_id,  asin_bs_cate_1_rank, asin_bs_cate_current_id  from dim_asin_bs_info where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.date_info}'
                  and asin_bs_cate_1_id is not null;"""  # and date_info>='2023-15'
        print(f"2. 读取dim_asin_bs_info数据: sql -- {sql}")
        self.df_asin_bs_info = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_bs_info.show(10, truncate=False)
        sql = f"""SELECT 
                        nodes_num, 
                        category_parent_id, 
                        category_id as asin_bs_cate_current_id, 
                        CASE 
                            WHEN redirect_first_id IS NOT NULL THEN redirect_first_id 
                            ELSE category_first_id 
                        END as asin_bs_cate_1_id
                    FROM 
                        ods_bs_category 
                    WHERE 
                        site_name = '{self.site_name}';"""  # and date_info>='2023-15'
        print(f"3. 读取ods_bs_category数据: sql -- {sql}")
        self.df_bs_category = self.spark.sql(sqlQuery=sql).cache()
        self.df_bs_category.show(10, truncate=False)
        sql = f"select asin, asin_type, asin_price, asin_rating, asin_buy_box_seller_type, asin_is_new, asin_total_comments, asin_launch_time, asin_launch_time_type from dim_asin_detail " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
        print(f"3. 读取dim_asin_detail数据: sql -- {sql}")
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_detail.show(10, truncate=False)

    def handle_data_old(self):
        # Join BSR orders with BS category info, the category dimension and asin details
        self.df_save = self.df_asin_measure.join(
            self.df_asin_bs_info, on='asin', how='inner'
        ).join(
            self.df_bs_category, on=['asin_bs_cate_current_id', 'asin_bs_cate_1_id'], how='inner'
        ).join(
            self.df_asin_detail, on=['asin'], how='inner'
        )
        self.df_save = self.df_save.drop_duplicates()
        self.df_save.show(10, truncate=False)

    def save_data(self):
        self.df_save = self.df_flow_asin
        # Collect to the driver and export as CSV (assumes the result fits in driver memory)
        self.df_save = self.df_save.toPandas()
        self.df_save.to_csv(f"/home/fangxingjun/asin_bsr_{self.site_name}_{self.date_info}.csv", index=False)

    def save_data_old(self):
        self.df_save = self.df_save.toPandas()
        self.df_save.to_csv(f"/home/fangxingjun/asin_bsr_{self.site_name}_{self.date_info}.csv", index=False)


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site name, e.g. us
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    handle_obj = DwtTop100(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
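
# Usage sketch (assumed invocation; cluster-specific spark-submit options omitted):
#   spark-submit dwt_top100.py us month 2024-01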