Commit 44e32b12 by chenyuanjie

fix

parent 8d26273e
...@@ -103,10 +103,10 @@ class EsAsinProfitRate(object): ...@@ -103,10 +103,10 @@ class EsAsinProfitRate(object):
print(f"创建利润率索引:{self.es_profit_rate_index}!") print(f"创建利润率索引:{self.es_profit_rate_index}!")
EsUtils.create_index(self.es_profit_rate_index, self.es_client, self.es_profit_rate_body) EsUtils.create_index(self.es_profit_rate_index, self.es_client, self.es_profit_rate_body)
try: try:
# self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \ self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \
# .options(**self.es_profit_rate_options) \ .options(**self.es_profit_rate_options) \
# .mode("append") \ .mode("append") \
# .save() .save()
print(f"ES {self.es_profit_rate_index} 索引更新完毕!") print(f"ES {self.es_profit_rate_index} 索引更新完毕!")
except Exception as e: except Exception as e:
print("An error occurred while writing to Elasticsearch:", str(e)) print("An error occurred while writing to Elasticsearch:", str(e))
...@@ -170,17 +170,18 @@ class EsAsinProfitRate(object): ...@@ -170,17 +170,18 @@ class EsAsinProfitRate(object):
"es.mapping.id": "asin", "es.mapping.id": "asin",
"es.resource": f"{index_name}/_doc", "es.resource": f"{index_name}/_doc",
"es.batch.write.refresh": "false", "es.batch.write.refresh": "false",
"es.batch.size.entries": "5000", "es.batch.size.entries": "5000", # 批次数据量
"es.write.operation": "update", "es.write.operation": "update",
"es.batch.write.retry.count": "3", "es.batch.write.concurrency": "5", # 降低并发数,默认是自动(较高)
"es.batch.write.retry.wait": "10s", "es.batch.write.retry.count": "3", # 重试次数
"es.internal.es.version.ignore": "true" # 忽略版本检查 "es.batch.write.retry.wait": "30s", # 重试等待
"es.http.timeout": "5m", # 增加超时时间
"es.internal.es.version.ignore": "true" # 忽略版本检查
} }
print(f"索引 {index_name} 待更新数据量: {df_update.count()}") print(f"索引 {index_name} 待更新数据量: {df_update.count()}")
df_update.show(5, False) df_update.show(5, False)
df_update.repartition(10).write.format("org.elasticsearch.spark.sql") \
df_update.write.format("org.elasticsearch.spark.sql") \
.options(**es_options) \ .options(**es_options) \
.mode("append") \ .mode("append") \
.save() .save()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment