Commit cda0402a by chenyuanjie

流量选品-es新增富集策略

parent 146441b7
...@@ -48,7 +48,11 @@ class EsStDetail(TemplatesMysql): ...@@ -48,7 +48,11 @@ class EsStDetail(TemplatesMysql):
self.record_table_name_field = f'{self.site_name}_flow_asin_last_month' if self.date_type == 'month' else f'{self.site_name}_flow_asin_last30day' self.record_table_name_field = f'{self.site_name}_flow_asin_last_month' if self.date_type == 'month' else f'{self.site_name}_flow_asin_last30day'
# elasticsearch相关配置 # elasticsearch相关配置
self.client = EsUtils.get_es_client() self.client = EsUtils.get_es_client()
self.es_options = EsUtils.get_es_options(self.es_index_name) # 富集策略相关配置,用于更新 usr_mask_type 字段
self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy"
self.pipeline_id = "user_asin_mask_enrich_pipeline"
self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id)
self.es_body = EsUtils.get_es_body() self.es_body = EsUtils.get_es_body()
# 正式导出需入导出记录表 # 正式导出需入导出记录表
...@@ -105,6 +109,10 @@ class EsStDetail(TemplatesMysql): ...@@ -105,6 +109,10 @@ class EsStDetail(TemplatesMysql):
def es_prepare(self): def es_prepare(self):
print("当前链接的es节点信息为:" + str(EsUtils.__es_ip__)) print("当前链接的es节点信息为:" + str(EsUtils.__es_ip__))
EsUtils.create_index(self.es_index_name, self.client, self.es_body) EsUtils.create_index(self.es_index_name, self.client, self.es_body)
# 执行富集策略
EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2)
self.client.enrich.execute_policy(name=self.policy_name1)
self.client.enrich.execute_policy(name=self.policy_name2)
if self.date_type != 'month': if self.date_type != 'month':
if not EsUtils.exist_index_alias(self.alias_name, self.client): if not EsUtils.exist_index_alias(self.alias_name, self.client):
EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client) EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
......
...@@ -27,7 +27,7 @@ class EsUtils(object): ...@@ -27,7 +27,7 @@ class EsUtils(object):
# 获取elasticsearch相关配置 # 获取elasticsearch相关配置
@staticmethod @staticmethod
def get_es_options(es_index_name): def get_es_options(es_index_name, pipeline_id):
return { return {
"es.nodes": EsUtils.__es_ip__, "es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__, "es.port": EsUtils.__es_port__,
...@@ -40,7 +40,8 @@ class EsUtils(object): ...@@ -40,7 +40,8 @@ class EsUtils(object):
"es.batch.size.entries": "5000", "es.batch.size.entries": "5000",
"es.nodes.wan.only": "false", "es.nodes.wan.only": "false",
"es.batch.write.concurrency": "30", "es.batch.write.concurrency": "30",
"es.write.operation": "upsert" "es.write.operation": "upsert",
"es.ingest.pipeline": f"{pipeline_id}"
} }
# 获取elasticsearch中索引配置信息 # 获取elasticsearch中索引配置信息
...@@ -487,7 +488,6 @@ class EsUtils(object): ...@@ -487,7 +488,6 @@ class EsUtils(object):
index_name_list = list(alias_info.keys()) index_name_list = list(alias_info.keys())
return index_name_list return index_name_list
#删除索引别名 #删除索引别名
@staticmethod @staticmethod
def delete_index_alias(alias_name, client): def delete_index_alias(alias_name, client):
...@@ -500,7 +500,79 @@ class EsUtils(object): ...@@ -500,7 +500,79 @@ class EsUtils(object):
else: else:
print("索引别名不存在!") print("索引别名不存在!")
@staticmethod
def user_enrich_pipeline(client, pipeline_id, policy_name1, policy_name2):
pipeline_body = {
"description": "asin flow user mask pipeline",
"processors": [
{
"enrich": {
"policy_name": f"{policy_name1}",
"field": "asin",
"target_field": "policy_add_1",
"max_matches": 1,
"ignore_missing": True
},
},
{
"enrich": {
"policy_name": f"{policy_name2}",
"field": "category_id",
"target_field": "policy_add_2",
"max_matches": 1,
"ignore_missing": True
},
},
{
"set": {
"field": "usr_mask_type",
"value": "{{policy_add_1.usr_mask_type}}",
"ignore_empty_value": True
}
},
{
"set": {
"field": "usr_mask_progress",
"value": "{{policy_add_1.usr_mask_progress}}",
"ignore_empty_value": True
}
},
{
"set": {
"field": "package_quantity",
"value": "{{policy_add_1.package_quantity}}",
"ignore_empty_value": True
}
},
{
"set": {
"field": "usr_mask_type",
"value": "{{policy_add_2.usr_mask_type}}",
"ignore_empty_value": True
}
},
{
"remove": {
"field": "policy_add_1",
"ignore_missing": True
}
},
{
"remove": {
"field": "policy_add_2",
"ignore_missing": True
}
},
{
"convert": {
"field": "package_quantity",
"type": "integer",
"ignore_missing": True
}
}
]
}
client.ingest.put_pipeline(id=pipeline_id, body=pipeline_body)
if __name__ == '__main__': if __name__ == '__main__':
pass pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment