# author : wangrui
# date : 2024/3/22 14:28
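"""Push per-asin auction/sku counters from Hive into Elasticsearch.

The job resolves the concrete index behind the "us_st_detail_last_4_week" alias,
reads the asin documents of that index through Doris, joins them with the counters
in big_data_selection.tmp_jm_info, and upserts the matching rows back into the
index, keyed on asin.
"""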
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable (for the utils package)
from utils.es_util import EsUtils
from pyspark.sql import SparkSession
from utils.DorisHelper import DorisHelper
__es_ip__ = "192.168.10.217"
__es_port__ = "9200"
__es_user__ = "elastic"
__es_passwd__ = "selection2021.+"
__warehouse_dir__ = "hdfs://nameservice1:8020/home/big_data_selection"
__metastore_uris__ = "thrift://hadoop16:9083"


def get_es_index_name():
    """Resolve the concrete index name behind the us_st_detail_last_4_week alias."""
    client = EsUtils.get_es_client()
    index_name_list = EsUtils.get_index_names_associated_alias("us_st_detail_last_4_week", client)
    if index_name_list:
        index_name = str(index_name_list[0])
        print("Elasticsearch index to be updated:", index_name)
        return index_name
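

# For reference only: EsUtils.get_es_client / get_index_names_associated_alias are in-house
# helpers that are not part of this file. The sketch below shows how the same alias lookup
# could be done with the official elasticsearch-py client (7.x style); it is an assumption
# about what the helper does and is never called by this job.
def _resolve_alias_illustration(alias_name):
    from elasticsearch import Elasticsearch  # assumed available, since the job already talks to ES
    client = Elasticsearch(
        [f"http://{__es_ip__}:{__es_port__}"],
        http_auth=(__es_user__, __es_passwd__),
    )
    # get_alias returns a dict keyed by the concrete index names behind the alias
    return list(client.indices.get_alias(name=alias_name).keys())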


def update_es_fields(df_update, index_name):
    """Upsert the joined asin metrics into the given Elasticsearch index."""
    es_options = {
        "es.nodes": __es_ip__,
        "es.port": __es_port__,
        "es.net.http.auth.user": __es_user__,
        "es.net.http.auth.pass": __es_passwd__,
        "es.mapping.id": "asin",              # use the asin column as the document _id
        "es.resource": f"{index_name}/_doc",
        "es.batch.write.refresh": "false",
        "es.batch.write.retry.wait": "60s",
        "es.batch.size.entries": "60000",
        "es.nodes.wan.only": "false",
        "es.batch.write.concurrency": "80",
        "es.write.operation": "upsert"        # update existing documents, insert missing ones
    }
    try:
        df_update = df_update.repartition(40)
        df_update.write.format("org.elasticsearch.spark.sql") \
            .options(**es_options) \
            .mode("append") \
            .save()
        print("Elasticsearch update finished")
    except Exception as e:
        print("An error occurred while writing to Elasticsearch:", str(e))
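
# Note on the write above: with "es.mapping.id" = "asin" and "es.write.operation" = "upsert",
# the connector uses each row's asin as the document _id, updating documents that already exist
# and inserting the rest, so only the columns present in df_update are touched on existing docs.
# A failed bulk write is only logged here, so the job itself will still exit successfully.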


def read_es_asin(spark, index_name):
    """Read the asin column of the aliased index through Doris."""
    es_asin_sql = f"""
    SELECT asin FROM es_selection.default_db.{index_name}
    """
    df_need_update = DorisHelper.spark_import_with_sql(spark, es_asin_sql)
    return df_need_update
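

# For reference only: DorisHelper.spark_import_with_sql is an in-house wrapper that is not part
# of this file. A common way to run such a query from Spark is a plain JDBC read against the
# Doris frontend (which speaks the MySQL protocol); the sketch below uses placeholder connection
# settings, is never called by this job, and may differ from what DorisHelper actually does.
def _read_doris_via_jdbc_illustration(spark, sql):
    return (
        spark.read.format("jdbc")
        .option("url", "jdbc:mysql://<doris-fe-host>:9030")  # placeholder FE host/port
        .option("driver", "com.mysql.cj.jdbc.Driver")        # assumes the MySQL JDBC driver jar is on the classpath
        .option("user", "<doris-user>")                      # placeholder credentials
        .option("password", "<doris-password>")
        .option("query", sql)                                # Spark 2.4+ JDBC source accepts a full query
        .load()
    )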


def get_main_asin(spark):
    """Load the per-asin auction/sku counters from the Hive staging table."""
    sql = """
    SELECT asin,
           cast(auctions_num as bigint)       as auctions_num,
           cast(auctions_num_all as bigint)   as auctions_num_all,
           cast(skus_num_creat as bigint)     as skus_num_creat,
           cast(skus_num_creat_all as bigint) as skus_num_creat_all
    FROM big_data_selection.tmp_jm_info
    """
    print("sql=", sql)
    df_main_asin = spark.sql(sql)
    return df_main_asin


def create_spark():
    return SparkSession.builder \
        .master("yarn") \
        .appName("es_update_use_sku_info") \
        .config("spark.sql.warehouse.dir", __warehouse_dir__) \
        .config("hive.metastore.uris", __metastore_uris__) \
        .config("spark.network.timeout", 1000000) \
        .config("spark.sql.orc.mergeSchema", True) \
        .config("spark.sql.parquet.compression.codec", "lzo") \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .config("spark.sql.shuffle.partitions", 100) \
        .config("spark.executor.memory", "15g") \
        .config("spark.executor.cores", "4") \
        .config("spark.executor.instances", "15") \
        .config("spark.driver.memory", "20g") \
        .config("spark.yarn.queue", "spark") \
        .enableHiveSupport() \
        .getOrCreate()


def main():
    spark = create_spark()
    index_name = get_es_index_name()
    df_need_update = read_es_asin(spark, index_name)
    df_es = df_need_update.repartition(40).cache()
    print("asin documents currently in Elasticsearch: ")
    df_es.show(20, truncate=False)
    df_main_asin = get_main_asin(spark)
    df_main_asin = df_main_asin.repartition(40).cache()
    print("asin rows with fresh counters: ")
    df_main_asin.show(20, truncate=False)
    # only asins that already exist in the index are pushed back
    df_update = df_es.join(
        df_main_asin, on=["asin"], how="inner"
    )
    print("asin rows to be updated that exist in Elasticsearch: ")
    df_update = df_update.na.fill({"auctions_num": 0, "auctions_num_all": 0,
                                   "skus_num_creat": 0, "skus_num_creat_all": 0}).cache()
    df_update.show(20, truncate=False)
    update_es_fields(df_update, index_name)
    spark.stop()


if __name__ == '__main__':
    main()
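
# Example submission (the elasticsearch-spark / elasticsearch-hadoop connector jar must be on the
# classpath for the "org.elasticsearch.spark.sql" format; jar and script names are placeholders):
#   spark-submit --master yarn --jars elasticsearch-spark-30_2.12-<version>.jar <this_script>.py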