"""
1. 计算上升词,热搜词,新出词
2. quantity_being_sold在售商品数
"""

import os
import sys

import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to sys.path so that utils can be imported

from utils.templates import Templates
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F


class DwdStInfo(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'ods_asin_detail_copy'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        self.df_save = self.spark.sql("select 1+1;")  # placeholder DataFrame, overwritten in handle_data
        self.date_info_tuple = tuple()
        self.reset_partitions(partitions_num=10)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.get_date_info_tuple()
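    # NOTE (assumption): Templates.run(), invoked in __main__, is expected to call
    # read_data() and handle_data() below and then persist self.df_save, partitioned
    # by self.partitions_by; the save step itself lives in the Templates base class.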

    def read_data(self):
        print(f"1.1 reading the source table ods_{self.site_name}_asin_detail")
        sql = f"SELECT * FROM selection_off_line.ods_{self.site_name}_asin_detail WHERE dt = '{self.date_info}';"
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sql).cache()


    def handle_data(self):
        self.df_save = self.df_asin_detail.drop('dt') \
            .withColumn("site_name", F.lit(self.site_name)) \
            .withColumn("date_type", F.lit(self.date_type)) \
            .withColumn("date_info", F.lit(self.date_info))
        self.df_save.show(10)
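        # the three literal columns added above match self.partitions_by, so the (assumed)
        # save step in the Templates base class can write them out as Hive partition columns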


    def get_date_info_tuple(self):
        """Build the tuple of period labels from calendar week 2020-44 up to self.date_info."""
        df_week_start = self.df_date.loc[(self.df_date.year_week == '2020-44')]
        id_start = list(df_week_start.id)[0] if list(df_week_start.id) else 0
        # locate the row(s) of the requested period to find the upper-bound id
        if self.date_type in ['week', '4_week']:
            df_week_current = self.df_date.loc[self.df_date.year_week == self.date_info]
        elif self.date_type == 'month':
            df_week_current = self.df_date.loc[self.df_date.year_month == self.date_info]
        elif self.date_type == 'quarter':
            df_week_current = self.df_date.loc[self.df_date.year_quarter == self.date_info]
        else:
            print("date_type input error, exiting")
            df_week_current = pd.DataFrame()
        id_current_max = max(list(df_week_current.id)) if list(df_week_current.id) else 0
        # all calendar weeks between the start week and the requested period
        df_week_all = self.df_date.loc[(self.df_date.id >= id_start) & (self.df_date.id <= id_current_max)]
        if self.date_type == 'week':
            self.date_info_tuple = tuple(df_week_all.year_week)
        elif self.date_type == "4_week":
            # for 4_week, drop the five most recent weeks before collecting distinct months
            df_week_all = self.df_date.loc[(self.df_date.id >= id_start) & (self.df_date.id <= id_current_max - 5)]
            self.date_info_tuple = tuple(set(df_week_all.year_month))
        elif self.date_type == 'month':
            self.date_info_tuple = tuple(set(df_week_all.year_month))
        elif self.date_type == 'quarter':
            self.date_info_tuple = tuple(set(df_week_all.year_quarter))



if __name__ == '__main__':

    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DwdStInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()