Commit 0f4f80be by chenyuanjie

流量选品-实时任务新增富集策略

parent 3a52c2df
......@@ -50,7 +50,11 @@ class KafkaFlowAsinDetail(Templates):
self.es_index_name = f"{self.topic_name}_test" if self.test_flag == 'test' else f"{self.topic_name}"
self.es_index_alias_name = f"{self.site_name}_st_detail_last_4_week_test" if self.test_flag == 'test' else f"{self.site_name}_st_detail_last_4_week"
self.es_index_body = EsUtils.get_es_body()
self.es_options = EsUtils.get_es_options(self.es_index_name)
# 富集策略相关配置,用于更新 usr_mask_type 字段
self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy"
self.pipeline_id = "user_asin_mask_enrich_pipeline"
self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id)
self.db_save = 'kafka_flow_asin_detail'
self.app_name = self.get_app_name()
print(f"任务名称:{self.app_name}")
......@@ -828,6 +832,10 @@ class KafkaFlowAsinDetail(Templates):
# 创建对应es索引
EsUtils.create_index(self.es_index_name, self.client, self.es_index_body)
print("索引名称为:", self.es_index_name)
# 执行富集策略
EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2)
self.client.enrich.execute_policy(name=self.policy_name1)
self.client.enrich.execute_policy(name=self.policy_name2)
if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client):
EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client)
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment