# author : wangrui
# date : 2024/3/22 14:28
"""Sync SKU aggregation fields from Hive into the Elasticsearch index behind the us_st_detail_last_4_week alias."""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable (utils package)

from pyspark.sql import SparkSession

from utils.es_util import EsUtils
from utils.DorisHelper import DorisHelper

__es_ip__ = "192.168.10.217"
__es_port__ = "9200"
__es_user__ = "elastic"
__es_passwd__ = "selection2021.+"
__warehouse_dir__ = "hdfs://nameservice1:8020/home/big_data_selection"
__metastore_uris__ = "thrift://hadoop16:9083"


def get_es_index_name():
    """Resolve the concrete index name behind the us_st_detail_last_4_week alias."""
    client = EsUtils.get_es_client()
    index_name_list = EsUtils.get_index_names_associated_alias("us_st_detail_last_4_week", client)
    if not index_name_list:
        raise RuntimeError("No Elasticsearch index is associated with alias us_st_detail_last_4_week")
    index_name = str(index_name_list[0])
    print("Elasticsearch index to be updated:", index_name)
    return index_name


def update_es_fields(df_update, index_name):
    """Upsert the joined DataFrame into Elasticsearch, keyed on the asin column."""
    es_options = {
        "es.nodes": __es_ip__,
        "es.port": __es_port__,
        "es.net.http.auth.user": __es_user__,
        "es.net.http.auth.pass": __es_passwd__,
        "es.mapping.id": "asin",
        "es.resource": f"{index_name}/_doc",
        "es.batch.write.refresh": "false",
        "es.batch.write.retry.wait": "60s",
        "es.batch.size.entries": "60000",
        "es.nodes.wan.only": "false",
        "es.batch.write.concurrency": "80",
        "es.write.operation": "upsert"
    }
    try:
        df_update = df_update.repartition(40)
        df_update.write.format("org.elasticsearch.spark.sql") \
            .options(**es_options) \
            .mode("append") \
            .save()
        print("Elasticsearch update finished")
    except Exception as e:
        # Log and swallow write failures so the job can still stop the Spark session cleanly.
        print("An error occurred while writing to Elasticsearch:", str(e))


def read_es_asin(spark, index_name):
    """Read the asin column of the target index through the Doris external catalog."""
    es_asin_sql = f"""
    SELECT asin FROM es_selection.default_db.{index_name}
    """
    df_need_update = DorisHelper.spark_import_with_sql(spark, es_asin_sql)
    return df_need_update


def get_main_asin(spark):
    """Load the SKU aggregation fields to be pushed to Elasticsearch from the Hive staging table."""
    sql = """
    SELECT asin,
           cast(auctions_num as bigint) as auctions_num,
           cast(auctions_num_all as bigint) as auctions_num_all,
           cast(skus_num_creat as bigint) as skus_num_creat,
           cast(skus_num_creat_all as bigint) as skus_num_creat_all
    FROM big_data_selection.tmp_jm_info
    """
    print("sql=", sql)
    df_main_asin = spark.sql(sql)
    return df_main_asin


def create_spark():
    return SparkSession.builder \
        .master("yarn") \
        .appName("es_update_use_sku_info") \
        .config("spark.sql.warehouse.dir", __warehouse_dir__) \
        .config("hive.metastore.uris", __metastore_uris__) \
        .config("spark.network.timeout", 1000000) \
        .config("spark.sql.orc.mergeSchema", True) \
        .config("spark.sql.parquet.compression.codec", "lzo") \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .config("spark.sql.shuffle.partitions", 100) \
        .config("spark.executor.memory", "15g") \
        .config("spark.executor.cores", "4") \
        .config("spark.executor.instances", "15") \
        .config("spark.driver.memory", "20g") \
        .config("spark.yarn.queue", "spark") \
        .enableHiveSupport() \
        .getOrCreate()


def main():
    spark = create_spark()
    index_name = get_es_index_name()

    # ASINs already present in the Elasticsearch index (read via Doris).
    df_need_update = read_es_asin(spark, index_name)
    df_es = df_need_update.repartition(40).cache()
    print("ASIN records currently in Elasticsearch:")
    df_es.show(20, truncate=False)

    # SKU aggregation fields computed on the Hive side.
    df_main_asin = get_main_asin(spark)
    df_main_asin = df_main_asin.repartition(40).cache()
    print("ASIN records that need to be updated:")
    df_main_asin.show(20, truncate=False)

    # Only update documents that already exist in the index.
    df_update = df_es.join(
        df_main_asin,
        on=["asin"],
        how="inner"
    )
    print("ASIN records present in Elasticsearch that also need updating:")
    df_update = df_update.na.fill({"auctions_num": 0, "auctions_num_all": 0,
                                   "skus_num_creat": 0, "skus_num_creat_all": 0}).cache()
    df_update.show(20, truncate=False)

    update_es_fields(df_update, index_name)
    spark.stop()


if __name__ == '__main__':
    main()