Commit 27622540 by chenyuanjie

Traffic-based product selection: update ES monthly sales

parent c7f5d019
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.DorisHelper import DorisHelper
from pyspark.sql import functions as F
# Elasticsearch connection settings
__es_ip__ = "192.168.10.217"
__es_port__ = "9200"
__es_user__ = "elastic"
__es_passwd__ = "Selection20251#+"
class EsUpdate(object):
    def __init__(self):
        self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}")
        self.index_name = "us_st_detail_month_2025_09"
        # Write options for the elasticsearch-hadoop Spark connector: documents
        # are keyed by asin and written as bulk upserts against the monthly index.
        self.es_options = {
            "es.nodes": __es_ip__,
            "es.port": __es_port__,
            "es.net.http.auth.user": __es_user__,
            "es.net.http.auth.pass": __es_passwd__,
            "es.mapping.id": "asin",
            "es.resource": f"{self.index_name}/_doc",
            "es.batch.write.refresh": "false",
            "es.batch.write.retry.wait": "60s",
            "es.batch.size.entries": "5000",
            "es.nodes.wan.only": "false",
            "es.batch.write.concurrency": "60",
            "es.write.operation": "upsert"
        }
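        # Editor's note (assumption): with "es.write.operation" set to "upsert",
        # the connector partially updates existing documents. elasticsearch-hadoop
        # drops null values on write by default, so explicitly clearing
        # asin_bought_month may also require "es.spark.dataframe.write.null": "true".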
        # Placeholder DataFrames, populated in run()
        self.df_asin = self.spark.sql("select 1+1;")
        self.df_es_asin = self.spark.sql("select 1+1;")
        self.df_need_update = self.spark.sql("select 1+1;")
    def run(self):
        self.get_update_asin()
        self.update_es_field()
    def get_update_asin(self):
        # ASINs flagged by the crawler as having no monthly bought sales
        sql = """
        select asin from us_asin_detail_2025_not_buysales
        """
        pg_con_info = DBUtil.get_connection_info("postgresql_14", "us")
        self.df_asin = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=pg_con_info['url'],
            username=pg_con_info['username'],
            pwd=pg_con_info['pwd'],
            query=sql
        )
        # Deduplicate and pre-partition by asin to align with the join key below
        self.df_asin = self.df_asin.dropDuplicates(['asin']).repartition(40, 'asin').cache()
        print("Row count from the crawler table:", self.df_asin.count())
    def update_es_field(self):
        # Pull the ASINs currently present in the target ES index (via Doris)
        es_asin_sql = f"""
        SELECT asin from es_selection.default_db.{self.index_name}
        """
        self.df_es_asin = DorisHelper.spark_import_with_sql(self.spark, es_asin_sql).repartition(40, 'asin')
        # Only ASINs present in both sources are updated; asin_bought_month is
        # set to null to clear the monthly-sales value
        self.df_need_update = self.df_asin.join(
            self.df_es_asin, on=['asin'], how='inner'
        ).withColumn(
            'asin_bought_month', F.lit(None)
        ).cache()
        print("Number of rows to update in ES:", self.df_need_update.count())
        print(f"Updating ES data, target index: {self.index_name}")
        try:
            self.df_need_update.write.format("org.elasticsearch.spark.sql") \
                .options(**self.es_options) \
                .mode("append") \
                .save()
            print(f"ES index {self.index_name} updated!")
        except Exception as e:
            print("An error occurred while writing to Elasticsearch:", str(e))
            CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES monthly-sales update failed', f'Failed index: {self.index_name}')
if __name__ == "__main__":
    handle_obj = EsUpdate()
    handle_obj.run()
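
A quick way to spot-check the write is to fetch one of the updated documents and confirm that asin_bought_month has been cleared. A minimal sketch, assuming the official elasticsearch Python client (7.x, where http_auth is still accepted); the ASIN used here is hypothetical:

from elasticsearch import Elasticsearch

es = Elasticsearch(
    ["http://192.168.10.217:9200"],
    http_auth=("elastic", "Selection20251#+"),
)
# "B0EXAMPLE01" is a hypothetical ASIN; substitute one from the source table
doc = es.get(index="us_st_detail_month_2025_09", id="B0EXAMPLE01")
print(doc["_source"].get("asin_bought_month"))  # expected: None after the update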