Commit 4b76286f by chenyuanjie

fix

parent 67068a36
...@@ -833,9 +833,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -833,9 +833,9 @@ class KafkaFlowAsinDetail(Templates):
EsUtils.create_index(self.es_index_name, self.client, self.es_index_body) EsUtils.create_index(self.es_index_name, self.client, self.es_index_body)
print("索引名称为:", self.es_index_name) print("索引名称为:", self.es_index_name)
# 执行富集策略 # 执行富集策略
EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2)
self.client.enrich.execute_policy(name=self.policy_name1) self.client.enrich.execute_policy(name=self.policy_name1)
self.client.enrich.execute_policy(name=self.policy_name2) self.client.enrich.execute_policy(name=self.policy_name2)
EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2)
if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client): if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client):
EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client) EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client)
else: else:
......
# author : wangrui
# data : 2023/3/9 15:50
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
...@@ -40,7 +38,7 @@ class EsUtils(object): ...@@ -40,7 +38,7 @@ class EsUtils(object):
"es.batch.size.entries": "5000", "es.batch.size.entries": "5000",
"es.nodes.wan.only": "false", "es.nodes.wan.only": "false",
"es.batch.write.concurrency": "30", "es.batch.write.concurrency": "30",
"es.write.operation": "upsert", "es.write.operation": "index",
"es.ingest.pipeline": f"{pipeline_id}" "es.ingest.pipeline": f"{pipeline_id}"
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment