Commit 27088859 by wujicang

修改导出参数

parent 42821c6d
......@@ -123,10 +123,13 @@ def save_to_doris(df_all: DataFrame):
def export():
spark = SparkUtil.get_spark_session("self_asin_redis:export")
day = CommonUtil.get_sys_arg(1, CommonUtil.format_now("%Y-%m-%d"))
export_type = CommonUtil.get_sys_arg(2, "redis&&doris")
last_day = CommonUtil.get_day_offset(day, -1)
next_day = CommonUtil.get_day_offset(day, 1)
# 先删除
redis_key = f"self_asin_detail:{day}"
if "redis" in export_type:
client = RedisUtils.get_redis_client_by_type(db_type='microservice')
if client.exists(redis_key):
client.delete(redis_key)
......@@ -177,11 +180,17 @@ def export():
# 填充默认值
asin_df = na_fill(asin_df).cache()
if "redis" in export_type:
asin_df.toJSON().foreachPartition(functools.partial(save_to_redis_list, batch=5000, redis_key=redis_key, ttl=3600 * 24))
print(f"{site_name}:redis:success")
if "doris" in export_type:
save_to_doris(asin_df)
print(f"{site_name}:doris:success")
print("success all")
if "redis" in export_type:
check_total()
pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment