Commit 1b5a6368 by chenyuanjie

Sqoop import script refactor - hide database connection info

parent 4d168d08
"""
@Author : HuangJian
@Description : ASIN detail table - weekly table
@SourceTable : us_asin_detail_2023_18
@SinkTable : ods_asin_detail
@CreateTime : 2022/05/18 14:55
@UpdateTime : 2022/05/18 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -24,151 +14,40 @@ if __name__ == '__main__':
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
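# hypothetical invocation example (script filename assumed; args are positional: site_name, date_type, date_info):
#   python ods_asin_detail.py us month 2024-05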
hive_table = f"ods_asin_detail"
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
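# pad single-digit week/month numbers to two digits (e.g. 5 -> 05) to match the pg partition table naming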
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
db_type = 'postgresql_14'
import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
hive_table = "ods_asin_detail"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# validate the target hdfs path
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
# split the date
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
if date_type == DateTypes.week.name:
# pg partition week numbers are zero-padded, e.g. 01, 02, 03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
# decide which db connection to use here
if site_name == 'us' and date_info >= '2023-26':
db_type = 'postgresql'
if date_info >= '2023-34':
db_type = 'postgresql_14'
date_col = "launch_time,created_time as created_at,updated_time as updated_at"
new_col = ',describe'
else:
db_type = 'postgresql_14'
date_col = "launch_time,created_time as created_at,updated_time as updated_at"
new_col = ',describe'
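# created_time/updated_time in pg are aliased to created_at/updated_at to match the hive column names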
print(f"同步连接的db_type:{db_type}")
# handle columns added over time
# week 2023-18 added weight_str
if date_info >= '2023-18':
new_col += ',weight_str'
# week 2023-21 added package_quantity and pattern_name
if date_info >= '2023-21':
new_col += ',package_quantity,pattern_name'
# week 2023-49 added follow_sellers
if date_info >= '2023-49':
new_col += ',follow_sellers'
# week 2023-51 added product_description and buy_sales
if date_info >= '2023-51':
new_col += ',product_description,buy_sales'
# week 2024-02 added image_view
if date_info >= '2024-02':
new_col += ',image_view'
# # week 2024-05 added product_json, product_detail_json, review_ai_text, review_label_json
# if date_info >= '2024-05':
# new_col += ',product_json,product_detail_json,review_ai_text,review_label_json'
import_table = f"{site_name}_asin_detail_{d1}_{d2}"
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
db_type = 'postgresql_14'
date_col = "launch_time, created_time as created_at, updated_time as updated_at"
new_col = "describe, weight_str, package_quantity, pattern_name, follow_sellers, product_description, buy_sales, image_view, spider_int, " \
"lob_asin_json, seller_json, customer_reviews_json, product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, " \
"sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, together_asin_json, min_match_asin_json, " \
"variat_num, current_asin, img_list, variat_list, parent_asin, bundles_this_asins_json, video_m3u8_url, result_list_json, bundle_asin_component_json"
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_asin_detail_month_{d1}_{d2}"
sql_query = f"""
select
id,
asin,
img_url,
title,
title_len,
price,
rating,
total_comments,
buy_box_seller_type,
page_inventory,
category,
volume,
weight,
rank,
{date_col},
category_state,
img_num,
img_type,
activity_type,
one_two_val,
three_four_val,
five_six_val,
eight_val,
qa_num,
one_star,
two_star,
three_star,
four_star,
five_star,
low_star,
together_asin,
brand,
ac_name,
material,
node_id,
data_type,
sp_num,
{new_col}
from {import_table}
where 1=1
and \$CONDITIONS
"""
# schema and data validation
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'],
partition_dict=partition_dict)
# generate the import shell script
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=50,
key='id')
# delete the existing hdfs data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
# create an ssh client to run the command
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# build the lzo index and repair hive metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# close the connection
client.close()
# post-import check -- verify data consistency
if date_type != 'month_week':
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
# post-import check -- threshold alerts on key fields
CommonUtil.check_fields_and_warning(hive_tb_name=hive_table, partition_dict=partition_dict)
cols = "id, asin, img_url, title, title_len, price, rating, total_comments, buy_box_seller_type, page_inventory, " \
"category, volume, weight, rank, launch_time, created_time as created_at, updated_time as updated_at, " \
"category_state, img_num, img_type, activity_type, one_two_val, three_four_val, five_six_val, eight_val, " \
"qa_num, one_star, two_star, three_star, four_star, five_star, low_star, together_asin, brand, ac_name, " \
"material, node_id, data_type, sp_num, describe, weight_str, package_quantity, pattern_name, follow_sellers, " \
"product_description, buy_sales, image_view, spider_int, lob_asin_json, seller_json, customer_reviews_json, " \
"product_json, product_detail_json, review_ai_text, review_label_json, sp_initial_seen_asins_json, " \
"sp_4stars_initial_seen_asins_json, sp_delivery_initial_seen_asins_json, compare_similar_asin_json, " \
"together_asin_json, min_match_asin_json, variat_num, current_asin, img_list, variat_list, parent_asin, " \
"bundles_this_asins_json, video_m3u8_url, result_list_json, bundle_asin_component_json"
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=f"SELECT {cols} FROM {import_table} WHERE 1=1 and $CONDITIONS",
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=50,
split_by='id'
)
pass
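For reference, a minimal sketch of what get_remote_engine(...).sqoop_raw_import(...) presumably assembles on the remote side, assuming it mirrors the old build_import_sh flow; the command shape and flag handling below are illustrative, not the actual implementation:
# Sketch only -- the remote engine is assumed to resolve the connection info server-side,
# so credentials never appear in these import scripts.
def sketch_sqoop_raw_import(jdbc_url, user, password, query, hdfs_path, m=1, split_by=None):
    cmd = [
        "sqoop", "import",
        "--connect", jdbc_url,  # resolved remotely, hidden from the caller
        "--username", user,
        "--password", password,
        "--query", query,  # must contain the $CONDITIONS placeholder
        "--target-dir", hdfs_path,
        "-m", str(m),
    ]
    if m > 1 and split_by:
        cmd += ["--split-by", split_by]  # column used to split the query across mappers
    return " ".join(cmd)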
......@@ -2,16 +2,17 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DbTypes
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
import_tb = f"{site_name}_all_syn_st_asin"
db_type = DbTypes.postgresql.name
import_tb = f"{site_name}_all_syn_st_asin"
query = f"""
select asin,
state,
......@@ -20,32 +21,22 @@ if __name__ == '__main__':
where state = 4
and \$CONDITIONS
"""
hive_tb = "ods_asin_err_state"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = DbTypes.postgresql.name
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wujicang']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
pass
......@@ -2,75 +2,59 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_tb = "ods_bs_category"
db_type = "mysql"
import_tb = f"{site_name}_bs_category"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
id,
p_id,
ch_name,
en_name,
nodes_num,
path,
is_show,
one_category_id,
and_en_name,
leaf_node,
delete_time,
full_name,
category_id,
category_parent_id,
category_first_id,
category_state,
redirect_flag,
redirect_first_id,
created_at,
updated_at
select
id,
p_id,
ch_name,
en_name,
nodes_num,
path,
is_show,
one_category_id,
and_en_name,
leaf_node,
delete_time,
full_name,
category_id,
category_parent_id,
category_first_id,
category_state,
redirect_flag,
redirect_first_id,
created_at,
updated_at
from {import_tb}
where 1 = 1
and \$CONDITIONS
"""
"""
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
hive_tb = "ods_bs_category"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
......@@ -2,9 +2,9 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil,DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -13,7 +13,19 @@ if __name__ == '__main__':
assert site_name is not None, "site_name 不能为空!"
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
year, week = date_info.split("-")
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
db_type = 'postgresql_14'
import_tb = f"{site_name}_bs_category_asin_detail_month_{d1}_{d2}"
query = f"""
select
id, asin, null as week, best_sellers_rank, created_time as created_at, updated_time as updated_at, last_herf, all_best_sellers_href
from {import_tb}
where 1=1
and \$CONDITIONS
"""
hive_tb = "ods_bs_category_asin_detail"
partition_dict = {
"site_name": site_name,
......@@ -21,77 +33,19 @@ if __name__ == '__main__':
"date_info": date_info,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
if date_type == DateTypes.week.name:
if site_name == "us":
if date_info >= '2023-18':
db_type = "postgresql"
if date_info >= '2023-34':
db_type = 'postgresql_14'
import_tb = f"{site_name}_bs_category_asin_detail_{year}_{week}"
cols = f"id,asin,{week} as week,best_sellers_rank,created_time as created_at,updated_time as updated_at,last_herf,all_best_sellers_href"
params = "1 = 1"
else:
db_type = "mysql"
import_tb = f"{site_name}_bs_category_asin_detail"
cols = "id,asin,week,best_sellers_rank,created_at,updated_at,last_herf"
params = f"week = {int(week)} and DATE_FORMAT(created_at,'%Y') = {year}"
else:
db_type = "postgresql_14"
import_tb = f"{site_name}_bs_category_asin_detail_{year}_{week}"
cols = f"id,asin,{week} as week,best_sellers_rank,created_time as created_at,updated_time as updated_at,last_herf,all_best_sellers_href"
params = "1 = 1"
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
# split the date
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
if site_name in ['us', 'uk', 'de']:
db_type = 'postgresql_14'
# pg partition numbers are zero-padded, e.g. 01, 02, 03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
cols = f"id,asin,null as week,best_sellers_rank,created_time as created_at,updated_time as updated_at,last_herf,all_best_sellers_href"
import_tb = f"{site_name}_bs_category_asin_detail_month_{d1}_{d2}"
params = f" 1=1 "
else:
print(f"其他站点{date_type}数据暂未明确,请检查是否dateType传输有误")
exit()
query = f"""
select
{cols}
from {import_tb}
where {params}
and \$CONDITIONS
"""
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=50,
split_by='id'
)
pass
......@@ -2,72 +2,69 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_info = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "sitename 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_tb = "ods_bs_category_top100_asin"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = "mysql"
if date_info == 'all':
query = f"""
select id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
select
id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
from {site_name}_bs_category_top100_asin
where 1 = 1
and \$CONDITIONS
"""
pass
"""
else:
query = f"""
select id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
select
id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
from {site_name}_bs_category_top100_asin
where 1 = 1
and date_info = '{date_info}'
and \$CONDITIONS
"""
pass
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wujicang'])
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
"""
hive_tb = "ods_bs_category_top100_asin"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
pass
......@@ -2,61 +2,45 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_tb = "ods_bsr_end"
db_type = "mysql"
import_tb = f"{site_name}_bsr_end"
query = f"""
select
id,
rank,
bsr_name,
created_at,
updated_at,
category_id
from {import_tb}
where 1 = 1
and \$CONDITIONS
"""
hive_tb = "ods_bsr_end"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
query = f"""
select
id,
rank,
bsr_name,
created_at,
updated_at,
category_id
from {import_tb}
where 1 = 1
and \$CONDITIONS
"""
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
......@@ -2,76 +2,69 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_info = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "sitename 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_tb = "ods_new_releases_top100_asin"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = "mysql"
if date_info == 'all':
query = f"""
select id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
select
id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
from {site_name}_new_releases_top100_asin
where 1 = 1
and \$CONDITIONS
"""
pass
"""
else:
query = f"""
select id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
select
id,
asin,
cate_1_id,
cate_current_id,
bsr_rank,
rating,
total_comments,
created_at as updated_at,
date_info,
category_id
from {site_name}_new_releases_top100_asin
where 1 = 1
and date_info = '{date_info}'
and \$CONDITIONS
"""
pass
"""
print("================================sql====================================")
print(query)
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wujicang']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
hive_tb = "ods_new_releases_top100_asin"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
pass
"""
@Author : HuangJian
@Description : per-site store ASIN detail table -- monthly crawl
@SourceTable : us_other_search_term_data_2023_18
@SinkTable : ods_other_search_term_data
@CreateTime : 2022/05/23 09:55
@UpdateTime : 2022/05/23 09:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -25,96 +14,51 @@ if __name__ == '__main__':
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_table = f"ods_other_search_term_data"
db_type = 'postgresql_14'
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_other_search_term_month_{d1}_{d2}"
sql_query = f"""
select
id,
search_term,
asin,
page,
buy_data,
label,
created_time,
updated_time,
asin_brand
from {import_table}
where 1=1
and \$CONDITIONS
"""
if site_name == 'us':
map_num = 20
else:
map_num = 5
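# map_num is assumed to map to the sqoop mapper count (-m); the us site has far more rows, so it gets more parallel mappers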
hive_table = "ods_other_search_term_data"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# validate the target hdfs path
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
# split the date
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
db_type = ''
if date_type == DateTypes.week.name:
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
if site_name == 'us' and date_info >= '2023-18':
db_type = 'postgresql'
if date_info >= '2023-34':
db_type = 'postgresql_14'
# pg partition week numbers are zero-padded, e.g. 01, 02, 03
import_table = f"{site_name}_other_search_term_{d1}_{d2}"
else:
db_type = 'postgresql_14'
import_table = f"{site_name}_other_search_term_{d1}_{d2}"
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
if site_name in ['us', 'uk', 'de']:
db_type = 'postgresql_14'
# pg partition numbers are zero-padded, e.g. 01, 02, 03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_other_search_term_month_{d1}_{d2}"
else:
print(f"{date_type} data for other sites is not defined yet, please check whether dateType was passed correctly")
exit()
assert db_type != '', "未获取到db_type,请检查!"
sql_query = f"""
select
id,
search_term,
asin,
page,
buy_data,
label,
created_time,
updated_time,
asin_brand
from {import_table}
where 1=1
and \$CONDITIONS
"""
# schema and data validation
if site_name not in ('fr', 'it', 'es'):
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['fangxingjun','pengyanbing','chenyuanjie']
,partition_dict = partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
if site_name == 'us':
map_num = 20
else:
map_num = 5
# generate the import shell script
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=map_num,
key='id'
)
# delete the existing hdfs data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
# create an ssh client to run the command
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# build the lzo index and repair hive metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# close the connection
client.close()
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
# post-import check -- verify the synced row count is consistent
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=sql_query,
hive_tb_name=hive_table,
msg_usr=['fangxingjun','pengyanbing','chenyuanjie'])
\ No newline at end of file
pass
......@@ -2,10 +2,9 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -21,8 +20,6 @@ if __name__ == '__main__':
print("uk站点已无ac类型词,退出执行!")
sys.exit(0)
hive_tb = f"ods_search_term_{st_type}"
if st_type in ["zr", "sp"]:
cols = "search_term,asin,page,page_row,created_time,updated_time,id"
elif st_type in ["sb", "tr"]:
......@@ -30,92 +27,48 @@ if __name__ == '__main__':
else:
cols = "search_term,asin,page,created_time,updated_time,id"
# split the date
db_type = 'postgresql_14'
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
if date_type == DateTypes.week.name:
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
if site_name == 'us' and date_info >= '2023-18':
db_type = 'postgresql'
# pg partition week numbers are zero-padded, e.g. 01, 02, 03
if date_info >= '2023-34':
db_type = 'postgresql_14'
import_tb = f"{site_name}_search_term_rank_{st_type}_{d1}_{d2}"
else:
db_type = 'postgresql_14'
import_tb = f"{site_name}_search_term_rank_{st_type}_{d1}_{d2}"
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
if site_name in ['us', 'uk', 'de']:
db_type = 'postgresql_14'
# pg partition numbers are zero-padded, e.g. 01, 02, 03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_tb = f"{site_name}_search_term_rank_{st_type}_month_{d1}_{d2}"
else:
print(f"{date_type} data for other sites is not defined yet, please check whether dateType was passed correctly")
exit()
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_tb = f"{site_name}_search_term_rank_{st_type}_month_{d1}_{d2}"
query = f"""
select {cols}
from {import_tb}
where 1 = 1
and \$CONDITIONS
select {cols} from {import_tb} where 1 = 1 and \$CONDITIONS
"""
print(f"当前链接的数据库为:{db_type},同步的表为:{import_tb}")
hive_tb = f"ods_search_term_{st_type}"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
if st_type in ['er', 'tr']:
empty_flag = False
print(f"st_type类型为{st_type},符合不检测类型跳过检测!")
else:
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['fangxingjun','pengyanbing','chenyuanjie'],
partition_dict=partition_dict
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
# zr has a large data volume, so sync it with multiple mappers
if st_type in ['zr']:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path,
map_num=10,
key='id')
if st_type == "zr":
if site_name == "us":
map_num = 40
else:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
if date_type != 'month_week':
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['fangxingjun','pengyanbing','chenyuanjie'])
map_num = 15
elif st_type in ["sb", "sp"]:
if site_name == "us":
map_num = 6
else:
map_num = 2
else:
map_num = 1
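# mapper counts are tuned per term type and site: zr is by far the largest, sb/sp are mid-sized, everything else fits in a single mapper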
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
pass
......@@ -2,61 +2,42 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_tb = "ods_self_asin"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = "mysql"
query = f"""
select
id,
asin,
created_at as created_time,
updated_at as updated_time
id,
asin,
created_at as created_time,
updated_at as updated_time
from {site_name}_self_asin
where 1 = 1
and \$CONDITIONS
"""
db_type = "mysql"
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie'])
hive_tb = "ods_self_asin"
partition_dict = {
"site_name": site_name,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
......@@ -2,96 +2,89 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DbTypes
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
assert site_name is not None, "sitename 不能为空!"
assert site_name is not None, "site_name 不能为空!"
assert date_info is not None, "date_info 不能为空!"
year = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y", )
hive_tb = "ods_self_asin_detail"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = DbTypes.postgresql.name
year = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y", )
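# the source table name carries the year (e.g. us_self_asin_detail_2024), so the year is derived from date_info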
query = f"""
select
asin,
img_url,
title,
title_len,
price,
rating,
total_comments,
buy_box_seller_type,
page_inventory,
category,
volume,
weight,
rank,
launch_time,
video_url,
add_url,
material,
created_at,
img_num,
img_type,
qa_num,
brand,
ac_name,
node_id,
sp_num,
mpn,
online_time,
describe,
one_star,
two_star,
three_star,
four_star,
five_star,
low_star,
asin_type,
is_coupon,
search_category,
weight_str,
account_name,
other_seller_name,
account_id
asin,
img_url,
title,
title_len,
price,
rating,
total_comments,
buy_box_seller_type,
page_inventory,
category,
volume,
weight,
rank,
launch_time,
video_url,
add_url,
material,
created_at,
img_num,
img_type,
qa_num,
brand,
ac_name,
node_id,
sp_num,
mpn,
online_time,
describe,
one_star,
two_star,
three_star,
four_star,
five_star,
low_star,
asin_type,
is_coupon,
search_category,
weight_str,
account_name,
other_seller_name,
account_id
from {site_name}_self_asin_detail_{year}
where 1 = 1
and site = '{site_name}'
and bsr_date_info = '{date_info}'
and date_info >= '{date_info}'
and \$CONDITIONS
"""
print("sql ======================================================")
print(query)
db_type = DbTypes.postgresql.name
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wujicang']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
"""
hive_tb = "ods_self_asin_detail"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info,
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
pass
......@@ -3,22 +3,15 @@ import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_table = "ods_self_asin_related_traffic"
partition_dict = {"site_name": site_name}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = 'mysql'
import_table = f"{site_name}_self_asin_detail"
sql_query = f"""
select
id,
......@@ -37,17 +30,29 @@ if __name__ == '__main__':
and \$CONDITIONS
"""
# generate the import shell script
import_sh = CommonUtil.build_import_sh(
site_name=site_name, db_type=db_type, query=sql_query, hdfs_path=hdfs_path, map_num=25, key='id'
hive_table = "ods_self_asin_related_traffic"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
if site_name == 'us':
map_num = 25
else:
map_num = 1
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
# delete the existing hdfs data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
# create an ssh client to run the command
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# build the lzo index and repair hive metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# close the connection
client.close()
pass
......@@ -2,11 +2,10 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -16,17 +15,6 @@ if __name__ == '__main__':
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_table = f"ods_seller_account_feedback"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# validate the target hdfs path
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
suffix = str(date_info).replace("-", "_")
import_table = f"{site_name}_seller_account_feedback_{suffix}"
if date_type == DateTypes.month.name and date_info >= '2023-08':
......@@ -51,24 +39,24 @@ if __name__ == '__main__':
and \$CONDITIONS
"""
# schema and data validation
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
hive_table = "ods_seller_account_feedback"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict
)
# generate the import shell script
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path)
# delete the existing hdfs data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
# create an ssh client to run the command
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# build the lzo index and repair hive metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# close the connection
client.close()
pass
"""
@Author : HuangJian
@Description : full mapping of store name to seller id for each site -- takes a single site as argument
@SourceTable : us_seller_account_feedback
@SinkTable : ods_seller_account_feedback
@CreateTime : 2022/05/19 14:55
@UpdateTime : 2022/05/19 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_table = f"ods_seller_account_syn"
db_type = 'mysql'
import_table = f"{site_name}_seller_account_syn_distinct"
sql_query = f"""
select
id,
account_name,
url,
state,
created_at,
updated_at,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
hive_table = "ods_seller_account_syn"
partition_dict = {
"site_name": site_name
}
# validate the target hdfs path
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
import_table = f"{site_name}_seller_account_syn_distinct"
db_type = 'mysql'
sql_query = f"""
select
id,
account_name,
url,
state,
created_at,
updated_at,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# schema and data validation
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict
)
# generate the import shell script
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path)
# delete the existing hdfs data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
# create an ssh client to run the command
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# build the lzo index and repair hive metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# close the connection
client.close()
pass
"""
@Author : HuangJian
@Description : full mapping of store name to ASIN for each site -- takes a single site as argument
@SourceTable : us_seller_account_feedback
@SinkTable : ods_seller_account_feedback
@CreateTime : 2022/05/19 14:55
@UpdateTime : 2022/05/19 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_table = f"ods_seller_asin_account"
db_type = 'mysql'
import_table = f"{site_name}_seller_asin_account"
sql_query = f"""
select
id,
account_name,
asin,
created_at,
updated_at,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
hive_table = "ods_seller_asin_account"
partition_dict = {
"site_name": site_name
}
# validate the target hdfs path
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
import_table = f"{site_name}_seller_asin_account"
db_type = 'mysql'
sql_query = f"""
select
id,
account_name,
asin,
created_at,
updated_at,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
# schema and data validation
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
# generate the import shell script
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=10,
key='id')
# delete the existing hdfs data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
# create an ssh client to run the command
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# build the lzo index and repair hive metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# close the connection
client.close()
if site_name == 'us':
map_num = 100
else:
map_num = 40
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
pass
"""
@Author : HuangJian
@Description : per-site store ASIN detail table -- monthly crawl
@SourceTable : us_asin_detail_product_2023
@SinkTable : ods_asin_detail_product
@CreateTime : 2022/05/19 14:55
@UpdateTime : 2022/05/19 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -29,21 +17,8 @@ if __name__ == '__main__':
# this table is now synced monthly, so add a month date_type check
assert date_type == DateTypes.month.name, "date_type类型不对,应为month"
hive_table = f"ods_asin_detail_product"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# validate the target hdfs path
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
# split the date
suffix = str(date_info).replace("-", "_")
import_table = f"{site_name}_seller_asin_product_{suffix}"
# db_type = 'postgresql'
if date_type == DateTypes.month.name and date_info >= '2023-08':
db_type = 'postgresql_14'
else:
......@@ -51,44 +26,51 @@ if __name__ == '__main__':
print("当前链接的数据库为:", db_type)
sql_query = f"""
select
id,
null as account_id,
asin,
title,
img_url,
price,
rating,
total_comments,
null as week,
row_num,
created_at,
updated_at,
null as month,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
select
id,
null as account_id,
asin,
title,
img_url,
price,
rating,
total_comments,
null as week,
row_num,
created_at,
updated_at,
null as month,
seller_id
from {import_table}
where 1=1
and \$CONDITIONS
"""
hive_table = "ods_asin_detail_product"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
if site_name == 'us':
map_num = 8
else:
map_num = 3
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# schema and data validation
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'])
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
# generate the import shell script
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path)
# delete the existing hdfs data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
# create an ssh client to run the command
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# build the lzo index and repair hive metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# close the connection
client.close()
pass
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -17,75 +14,51 @@ if __name__ == '__main__':
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
hive_table = f"ods_st_quantity_being_sold"
db_type = 'postgresql_14'
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_brand_analytics_month_{d1}_{d2}"
sql_query = f"""
select
id,
search_term,
quantity_being_sold,
date_info as date_flag,
created_time,
updated_time,
quantity_being_sold_str,
result_count,
departments
from {import_table}
where 1=1
and \$CONDITIONS
"""
hive_table = "ods_st_quantity_being_sold"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
# validate the target hdfs path
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
# split the date
d1, d2 = CommonUtil.split_month_week_date(date_type, date_info)
if date_type == DateTypes.week.name:
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
if site_name == 'us' and date_info >= '2023-18':
db_type = 'postgresql'
if date_info >= '2023-34':
db_type = 'postgresql_14'
else:
db_type = 'postgresql_14'
import_table = f"{site_name}_brand_analytics_{d1}_{d2}"
if site_name == 'us':
map_num = 4
else:
map_num = 1
if date_type == DateTypes.month.name or date_type == DateTypes.month_week.name:
if site_name in ['us', 'uk', 'de']:
db_type = 'postgresql_14'
# pg partition numbers are zero-padded, e.g. 01, 02, 03
d2 = f'0{d2}' if int(d2) < 10 else f'{d2}'
import_table = f"{site_name}_brand_analytics_month_{d1}_{d2}"
else:
print(f"{date_type} data for other sites is not defined yet, please check whether dateType was passed correctly")
exit()
sql_query = f"""
select
id,
search_term,
quantity_being_sold,
date_info as date_flag,
created_time,
updated_time,
quantity_being_sold_str,
result_count,
departments
from {import_table}
where 1=1
and \$CONDITIONS
"""
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
# schema and data validation
if site_name not in ('fr', 'it', 'es'):
CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['fangxingjun','chenyuanjie'])
engine.sqoop_raw_import(
query=sql_query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
m=map_num,
split_by='id'
)
# generate the import shell script
import_sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path)
# delete the existing hdfs data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
# create an ssh client to run the command
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
# build the lzo index and repair hive metadata
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
# close the connection
client.close()
pass
......@@ -2,62 +2,41 @@ import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
assert site_name is not None, "site_name 不能为空!"
hive_tb = "ods_theme"
db_type = "mysql"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
import_tb = f"{site_name}_theme"
cols = "id,theme_type_en,theme_type_ch,theme_en,theme_ch,created_at,updated_at"
cols = "id, theme_type_en, theme_type_ch, theme_en, theme_ch, created_at, updated_at"
query = f"""
select
{cols}
{cols}
from {import_tb}
where 1 = 1
and \$CONDITIONS
"""
empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
if not empty_flag:
sh = CommonUtil.build_import_sh(site_name=site_name,
db_type=db_type,
query=query,
hdfs_path=hdfs_path)
# delete existing data before importing
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
client.close()
# post-import check -- verify data consistency
CommonUtil.check_import_sync_num(db_type=db_type,
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['chenyuanjie'])
hive_tb = "ods_theme"
partition_dict = {
"site_name": site_name
}
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
hdfs_path=hdfs_path,
partitions=partition_dict
)
pass
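Every script in this commit now ends with the same get_remote_engine / sqoop_raw_import tail. As a possible follow-up (not part of this commit), that tail could be consolidated into one hypothetical helper; a rough sketch, reusing the helpers exactly as they appear above:
# Hypothetical consolidation sketch -- run_ods_import is not part of the codebase.
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine


def run_ods_import(site_name, db_type, query, hive_table, partition_dict, m=None, split_by=None):
    # build the hdfs path from the hive table name and partition values
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    # connection info is resolved remotely; the caller only names the site and db type
    engine = get_remote_engine(site_name=site_name, db_type=db_type)
    # pass mapper settings only when a split column is given, matching the call sites above
    kwargs = {"m": m, "split_by": split_by} if split_by else {}
    engine.sqoop_raw_import(
        query=query,
        hive_table=hive_table,
        hdfs_path=hdfs_path,
        partitions=partition_dict,
        **kwargs,
    )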