Commit b9365511 by chenyuanjie

no message

parent a9d0da5f
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    # Positional CLI args: site code, date granularity, concrete date ("yyyy-mm-dd").
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"

    # Target Hive ODS table and its partition spec.
    hive_table = "ods_merchantwords_brand_analytics"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # Resolve (and log) the HDFS landing path for this partition.
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # Source PostgreSQL table name is sharded per site and date.
    year, month, day = date_info.split("-")
    db_type = 'postgresql_16'
    import_table = f"{site_name}_merchantwords_brand_analytics_{year}_{month}_{day}"
    # \$CONDITIONS is Sqoop's split-predicate placeholder; the backslash keeps
    # the shell from expanding it when the generated import script runs.
    sql_query = f"""
    select
        id,
        search_term,
        quantity_being_sold,
        created_time,
        updated_time,
        quantity_being_sold_str,
        result_count,
        departments
    from {import_table}
    where 1=1
    and \$CONDITIONS
    """

    # Validate the source schema against the Hive table before importing.
    CommonUtil.check_schema_before_import(db_type=db_type,
                                          site_name=site_name,
                                          query=sql_query,
                                          hive_tb_name=hive_table,
                                          msg_usr=['chenyuanjie'],
                                          partition_dict=partition_dict)
    # Build the import shell script (single mapper, split on primary key `id`).
    import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                           db_type=db_type,
                                           query=sql_query,
                                           hdfs_path=hdfs_path,
                                           map_num=1,
                                           key='id')
    # Clear any stale data at the target path, then execute the import over SSH.
    HdfsUtils.delete_hdfs_file(hdfs_path)
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    client.close()
    # Cross-check row counts between the source table and Hive after the import.
    CommonUtil.check_import_sync_num(db_type=db_type,
                                     partition_dict=partition_dict,
                                     import_query=sql_query,
                                     hive_tb_name=hive_table,
                                     msg_usr=['chenyuanjie'])
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    # Positional CLI args: site code, date granularity, concrete date ("yyyy-mm-dd").
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"

    # Target Hive ODS table and its partition spec.
    hive_table = "ods_merchantwords_other_search_term_data"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # Resolve (and log) the HDFS landing path for this partition.
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # Source PostgreSQL table name is sharded per site and date.
    year, month, day = date_info.split("-")
    db_type = 'postgresql_16'
    import_table = f"{site_name}_merchantwords_other_search_term_{year}_{month}_{day}"
    # \$CONDITIONS is Sqoop's split-predicate placeholder; the backslash keeps
    # the shell from expanding it when the generated import script runs.
    sql_query = f"""
    select
        id,
        search_term,
        asin,
        page,
        buy_data,
        label,
        created_time,
        updated_time
    from {import_table}
    where 1=1
    and \$CONDITIONS
    """

    # Validate the source schema against the Hive table before importing.
    CommonUtil.check_schema_before_import(db_type=db_type,
                                          site_name=site_name,
                                          query=sql_query,
                                          hive_tb_name=hive_table,
                                          msg_usr=['chenyuanjie'],
                                          partition_dict=partition_dict)
    # Build the import shell script (35 parallel mappers, split on primary key `id`).
    import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                           db_type=db_type,
                                           query=sql_query,
                                           hdfs_path=hdfs_path,
                                           map_num=35,
                                           key='id')
    # Clear any stale data at the target path, then execute the import over SSH.
    HdfsUtils.delete_hdfs_file(hdfs_path)
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    client.close()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    # Positional CLI args: site code, search-term type (zr/sp/sb/bs/hr/...),
    # date granularity, concrete date ("yyyy-mm-dd").
    site_name = CommonUtil.get_sys_arg(1, None)
    st_type = CommonUtil.get_sys_arg(2, None)
    date_type = CommonUtil.get_sys_arg(3, None)
    date_info = CommonUtil.get_sys_arg(4, None)
    assert site_name is not None, "site_name 不能为空!"
    assert st_type is not None, "st_type 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"

    # Hard-coded skip list: these us-site partitions are intentionally not
    # imported (presumably known-bad source data — confirm with the data owner).
    # sys.exit(0) replaces quit(): quit() is a site-module convenience meant for
    # the interactive interpreter, not for scripts.
    if site_name == 'us':
        if date_info == '2024-05-05' and st_type == "bs":
            sys.exit(0)
        elif date_info in ['2024-06-06', '2024-06-07', '2024-06-08'] and st_type == "hr":
            sys.exit(0)

    # Target Hive ODS table (one table per search-term type) and partition spec.
    hive_tb = f"ods_merchantwords_search_term_{st_type}"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }

    # Column list differs by search-term type.
    if st_type in ["zr", "sp"]:
        cols = "search_term,asin,page,page_row,id,created_time,updated_time"
    elif st_type in ["sb"]:
        cols = "search_term,asin,page,data_type,id,created_time,updated_time"
    else:
        cols = "search_term,asin,page,created_time,updated_time"

    # Source PostgreSQL table name is sharded per site, type and date.
    db_type = 'postgresql_16'
    year, month, day = date_info.split("-")
    import_tb = f"{site_name}_merchantwords_search_term_rank_{st_type}_{year}_{month}_{day}"
    # \$CONDITIONS is Sqoop's split-predicate placeholder; the backslash keeps
    # the shell from expanding it when the generated import script runs.
    query = f"""
    select
        {cols}
    from
        {import_tb}
    where 1 = 1
    and \$CONDITIONS
    """
    print(f"当前同步的表为:{import_tb}")

    # Resolve (and log) the HDFS landing path for this partition.
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # Validate the source schema against the Hive table; empty_flag reports
    # whether the source table has any rows, check_flag whether schemas match.
    empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
                                                                   site_name=site_name,
                                                                   query=query,
                                                                   hive_tb_name=hive_tb,
                                                                   msg_usr=['chenyuanjie'],
                                                                   partition_dict=partition_dict)
    assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"

    if not empty_flag:
        # Mapper parallelism scales with the expected volume per type; only
        # key-bearing types (zr/sp/sb) can be split on `id`.
        if st_type in ["zr", "sp", "sb"]:
            if st_type == "zr":
                map_num = 50
            elif st_type == "sp":
                map_num = 25
            else:
                map_num = 15
            sh = CommonUtil.build_import_sh(site_name=site_name,
                                            db_type=db_type,
                                            query=query,
                                            hdfs_path=hdfs_path,
                                            map_num=map_num,
                                            key="id")
        else:
            sh = CommonUtil.build_import_sh(site_name=site_name,
                                            db_type=db_type,
                                            query=query,
                                            hdfs_path=hdfs_path)
        # Clear any stale data at the target path, then execute the import over SSH.
        HdfsUtils.delete_hdfs_file(hdfs_path)
        client = SSHUtil.get_ssh_client()
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
        CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        client.close()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment