import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
"""
对 asin图片本地存储地址进行存档,同时进行预处理
"""


class DimAsinImgPath(object):

    def __init__(self, site_name):
        self.site_name = site_name
        app_name = f"{self.__class__.__name__}:{site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dim_asin_img_path"

    def run(self):
        sql = f"""
            select id,
                   asin,
                   asin_img_url,
                   asin_img_path,
                   created_at,
                   bsr_cate_current_id
            from ods_asin_img_path
            where site_name = '{self.site_name}'
              and asin_img_url is not null
              and asin_img_url != 'null'
              and asin_img_path is not null
        """
        print("====================== query sql ======================")
        print(sql)
        df_save = self.spark.sql(sql)
        if df_save.first() is None:
            print("====================== no data, skipping ======================")
            return

        # Look up category_id for each bsr_cate_current_id from the MySQL dimension table.
        path_sql = f"""
            select id as bsr_cate_current_id,
                   category_id
            from us_bs_category
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
        id_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=path_sql
        )

        # Join the category ids onto the image-path rows and append the site_name column.
        df_save = df_save.join(id_df, on='bsr_cate_current_id').select(
            F.col("id"),
            F.col("asin"),
            F.col("asin_img_url"),
            F.col("asin_img_path"),
            F.col("created_at"),
            F.col("bsr_cate_current_id"),
            F.col("category_id"),
            F.lit(self.site_name).alias("site_name"),
        )

        partition_dict = {
            "site_name": self.site_name
        }
        # Remove any existing data for this partition on HDFS before writing the rebuilt data.
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        HdfsUtils.delete_hdfs_file(hdfs_path)
        partition_by = list(partition_dict.keys())
        print(f"target table: {self.hive_tb}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = DimAsinImgPath(site_name)
    obj.run()
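
# Example invocation (illustrative only; the actual submit command and script path depend on the deployment):
#   spark-submit dim_asin_img_path.py us
# The single positional argument is the site_name, used both to filter ods_asin_img_path
# and as the Hive partition value written to dim_asin_img_path.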