"""
@Author : HuangJian
@Description : asin详情表-周表
@SourceTable : us_asin_detail_2023_18
@SinkTable : ods_asin_detail
@CreateTime : 2022/05/18 14:55
@UpdateTime : 2022/05/18 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name must not be empty!"
    assert date_type is not None, "date_type must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
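    # Example invocation (hypothetical values; the three positional args are
    # site_name, date_type, date_info):
    #   python ods_asin_detail.py us week 2023-18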
    hive_table = "ods_asin_detail"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # Build and validate the HDFS landing path for this partition
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    # Split date_info into year (d1) and week/month number (d2)
    d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)

    if date_type == DateTypes.week.name:
        # PG partition tables zero-pad the week number, e.g. 01, 02, 03
        d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
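        # e.g. d2 '5' becomes '05', so the import table name lines up with
        # partitions such as us_asin_detail_2023_05 (illustrative values)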
        # Pick the DB connection: the us site switched PG instances at week
        # 2023-26, and again (to postgresql_14) at week 2023-34
        if site_name == 'us' and date_info >= '2023-26':
            db_type = 'postgresql'
            if date_info >= '2023-34':
                db_type = 'postgresql_14'
            date_col = "launch_time,created_time as created_at,updated_time as updated_at"
            new_col = 'describe'
        else:
            db_type = 'postgresql_14'
            date_col = "launch_time,created_time as created_at,updated_time as updated_at"
            new_col = 'describe'
        print(f"db_type for this sync: {db_type}")
        # Source tables gained columns over time; include each column from the
        # week it first appeared
        # week 2023-18 added weight_str
        if date_info >= '2023-18':
            new_col += ',weight_str'
        # week 2023-21 added package_quantity and pattern_name
        if date_info >= '2023-21':
            new_col += ',package_quantity,pattern_name'
        # week 2023-49 added follow_sellers
        if date_info >= '2023-49':
            new_col += ',follow_sellers'
        # week 2023-51 added product_description and buy_sales
        if date_info >= '2023-51':
            new_col += ',product_description,buy_sales'
        # week 2024-02 added image_view
        if date_info >= '2024-02':
            new_col += ',image_view'
        # # week 2024-05 added product_json, product_detail_json, review_ai_text, review_label_json
        # if date_info >= '2024-05':
        #     new_col += ',product_json,product_detail_json,review_ai_text,review_label_json'
        import_table = f"{site_name}_asin_detail_{d1}_{d2}"
    if date_type in (DateTypes.month.name, DateTypes.month_week.name):
        db_type = 'postgresql_14'
        date_col = "launch_time, created_time as created_at, updated_time as updated_at"
        # Monthly tables already carry the full column set
        new_col = "describe, weight_str, package_quantity, pattern_name, follow_sellers, product_description, buy_sales, image_view, spider_int, " \
                  "lob_asin_json, seller_json, customer_reviews_json, product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, " \
                  "sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, together_asin_json, min_match_asin_json, " \
                  "variat_num, current_asin, img_list, variat_list, parent_asin, bundles_this_asins_json"
        # Zero-pad the month number to match the PG partition table name
        d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
        import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
sql_query = f"""
select
id,
asin,
img_url,
title,
title_len,
price,
rating,
total_comments,
buy_box_seller_type,
page_inventory,
category,
volume,
weight,
rank,
{date_col},
category_state,
img_num,
img_type,
activity_type,
one_two_val,
three_four_val,
five_six_val,
eight_val,
qa_num,
one_star,
two_star,
three_star,
four_star,
five_star,
low_star,
together_asin,
brand,
ac_name,
material,
node_id,
data_type,
sp_num,
{new_col}
from {import_table}
where 1=1
and \$CONDITIONS
"""
    # Validate schema and data against the Hive table before importing
    CommonUtil.check_schema_before_import(db_type=db_type,
                                          site_name=site_name,
                                          query=sql_query,
                                          hive_tb_name=hive_table,
                                          msg_usr=['chenyuanjie'],
                                          partition_dict=partition_dict)
    # Generate the import shell script
    import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                           db_type=db_type,
                                           query=sql_query,
                                           hdfs_path=hdfs_path,
                                           map_num=50,
                                           key='id')
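    # map_num=50 with key='id' presumably maps to Sqoop's `-m 50 --split-by id`,
    # pulling the table with 50 parallel mappers over disjoint id ranges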
    # Clear any existing HDFS data before importing
    HdfsUtils.delete_hdfs_file(hdfs_path)
    # Create an SSH client to run the import script remotely
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
    # Build the LZO index and repair the Hive partition metadata
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    # Close the SSH connection
    client.close()
    # Post-import check: verify the synced row count matches the source
    if date_type != DateTypes.month_week.name:
        CommonUtil.check_import_sync_num(db_type=db_type,
                                         partition_dict=partition_dict,
                                         import_query=sql_query,
                                         hive_tb_name=hive_table,
                                         msg_usr=['chenyuanjie'])
    # Post-import validation: threshold alerts on key fields
    CommonUtil.check_fields_and_warning(hive_tb_name=hive_table, partition_dict=partition_dict)