# es_update_with_info.py
# author : wangrui
# date : 2024/3/22 14:28
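"""
Updates auction/SKU count fields for the ASINs stored in the Elasticsearch index
behind the `us_st_detail_last_4_week` alias, using counts read from the Hive table
`big_data_selection.tmp_jm_info`.
"""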
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.es_util import EsUtils
from pyspark.sql import SparkSession
from utils.DorisHelper import DorisHelper

__es_ip__ = "192.168.10.217"
__es_port__ = "9200"
__es_user__ = "elastic"
__es_passwd__ = "selection2021.+"
__warehouse_dir__ = "hdfs://nameservice1:8020/home/big_data_selection"
__metastore_uris__ = "thrift://hadoop16:9083"



def get_es_index_name():
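    """Resolve the concrete index name behind the `us_st_detail_last_4_week` alias."""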
    client = EsUtils.get_es_client()
    index_name_list = EsUtils.get_index_names_associated_alias("us_st_detail_last_4_week", client)
    if index_name_list:
        index_name = str(index_name_list[0])
        print("elasticsearch上待更新的索引名称为:", index_name)
        return index_name


def update_es_fields(df_update, index_name):
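    """Upsert the updated count fields into the given Elasticsearch index via the ES-Hadoop Spark connector."""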
    es_options = {
        "es.nodes": __es_ip__,
        "es.port": __es_port__,
        "es.net.http.auth.user": __es_user__,
        "es.net.http.auth.pass": __es_passwd__,
        "es.mapping.id": "asin",  # use the asin column as the Elasticsearch document _id
        "es.resource": f"{index_name}/_doc",
        "es.batch.write.refresh": "false",  # do not refresh the index after each bulk request
        "es.batch.write.retry.wait": "60s",
        "es.batch.size.entries": "60000",
        "es.nodes.wan.only": "false",
        "es.batch.write.concurrency": "80",
        "es.write.operation": "upsert"  # update existing documents, insert missing ones
    }
    try:
        df_update = df_update.repartition(40)
        df_update.write.format("org.elasticsearch.spark.sql") \
            .options(**es_options) \
            .mode("append") \
            .save()
        print("elasticsearch更新完毕")
    except Exception as e:
        print("An error occurred while writing to Elasticsearch:", str(e))
        pass


def read_es_asin(spark, index_name):
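    """Read the ASINs currently in the Elasticsearch index, queried through Doris (`es_selection.default_db`)."""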
    es_asin_sql = f"""
        SELECT asin from es_selection.default_db.{index_name}
    """
    df_need_update = DorisHelper.spark_import_with_sql(spark, es_asin_sql)
    return df_need_update


def get_main_asin(spark):
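    """Load the per-ASIN auction/SKU counts from the Hive table `big_data_selection.tmp_jm_info`."""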
    sql = f"""
            SELECT asin, cast(auctions_num as bigint) as auctions_num, cast(auctions_num_all as bigint) as auctions_num_all, 
            cast(skus_num_creat as bigint) as skus_num_creat, cast(skus_num_creat_all as bigint) as skus_num_creat_all FROM big_data_selection.tmp_jm_info
        """
    print("sql=", sql)
    df_main_asin = spark.sql(sql)
    return df_main_asin


def create_spark():
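    """Build the SparkSession for the update job (YARN, Hive support enabled)."""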
    return SparkSession.builder \
                .master("yarn") \
                .appName("es_update_use_sku_info") \
                .config("spark.sql.warehouse.dir", __warehouse_dir__) \
                .config("spark.metastore.uris", __metastore_uris__) \
                .config("spark.network.timeout", 1000000) \
                .config("spark.sql.orc.mergeSchema", True) \
                .config("spark.sql.parquet.compression.codec", "lzo") \
                .config("spark.driver.maxResultSize", "10g") \
                .config("spark.sql.autoBroadcastJoinThreshold", -1) \
                .config("spark.sql.shuffle.partitions", 100) \
                .config("spark.executor.memory", "15g") \
                .config("spark.executor.cores", "4") \
                .config("spark.executor.instances", "15") \
                .config("spark.driver.memory", "20g") \
                .config("spark.yarn.queue", "spark") \
                .enableHiveSupport() \
                .getOrCreate()


def main():
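    """Join the ASINs present in Elasticsearch with the latest counts and upsert the matching documents."""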
    spark = create_spark()
    index_name = get_es_index_name()
    if not index_name:
        print("No index found behind the us_st_detail_last_4_week alias, nothing to update")
        spark.stop()
        return
    df_need_update = read_es_asin(spark, index_name)
    df_es = df_need_update.repartition(40).cache()
    print("elasticsearch上的asin信息为: ")
    df_es.show(20, truncate=False)
    df_main_asin = get_main_asin(spark)
    df_main_asin = df_main_asin.repartition(40).cache()
    print("需要更新的asin信息为: ")
    df_main_asin.show(20, truncate=False)
    df_update = df_es.join(
        df_main_asin, on=["asin"], how="inner"
    )
    print("es上存在的需要更新的asin信息为: ")
    df_update = df_update.na.fill({"auctions_num": 0, "auctions_num_all": 0, "skus_num_creat": 0, "skus_num_creat_all": 0}).cache()
    df_update.show(20, truncate=False)
    update_es_fields(df_update, index_name)
    spark.stop()


if __name__ == '__main__':
    main()