import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql.types import BooleanType
from utils.db_util import DBUtil
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_udf import category_craw_flag

"""
Collect the asins that appear in the current ranking system but have no asin detail data yet.
"""


class DwdDayAsin(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # UDF: returns True when the asin's first-level category should be crawled
        self.udf_category_craw_flag = F.udf(category_craw_flag, BooleanType())
        self.hive_tb = "dwd_day_asin"

    def run(self):
        # Check whether yesterday's asins were imported; send a warning if not
        df_self_asin_part = CommonUtil.select_partitions_df(self.spark, "ods_self_asin_detail")
        last_day = CommonUtil.get_day_offset(self.date_info, -1)
        last_7day = CommonUtil.get_day_offset(self.date_info, -7)
        flag = df_self_asin_part.filter(
            f"date_type = '{DateTypes.day.name}' and site_name = '{self.site_name}' and date_info = '{last_day}'").count()
        if flag == 0:
            CommonUtil.send_wx_msg(['wujicang', 'hezhe'], "Exception alert", f"no asin data for {last_day}, please check")
        month = CommonUtil.reformat_date(self.date_info, "%Y-%m-%d", "%Y-%m")
        sql = f"""
            with asin_list1 as (
                select asin, category_first_id
                from dwt_bsr_asin_detail
                where title is null
                  and site_name = '{self.site_name}'
                  and date_info = '{month}'
            ),
            asin_list2 as (
                select asin, category_first_id
                from dwt_nsr_asin_detail
                where title is null
                  and site_name = '{self.site_name}'
                  and date_info = '{month}'
            ),
            asin_exist as (
                select asin
                from dim_cal_asin_history_detail
                where site_name = '{self.site_name}'
            ),
            -- invalid asins (marked craw_state = 2 within the last 7 days)
            asin_lost as (
                select asin
                from dwd_day_asin
                where date_info >= '{last_7day}'
                  and date_info < '{self.date_info}'
                  and site_name = '{self.site_name}'
                  and queue_name = 'day_queue'
                  and craw_state = 2
            ),
            asin_all as (
                select asin, category_first_id
                from asin_list1
                union all
                select asin, category_first_id
                from asin_list2
            )
            select asin_all.asin, category_first_id
            from asin_all
            left anti join asin_exist on asin_all.asin = asin_exist.asin
            left anti join asin_lost on asin_all.asin = asin_lost.asin
        """
        # left anti join behaves like NOT EXISTS
        print("====================== query sql ======================")
        print(sql)
        df_save = self.spark.sql(sql)
        # drop asins whose first-level category should not be crawled
        df_save = df_save.filter(self.udf_category_craw_flag(F.col("category_first_id")))
        # dedupe and cap the number of asins pushed per day
        df_save = df_save.dropDuplicates(['asin'])
        count_all = df_save.count()
        count = 40 * 10000
        df_save = df_save.limit(count)
        df_save = df_save.select(
            "asin",
            F.lit("day_queue").alias("queue_name"),
            F.lit(0).alias("craw_state"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info"),
        ).orderBy(F.col("asin").desc())
        count = df_save.count()
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info,
        }
        # clear the target partition on HDFS before appending
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save = df_save.repartition(1)
        partition_by = list(partition_dict.keys())
        print(f"target table: {self.hive_tb}, partitions: {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        CommonUtil.send_wx_msg(['wujicang'], "Notice", f"asin build for {self.date_info} succeeded, rows: {count}/{count_all}")

    def run_update(self):
        # fetch the set of asins that were not crawled
        craw_sql = f"""
            select asin
            from {self.site_name}_self_all_syn
            where data_type = 4
              and date_info = '{self.date_info}'
              and state in (4, 12, 13)
        """
        conn_info = DBUtil.get_connection_info("mysql", self.site_name)
        craw_asin_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=craw_sql
        ).cache()
        craw_asin_df = craw_asin_df.dropDuplicates(['asin'])
        # mark craw_state for the day's queue
        sql = f"""
            select asin, queue_name, craw_state, date_info, site_name
            from dwd_day_asin
            where date_info = '{self.date_info}'
              and site_name = '{self.site_name}'
              and queue_name = 'day_queue'
        """
        df_update = self.spark.sql(sql)
        if df_update.count() == 0:
            return
        df_update = df_update.join(craw_asin_df, on=['asin'], how="left").select(
            df_update['asin'].alias("asin"),
            F.col("queue_name"),
            F.when(craw_asin_df['asin'].isNotNull(), F.lit("2")).otherwise(F.lit("1")).alias("craw_state"),
            F.col("date_info"),
            F.col("site_name"),
        )
        # update the target table partition
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info,
        }
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict=partition_dict,
            df_save=df_update
        )
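
# Example invocation (script name and argument values below are assumed, not taken from the repo):
#   spark-submit dwd_day_asin.py us 2023-08-01          # build the day's crawl queue
#   spark-submit dwd_day_asin.py us 2023-08-01 update   # afterwards, mark craw_state per crawl result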


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    update_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None) == 'update'
    obj = DwdDayAsin(site_name, date_info)
    if update_flag:
        # update existing data
        obj.run_update()
    else:
        # build the day's data
        obj.run()