Commit d22775a0 by chenyuanjie

ES: update profit rate index

parent 2fc06234
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from pyspark.sql import functions as F
from utils.common_util import CommonUtil

class EsAsinProfitRate(object):

    def __init__(self, site_name):
        self.site_name = site_name
        self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}")
        # ES-related configuration
        self.es_client = EsUtils.get_es_client()
        self.es_index = f"{self.site_name}_profit_rate_extra"
        self.es_body = self.get_es_body()
        self.es_options = self.get_es_options(self.es_index)
        self.profit_rate_policy = f"{self.site_name}_profit_rate_policy"
        self.user_mask_asin_policy = "user_mask_asin_policy"
        self.user_mask_category_policy = "user_mask_category_policy"
        self.pipeline_id = f"{self.site_name}_user_mask_and_profit_rate_pipeline"
        # Placeholder DataFrames, populated in read_data()
        self.df_asin_profit_rate = self.spark.sql("select 1+1;")
        self.df_keepa_asin = self.spark.sql("select 1+1;")
    @staticmethod
    def get_es_body():
        return {
            "settings": {
                "number_of_shards": "3",
                "number_of_replicas": "1"
            },
            "mappings": {
                "properties": {
                    "profit_key": {
                        "type": "keyword"
                    },
                    "asin": {
                        "type": "keyword"
                    },
                    "price": {
                        "type": "float"
                    },
                    "ocean_profit": {
                        "type": "float"
                    },
                    "air_profit": {
                        "type": "float"
                    }
                }
            }
        }
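
    # A stored document therefore looks like (illustrative values, not real data):
    #   {"profit_key": "B01ABCDEFG_19.99", "asin": "B01ABCDEFG",
    #    "price": 19.99, "ocean_profit": 0.23, "air_profit": 0.08}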
    @staticmethod
    def get_es_options(index_name):
        return {
            "es.nodes": EsUtils.__es_ip__,
            "es.port": EsUtils.__es_port__,
            "es.net.http.auth.user": EsUtils.__es_user__,
            "es.net.http.auth.pass": EsUtils.__es_passwd__,
            # profit_key doubles as the document _id, so re-runs overwrite
            # existing documents instead of appending duplicates
            "es.mapping.id": "profit_key",
            "es.resource": f"{index_name}/_doc",
            "es.batch.write.refresh": "false",
            "es.batch.write.retry.wait": "60s",
            "es.batch.size.entries": "5000",
            "es.nodes.wan.only": "false",
            "es.batch.write.concurrency": "40",
            "es.write.operation": "index"
        }
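
    # Overall flow: write per-ASIN profit rows to the "extra" index, re-execute
    # the enrich policy against it, then rebuild the ingest pipeline and re-run
    # the target index through it via update_by_query.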
    def run(self):
        self.read_data()
        self.es_save()
        self.create_enrich_policy()
        self.create_enrich_pipeline()
    def read_data(self):
        sql = f"""
        select asin, price, ocean_profit, air_profit, package_length, package_width, package_height, weight
        from dim_asin_profit_rate_info where site_name = '{self.site_name}'
        """
        self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
        sql = f"""
        select asin, package_length, package_width, package_height, weight
        from dim_keepa_asin_info where site_name = '{self.site_name}'
        """
        self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
        # dim_asin_profit_rate_info can contain duplicate profit-rate calculations
        # for an asin; the inner join on package dimensions and weight keeps only
        # the rows that correspond to the latest Keepa data
        self.df_asin_profit_rate = self.df_asin_profit_rate.join(
            self.df_keepa_asin, on=['asin', 'package_length', 'package_width', 'package_height', 'weight'], how='inner'
        ).select(
            'asin', 'price', 'ocean_profit', 'air_profit'
        ).withColumn(
            'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
        ).cache()
    def es_save(self):
        print(f"Creating enrich source index: {self.es_index}")
        EsUtils.create_index(self.es_index, self.es_client, self.es_body)
        try:
            self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \
                .options(**self.es_options) \
                .mode("append") \
                .save()
            print(f"ES index {self.es_index} updated!")
        except Exception as e:
            print("An error occurred while writing to Elasticsearch:", str(e))
            CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES data update failed', f'Failed index: {self.es_index}')
    def create_enrich_policy(self):
        # Policy creation/deletion is commented out; only re-execution runs now:
        # self.es_client.ingest.delete_pipeline(id=self.pipeline_id)
        # self.es_client.enrich.delete_policy(name=self.policy_name)
        # print(f"Creating enrich policy: {self.policy_name}")
        # policy_body = {
        #     "match": {
        #         "indices": f"{self.es_index}",
        #         "match_field": "profit_key",
        #         "enrich_fields": ["ocean_profit", "air_profit"]
        #     }
        # }
        # self.es_client.enrich.put_policy(name=self.policy_name, body=policy_body)
        print(f"Executing enrich policy: {self.profit_rate_policy}")
        self.es_client.enrich.execute_policy(self.profit_rate_policy, request_timeout=1800)
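        # Equivalent REST call for the line above (e.g. with site_name "us"):
        #   PUT /_enrich/policy/us_profit_rate_policy/_execute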
    def create_enrich_pipeline(self):
        print(f"Creating enrich pipeline: {self.pipeline_id}")
        pipeline_body = {
            "description": "asin profit_rate and user_mask pipeline",
            "processors": [
                {
                    "enrich": {
                        "policy_name": self.profit_rate_policy,
                        "field": "profit_key",
                        "target_field": "profit_rate_extra",
                        "max_matches": 1,
                        "ignore_missing": True
                    }
                },
                {
                    "enrich": {
                        "policy_name": self.user_mask_asin_policy,
                        "field": "asin",
                        "target_field": "policy_add_1",
                        "max_matches": 1,
                        "ignore_missing": True
                    }
                },
                {
                    "enrich": {
                        "policy_name": self.user_mask_category_policy,
                        "field": "category_id",
                        "target_field": "policy_add_2",
                        "max_matches": 1,
                        "ignore_missing": True
                    }
                },
                {
                    "set": {
                        "field": "usr_mask_type",
                        "value": "{{policy_add_1.usr_mask_type}}",
                        "ignore_empty_value": True
                    }
                },
                {
                    "set": {
                        "field": "usr_mask_progress",
                        "value": "{{policy_add_1.usr_mask_progress}}",
                        "ignore_empty_value": True
                    }
                },
                {
                    "set": {
                        "field": "package_quantity",
                        "value": "{{policy_add_1.package_quantity}}",
                        "ignore_empty_value": True
                    }
                },
                {
                    "set": {
                        "field": "usr_mask_type",
                        "value": "{{policy_add_2.usr_mask_type}}",
                        "ignore_empty_value": True
                    }
                },
                {
                    "remove": {
                        "field": "policy_add_1",
                        "ignore_missing": True
                    }
                },
                {
                    "remove": {
                        "field": "policy_add_2",
                        "ignore_missing": True
                    }
                },
                {
                    "convert": {
                        "field": "package_quantity",
                        "type": "integer",
                        "ignore_missing": True
                    }
                }
            ]
        }
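        # Processor order matters: the usr_mask_type "set" from policy_add_2 runs
        # after the one from policy_add_1, so a category-level mask overrides an
        # asin-level mask when both enrich lookups match.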
        self.es_client.ingest.put_pipeline(id=self.pipeline_id, body=pipeline_body)
        # Re-run stored documents through the pipeline so the enrichment takes
        # effect; an empty body matches all documents. A narrower query (only
        # docs still missing profit_rate_extra) is kept for reference:
        # body = {
        #     "query": {
        #         "bool": {
        #             "must_not": {
        #                 "exists": {
        #                     "field": "profit_rate_extra"
        #                 }
        #             }
        #         }
        #     }
        # }
        body = {}
        self.es_client.update_by_query(
            index="us_st_detail_month_2025_11",  # target index is hard-coded here
            body=body,
            pipeline=self.pipeline_id,
            refresh=True,
            wait_for_completion=False,
            request_timeout=600
        )


if __name__ == "__main__":
    site_name = sys.argv[1]
    handle_obj = EsAsinProfitRate(site_name)
    handle_obj.run()
    print("success!!!")