Commit e56a458e by fangxingjun

no message

parent 96bba449
......@@ -110,9 +110,17 @@ class DwdNsrBsrKeepaAsin(Templates):
self.df_asin_nsr = self.spark.sql(sqlQuery=sql_nsr).cache()
self.df_asin_nsr.show(10, truncate=False)
print(f"1.3 读取最近30天的keepa的asin")
sql_keepa = f"select distinct(asin), date_info, 3 as asin_cate_flag, null as category_id from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'"
sql_keepa = f"""
select distinct(asin), date_info, 3 as asin_cate_flag, null as category_id from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info between '{thirty_days_ago}' and '{self.date_info}'
union all
select distinct(asin), date_info, 4 as asin_cate_flag, null as category_id from ods_keepa_finder_task_asin where site_name='{self.site_name}'
"""
if self.date_type == 'month':
sql_keepa = f"select asin, date_info, 3 as asin_cate_flag, null as category_id from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info in {self.date_info_tuple}"
sql_keepa = f"""
select asin, date_info, 3 as asin_cate_flag, null as category_id from ods_keepa_finder_asin where site_name='{self.site_name}' and date_info in {self.date_info_tuple}
union all
select distinct(asin), date_info, 4 as asin_cate_flag, null as category_id from ods_keepa_finder_task_asin where site_name='{self.site_name}' and date_info in {self.date_info_tuple}
"""
print("sql_keepa:", sql_keepa)
self.df_asin_keepa = self.spark.sql(sqlQuery=sql_keepa).cache()
self.df_asin_keepa.show(10, truncate=False)
......
......@@ -72,7 +72,8 @@ class DolphinschedulerHelper(object):
:param warning_Type: 警告类型 NONE ALL
:return:
"""
print(f"startParams: {startParams}")
start_params = startParams
project_map = cls.get_project_map()
project_code = project_map.get(project_name)
process_df_map: Dict = cls.get_project_df_map(project_code)
......@@ -128,11 +129,11 @@ class DolphinschedulerHelper(object):
title = f"【海豚调度】调度api触发提示"
if resp_state:
DolphinschedulerHelper.send_startup_state_to_oa(project_name, process_df_name, resp_state)
DolphinschedulerHelper.send_startup_state_to_oa(project_name, f"{process_df_name}, 参数 {':'.join(start_params.values())}", resp_state)
return True
else:
DolphinschedulerHelper.send_startup_state_to_oa(project_name, process_df_name, resp_state)
raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!")
DolphinschedulerHelper.send_startup_state_to_oa(project_name, f"{process_df_name}, 参数 {':'.join(start_params.values())}", resp_state)
raise Exception(f"任务【{project_name}/f'{process_df_name}, 参数 {':'.join(start_params.values())}'】调度失败!")
@classmethod
def send_wx_msg(cls, users: list, title: str, content: str, msgtype: str = "textcard"):
......
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
assert site_name is not None, "site_name 不能为空!"
assert date_type is not None, "date_type 不能为空!"
assert date_info is not None, "date_info 不能为空!"
query = f"""
select
task_id,
asin,
id,
update_time,
query_md5
from keepa_finder_task_asin
where 1 = 1
and \$CONDITIONS
"""
hive_tb = "ods_keepa_finder_task_asin"
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info,
}
db_type = "doris"
engine = get_remote_engine(
site_name=site_name,
db_type=db_type
)
engine.sqoop_raw_import(
query=query,
hive_table=hive_tb,
partitions=partition_dict
)
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment