1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""
1. 计算上升词,热搜词,新出词
2. quantity_being_sold在售商品数
"""
import os
import sys
import pandas as pd
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
class DwdStInfo(Templates):
    """Load one ``dt`` slice of a site's ASIN-detail ODS table and tag it with partition columns.

    The job reads ``selection_off_line.ods_{site_name}_asin_detail`` for a single
    ``dt`` value, drops the ``dt`` column and adds the literal columns
    ``site_name``, ``date_type`` and ``date_info``. The result is left in
    ``self.df_save``.

    NOTE(review): ``create_spark_object``, ``get_year_week_tuple``,
    ``reset_partitions`` and ``run`` are not defined in this class and are
    presumably inherited from ``Templates``; ``db_save`` and ``partitions_by``
    are presumably consumed there when the result is persisted -- confirm
    against ``utils.templates``.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        """Store the job parameters and prepare the Spark session and date lookup.

        Args:
            site_name: Site code; interpolated into the source table name.
            date_type: One of ``week``, ``4_week``, ``month`` or ``quarter``.
            date_info: Period identifier matched against the calendar table and
                used as the ``dt`` filter value, e.g. ``2022-1``.

        Raises:
            ValueError: If ``date_type`` is not one of the supported values
                (raised by ``get_date_info_tuple``).
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'ods_asin_detail_copy'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}"
        )
        # Calendar lookup table; the code below accesses it with pandas-style
        # ``.loc`` and the columns id / year_week / year_month / year_quarter.
        self.df_date = self.get_year_week_tuple()
        # Placeholder DataFrame until handle_data() assigns the real result.
        self.df_save = self.spark.sql("select 1+1;")
        self.date_info_tuple = tuple()
        self.reset_partitions(partitions_num=10)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.get_date_info_tuple()

    def read_data(self):
        """Read the requested ``dt`` slice of the site's ASIN detail table and cache it."""
        print("1.1 读取ods_asin_detail_copy表")
        sql = f"SELECT *FROM selection_off_line.ods_{self.site_name}_asin_detail WHERE dt = '{self.date_info}';"
        print("sql:", sql)
        self.df_asin_deail = self.spark.sql(sql).cache()

    def handle_data(self):
        """Replace the ``dt`` column with the three literal partition columns.

        Must run after ``read_data``, which sets ``self.df_asin_deail``.
        Prints the first 10 rows of the result.
        """
        self.df_save = (
            self.df_asin_deail
            .drop('dt')
            .withColumn("site_name", F.lit(self.site_name))
            .withColumn("date_type", F.lit(self.date_type))
            .withColumn("date_info", F.lit(self.date_info))
        )
        self.df_save.show(10)

    def get_date_info_tuple(self):
        """Populate ``self.date_info_tuple`` with the periods from 2020-44 up to the current one.

        The calendar rows whose ``id`` lies between the row for week
        ``2020-44`` and the largest ``id`` matching ``self.date_info`` are
        selected, then reduced to:

        * ``week``    -> every ``year_week`` value, in table order;
        * ``4_week``  -> the distinct ``year_month`` values, with the upper
          bound lowered by 5 ids;
        * ``month``   -> the distinct ``year_month`` values;
        * ``quarter`` -> the distinct ``year_quarter`` values.

        Distinct values come from a ``set``, so their order is unspecified.
        When no calendar row matches, the corresponding id falls back to 0.

        Raises:
            ValueError: If ``self.date_type`` is not a supported value.
        """
        # Calendar column that self.date_info is matched against per date_type.
        match_column = {
            'week': 'year_week',
            '4_week': 'year_week',
            'month': 'year_month',
            'quarter': 'year_quarter',
        }
        if self.date_type not in match_column:
            # The message announces an exit, so actually stop here instead of
            # failing later with an unrelated AttributeError on an empty frame.
            print("date_type输入错误, 退出")
            raise ValueError(
                f"unsupported date_type {self.date_type!r}; "
                f"expected one of {', '.join(match_column)}"
            )

        start_ids = list(self.df_date.loc[self.df_date.year_week == '2020-44'].id)
        id_start = start_ids[0] if start_ids else 0

        current_rows = self.df_date.loc[
            self.df_date[match_column[self.date_type]] == self.date_info
        ]
        current_ids = list(current_rows.id)
        id_current_max = max(current_ids) if current_ids else 0

        # NOTE(review): the 4_week window stops 5 ids before the current
        # maximum; the reason is not visible in this file -- confirm.
        id_upper = id_current_max - 5 if self.date_type == '4_week' else id_current_max
        df_week_all = self.df_date.loc[
            (self.df_date.id >= id_start) & (self.df_date.id <= id_upper)
        ]

        if self.date_type == 'week':
            self.date_info_tuple = tuple(df_week_all.year_week)
        elif self.date_type == 'quarter':
            self.date_info_tuple = tuple(set(df_week_all.year_quarter))
        else:
            # Both '4_week' and 'month' report distinct year_month values.
            self.date_info_tuple = tuple(set(df_week_all.year_month))
if __name__ == '__main__':
    # Positional command-line arguments, in order:
    #   1. site code
    #   2. date type: week / 4_week / month / quarter
    #   3. period as year-week / year-month / year-quarter, e.g. 2022-1
    site_name, date_type, date_info = sys.argv[1], sys.argv[2], sys.argv[3]
    handle_obj = DwdStInfo(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    )
    # run() is not defined in DwdStInfo; presumably inherited from Templates.
    handle_obj.run()