Commit f9e42a8c by wangjing

no message

parent 354b6de6
......@@ -6,16 +6,18 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil, DbTypes
from utils.spark_util import SparkUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
# Entry point: export ABA weekly search-term report data to the PG cluster.
# CLI args (1-based): site_name, date_type, date_info — all required, no defaults.
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
# Gate execution on working hours for this export process.
CommonUtil.judge_is_work_hours(
site_name=site_name, date_type=date_type, date_info=date_info,
principal='hejiangming', priority=1, export_tools_type=1, belonging_to_process='ABA周增长'
)
# Don't write to the PG table at night — run immediately instead.
# CommonUtil.judge_is_work_hours(
# site_name=site_name, date_type=date_type, date_info=date_info,
# principal='hejiangming', priority=1, export_tools_type=1, belonging_to_process='ABA周增长'
# )
# Target database type: PostgreSQL cluster.
db_type = DbTypes.postgresql_cluster.name
print("导出到PG集群中")
......@@ -130,9 +132,64 @@ if __name__ == '__main__':
cp_index_flag=True,
)
# Step 2: Spark SQL — copy weekly search-term rows from dim_st_detail_week
# into the dwt_st_base_report Hive table (partitioned by site/date_type/date_info).
print("================================写入周趋势数据到Hive================================")
spark = SparkUtil.get_spark_session(f"export_dim_st_detail_week_trend:{site_name} {date_type} {date_info}")
dwt_hive_tb = "dwt_st_base_report"
dwt_hdfs_path = f"/home/big_data_selection/dwt/{dwt_hive_tb}/site_name={site_name}/date_type=week/date_info={date_info}"
# Clear the target HDFS partition first so re-running this step does not duplicate rows.
print(f"清除hdfs目录中数据:{dwt_hdfs_path}")
HdfsUtils.delete_hdfs_file(dwt_hdfs_path)
# NOTE(review): `year` is defined earlier in the file (outside this hunk) and is
# interpolated into the SELECT as a literal column value — confirm it matches date_info.
spark.sql(f"""
INSERT INTO TABLE {dwt_hive_tb} PARTITION(site_name='{site_name}', date_type='week', date_info='{date_info}')
SELECT
st_key,
search_term,
search_volume as st_volume,
orders as st_orders,
{year} as years,
current_timestamp() as created_time,
current_timestamp() as updated_time,
`rank` as st_rank
FROM dim_st_detail_week
WHERE site_name='{site_name}' AND date_type='week' AND date_info='{date_info}'
""")
spark.stop()
print("================================写入周趋势数据完成================================")
# Step 3: Sqoop — export dwt_st_base_report to the PG weekly-trend table.
print("================================导出周趋势图到PG集群================================")
# Master table is range-partitioned by year; the export targets the yearly child
# partition, e.g. "<site>_aba_last_total_week_<year>".
trend_master_tb = f"{site_name}_aba_last_total_week"
# NOTE(review): `year` (int) and `year_str` (string form) come from earlier in the
# file, outside this hunk — assumed to describe the same year; verify upstream.
trend_year_next = str(year + 1)
trend_export_tb = f"{trend_master_tb}_{year_str}"
# Create the year partition if missing, then delete any rows already present for
# this date_info so the export can be re-run safely (delete-then-insert idempotency).
trend_sql = f"""
create table if not exists {trend_export_tb} partition of {trend_master_tb} for values from ('{year_str}') to ('{trend_year_next}');
delete from {trend_export_tb} where date_info = '{date_info}';
"""
# NOTE(review): `engine` is created earlier in the file — presumably the PG-cluster
# engine for db_type; confirm against the hidden portion of this script.
DBUtil.engine_exec_sql(engine, trend_sql)
# Columns exported from the Hive partition into the PG partition table.
trend_columns = ["st_key", "search_term", "st_volume", "st_rank", "st_orders", "years",
"created_time", "updated_time", "date_type", "date_info"]
# Build the sqoop export shell command for the single Hive partition written in step 2.
trend_sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb=dwt_hive_tb,
export_tb=trend_export_tb,
col=trend_columns,
partition_dict={
"site_name": site_name,
"date_type": "week",
"date_info": date_info
}
)
# Run the export on the remote host; ignore_err=False makes a non-zero exit fatal.
trend_client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(trend_client, trend_sh, ignore_err=False)
trend_client.close()
print("================================周趋势图导出完成================================")
# Record this export in the workflow tracking table (selection.workflow_everyday)
# so scheduling/monitoring can see that the ABA weekly report export finished.
# Fix: the "REPLACE INTO selection.workflow_everyday" header line was duplicated
# in the original, which made the statement invalid SQL — the duplicate is removed.
sql = f"""
REPLACE INTO selection.workflow_everyday
(site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
VALUES
('{site_name}', '{date_info}', '导出PG数据库', 14, '{site_name}_aba_report_week', 'week', 'ABA搜索词周报告', '是', 'ABA搜索词周报告表', 'postgresql_cluster');
"""
# The tracking table lives in the US MySQL instance, not the PG cluster.
DBUtil.engine_exec_sql(DBUtil.get_db_engine('mysql', 'us'), sql)
print("success")
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment