Commit 45f83a44 by chenyuanjie

no message

parent d13750ba
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
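# Full import: us_merchantwords_search_term_asin_detail (PostgreSQL 16) -> Hive ods_merchantwords_asin_detail, us / day / 2024-06-24 partition.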
if __name__ == '__main__':
hive_table = f"ods_merchantwords_asin_detail"
partition_dict = {
"site_name": 'us',
"date_type": 'day',
"date_info": '2024-06-24'
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = 'postgresql_16'
import_table = "us_merchantwords_search_term_asin_detail"
sql_query = f"""
select
id,
asin,
title,
img,
price,
rating,
reviews,
created_time,
updated_time
from {import_table}
where 1=1
and \$CONDITIONS
"""
# 进行schema和数据校验
CommonUtil.check_schema_before_import(db_type=db_type,
site_name='us',
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'],
partition_dict=partition_dict)
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name='us',
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=50,
key='id'
)
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
client.close()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
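# One-off test import: us_merchantwords_search_term_asin_detail_2024_04_24 from the MySQL-protocol test instance at 192.168.10.151:19030 -> Hive ods_merchantwords_asin_detail, us / day / 2024-04-24-test partition, using a hand-written Sqoop command.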
if __name__ == '__main__':
    site_name = 'us'
    hive_table = "ods_merchantwords_asin_detail"
    partition_dict = {
        "site_name": site_name,
        "date_type": 'day',
        "date_info": '2024-04-24-test'
    }
    # Verify the target table path
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    import_table = "us_merchantwords_search_term_asin_detail_2024_04_24"
    sql_query = f"""
    select
        id,
        asin,
        title,
        img,
        price,
        rating,
        reviews,
        created_time,
        updated_time
    from {import_table}
    where 1=1
    and \$CONDITIONS
    """
    # Build the Sqoop import command by hand instead of via CommonUtil.build_import_sh
    import_sh = f"""
    /opt/module/sqoop-1.4.6/bin/sqoop yswg_import -D mapred.job.queue.name=default -D mapred.task.timeout=0 --append \\
    --connect "jdbc:mysql://192.168.10.151:19030/test" \\
    --username "chenyuanjie" \\
    --password "chenyuanjie12345" \\
    --target-dir {hdfs_path} \\
    --mapreduce-job-name "sqoop_task: sr_to_hive_by_sqoop_test" \\
    --query "{sql_query}" \\
    --fields-terminated-by '\\t' \\
    --hive-drop-import-delims \\
    --null-string '\\\\N' \\
    --null-non-string '\\\\N' \\
    --compress \\
    -m 50 \\
    --split-by "id" \\
    --compression-codec lzop \\
    --outdir "/tmp/sqoop/"
    """
    HdfsUtils.delete_hdfs_file(hdfs_path)
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    client.close()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
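# Import of us_merchantwords_brand_analytics_2024 rows with date_info < '2024-06-14' (PostgreSQL 16) -> Hive ods_merchantwords_brand_analytics, us / day / 2024-05-06 partition.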
if __name__ == '__main__':
hive_table = f"ods_merchantwords_brand_analytics"
partition_dict = {
"site_name": 'us',
"date_type": 'day',
"date_info": '2024-05-06'
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = 'postgresql_16'
import_table = "us_merchantwords_brand_analytics_2024"
sql_query = f"""
select
id,
search_term,
quantity_being_sold,
created_time,
updated_time
from {import_table}
where date_info < '2024-06-14'
and \$CONDITIONS
"""
# 进行schema和数据校验
CommonUtil.check_schema_before_import(db_type=db_type,
site_name='us',
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'],
partition_dict=partition_dict)
# 生成导出脚本
import_sh = CommonUtil.build_import_sh(site_name='us',
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=50,
key='id'
)
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
client.close()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
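# Full import: us_merchantwords_other_search_term (PostgreSQL 16) -> Hive ods_merchantwords_other_search_term_data, us / day / 2024-05-06 partition.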
if __name__ == '__main__':
hive_table = f"ods_merchantwords_other_search_term_data"
partition_dict = {
"site_name": 'us',
"date_type": 'day',
"date_info": '2024-05-06'
}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
print(f"hdfs_path is {hdfs_path}")
db_type = 'postgresql_16'
import_table = "us_merchantwords_other_search_term"
sql_query = f"""
select
id,
search_term,
asin,
page,
buy_data,
label,
created_time,
updated_time
from {import_table}
where 1=1
and \$CONDITIONS
"""
# 进行schema和数据校验
CommonUtil.check_schema_before_import(db_type=db_type,
site_name='us',
query=sql_query,
hive_tb_name=hive_table,
msg_usr=['chenyuanjie'],
partition_dict=partition_dict)
import_sh = CommonUtil.build_import_sh(site_name='us',
db_type=db_type,
query=sql_query,
hdfs_path=hdfs_path,
map_num=35,
key='id'
)
HdfsUtils.delete_hdfs_file(hdfs_path)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
client.close()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
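# Looped import: for each search-term type (zr, sb, sp, ac, bs, hr), pull us_merchantwords_search_term_rank_{st_type} (PostgreSQL 16) into Hive ods_merchantwords_search_term_{st_type}, us / day / 2024-05-06 partition.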
if __name__ == '__main__':
    st_type_list = ['zr', 'sb', 'sp', 'ac', 'bs', 'hr']
    for st_type in st_type_list:
        hive_tb = f"ods_merchantwords_search_term_{st_type}"
        partition_dict = {
            "site_name": 'us',
            "date_type": 'day',
            "date_info": '2024-05-06'
        }
        # The column list differs by search-term type
        if st_type in ["zr", "sp"]:
            cols = "search_term,asin,page,page_row,id,created_time,updated_time"
        elif st_type in ["sb"]:
            cols = "search_term,asin,page,data_type,id,created_time,updated_time"
        else:
            cols = "search_term,asin,page,created_time,updated_time"
        db_type = 'postgresql_16'
        import_tb = f"us_merchantwords_search_term_rank_{st_type}"
        query = f"""
        select
            {cols}
        from
            {import_tb}
        where 1 = 1
        and \$CONDITIONS
        """
        hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
        print(f"hdfs_path is {hdfs_path}")
        # Validate schema and data before the import
        empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
                                                                       site_name='us',
                                                                       query=query,
                                                                       hive_tb_name=hive_tb,
                                                                       msg_usr=['chenyuanjie'],
                                                                       partition_dict=partition_dict)
        assert check_flag, f"Schema check failed for Hive table {hive_tb}! Please check whether the query is valid!"
        if not empty_flag:
            # zr/sp/sb imports run with multiple mappers split on id; the other types use the defaults
            if st_type in ["zr", "sp", "sb"]:
                if st_type == "zr":
                    map_num = 50
                elif st_type == "sp":
                    map_num = 25
                else:
                    map_num = 15
                sh = CommonUtil.build_import_sh(site_name='us',
                                                db_type=db_type,
                                                query=query,
                                                hdfs_path=hdfs_path,
                                                map_num=map_num,
                                                key="id")
            else:
                sh = CommonUtil.build_import_sh(site_name='us',
                                                db_type=db_type,
                                                query=query,
                                                hdfs_path=hdfs_path)
            HdfsUtils.delete_hdfs_file(hdfs_path)
            client = SSHUtil.get_ssh_client()
            SSHUtil.exec_command_async(client, sh, ignore_err=False)
            CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
            client.close()
        pass
    pass
\ No newline at end of file
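# The scripts above repeat one flow: build the query, check the schema, generate the
# Sqoop import script, clear the target HDFS path, run the import over SSH, then run the
# post-import step. A minimal sketch of a shared helper for that flow follows; it assumes
# the same CommonUtil / SSHUtil / HdfsUtils APIs used above, and run_sqoop_import is an
# illustrative name, not an existing utility.
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils


def run_sqoop_import(hive_table, partition_dict, db_type, query, map_num=None, key=None, site_name='us'):
    # Target HDFS path for the partition
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    # Schema and data validation before the import
    empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
                                                                   site_name=site_name,
                                                                   query=query,
                                                                   hive_tb_name=hive_table,
                                                                   msg_usr=['chenyuanjie'],
                                                                   partition_dict=partition_dict)
    assert check_flag, f"Schema check failed for Hive table {hive_table}! Please check the query!"
    if empty_flag:
        return
    # Generate the Sqoop command, optionally parallelized across map_num mappers split on key
    if map_num and key:
        sh = CommonUtil.build_import_sh(site_name=site_name, db_type=db_type, query=query,
                                        hdfs_path=hdfs_path, map_num=map_num, key=key)
    else:
        sh = CommonUtil.build_import_sh(site_name=site_name, db_type=db_type, query=query,
                                        hdfs_path=hdfs_path)
    # Clear the target path, run the import over SSH, then run the post-import step
    HdfsUtils.delete_hdfs_file(hdfs_path)
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    client.close()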