Commit 078bc134 by chenyuanjie

asin信息库+流量选品30day 利润率更新

parent 6f57e4be
import os import os
import sys import sys
# =============================================================================
# 【ES写入异常排查笔记】EsHadoopNoNodesLeftException: all nodes failed
# =============================================================================
# 现象:Spark executor 写入ES时报 Connection error - all nodes failed,
# tried [[210:9200, 217:9200, 216:9200]]
#
# 排查步骤:
# 在各 hadoop 节点执行:curl -u user:pwd http://{es_node}:9200
# - 返回 401 → 网络可达(正常,未带认证)
# - 连接超时 → 网络不可达
#
# 原因1:网络不可达(Executor 只能访问部分ES节点)
# 根因:ES扩容后新节点未对Spark集群开放网络权限,或节点间VLAN隔离
# 解法:设置 es.nodes.wan.only=true,强制所有请求只走配置节点,不做集群节点发现
#
# 原因2:ES节点过载(本项目实际原因)
# 根因:大数据量高并发写入,ES处理不过来拒绝连接,重试所有节点均失败
# 解法:es.nodes.wan.only=true 将所有请求集中到单个协调节点(217),
# 由217内部分发到shard,相当于间接限流,降低对各数据节点的直接冲击
# 如仍报错,进一步降低并发和批次大小:
# es.batch.size.entries: 2000
# es.batch.write.concurrency: 5
# es.batch.write.retry.count: 5
# es.batch.write.retry.wait: 60s
#
# 结论:单入口访问ES集群时,es.nodes.wan.only=true 是推荐配置
# =============================================================================
sys.path.append(os.path.dirname(sys.path[0])) sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil from utils.spark_util import SparkUtil
...@@ -83,6 +111,9 @@ class EsAsinProfitRate(object): ...@@ -83,6 +111,9 @@ class EsAsinProfitRate(object):
self.read_profit_rate_add() self.read_profit_rate_add()
self.update_history_index() self.update_history_index()
self.save_profit_rate_to_es() # 遍历完成后统一更新利润率索引 self.save_profit_rate_to_es() # 遍历完成后统一更新利润率索引
# 更新信息库年表索引和流量选品30天索引的利润率字段
self.update_index_profit_rate(f"{self.site_name}_ai_asin_analyze_detail_last365_day")
self.update_index_profit_rate(f"{self.site_name}_flow_asin_30day")
def read_profit_rate_add(self): def read_profit_rate_add(self):
# 读取利润率整合数据(增量数据) # 读取利润率整合数据(增量数据)
...@@ -213,6 +244,86 @@ class EsAsinProfitRate(object): ...@@ -213,6 +244,86 @@ class EsAsinProfitRate(object):
print(f"索引 {index_name} 更新完毕!") print(f"索引 {index_name} 更新完毕!")
def update_index_profit_rate(self, index_name):
    """
    Refresh the profit-rate field of an existing ES index with the latest
    incremental profit-rate data.

    Reads ``asin`` + ``profit_key`` from *index_name*, inner-joins it with
    the incremental profit-rate DataFrame (``self.df_asin_profit_rate``,
    keyed by ``profit_key``), and writes only the ``profit_rate_extra``
    struct back to ES as partial document updates keyed by ``asin``.
    Failures are reported via WeChat alert instead of raising, so one bad
    index does not abort the remaining index updates.

    :param index_name: name of the target ES index; silently skipped when
        the index does not exist.
    """
    if not EsUtils.exist_index(index_name, self.es_client):
        print(f"索引 {index_name} 不存在,跳过更新")
        return
    print(f"\n{'='*60}")
    print(f"开始更新索引利润率字段:{index_name}")
    print(f"{'='*60}")
    # Read only the join key columns from the index. wan.only=true keeps
    # all traffic on the configured coordinating node and disables cluster
    # node discovery (see the troubleshooting notes at the top of the file).
    read_options = {
        "es.nodes": EsUtils.__es_ip__,
        "es.port": EsUtils.__es_port__,
        "es.net.http.auth.user": EsUtils.__es_user__,
        "es.net.http.auth.pass": EsUtils.__es_passwd__,
        "es.nodes.wan.only": "true",
        "es.read.field.include": "asin,profit_key"
    }
    df_index = self.spark.read.format("org.elasticsearch.spark.sql") \
        .options(**read_options) \
        .load(index_name) \
        .select("asin", "profit_key") \
        .dropna(subset=["profit_key"]) \
        .repartition(40, "profit_key")
    # Inner join with the incremental profit-rate data; emit a null
    # profit_rate_extra (clearing the field) only when BOTH components
    # are null, otherwise a struct of the two profit values.
    df_update = df_index.join(
        self.df_asin_profit_rate.select("profit_key", "ocean_profit", "air_profit"),
        on="profit_key",
        how="inner"
    ).withColumn(
        "profit_rate_extra",
        F.when(
            F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
            F.lit(None)
        ).otherwise(
            F.struct(
                F.col("ocean_profit").alias("ocean_profit"),
                F.col("air_profit").alias("air_profit")
            )
        )
    ).select("asin", "profit_rate_extra").cache()
    try:
        count = df_update.count()
        print(f"索引 {index_name} 待更新利润率数据量: {count}")
        if count == 0:
            print("无待更新数据,跳过")
            return
        # Partial-update existing docs by asin; small batches, low
        # concurrency and long retry waits to avoid overloading ES
        # (see EsHadoopNoNodesLeftException notes at the top of the file).
        write_options = {
            "es.nodes": EsUtils.__es_ip__,
            "es.port": EsUtils.__es_port__,
            "es.net.http.auth.user": EsUtils.__es_user__,
            "es.net.http.auth.pass": EsUtils.__es_passwd__,
            "es.mapping.id": "asin",
            "es.resource": f"{index_name}/_doc",
            "es.write.operation": "update",
            "es.update.retry.on.conflict": "3",
            "es.batch.write.refresh": "false",
            "es.batch.size.entries": "2000",
            "es.batch.write.concurrency": "5",
            "es.batch.write.retry.count": "5",
            "es.batch.write.retry.wait": "60s",
            "es.nodes.wan.only": "true"
        }
        try:
            df_update.repartition(10).write.format("org.elasticsearch.spark.sql") \
                .options(**write_options) \
                .mode("append") \
                .save()
            print(f"索引 {index_name} 利润率字段更新完毕!")
        except Exception as e:
            # Deliberate best-effort: alert and continue with other indexes.
            print(f"更新索引 {index_name} 失败: {str(e)}")
            CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES利润率更新失败', f'失败索引:{index_name}')
    finally:
        # Release the cached join result so repeated calls (this method runs
        # once per index) do not accumulate cached partitions in executor memory.
        df_update.unpersist()
if __name__ == "__main__": if __name__ == "__main__":
site_name = sys.argv[1] site_name = sys.argv[1]
date_info = sys.argv[2] date_info = sys.argv[2]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment