# author : wangrui
# date : 2024/7/17 14:28
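"""
Update user-mask ASIN information in the monthly Elasticsearch indexes
({site}_st_detail_month_{yyyy_mm}).

The script reads the user-marked ASINs from PostgreSQL, joins them against the ASINs
already present in each target index (read through the Doris external catalog
es_selection.default_db), and upserts the matched rows back into Elasticsearch.
The run type decides whether only the latest monthly index or the historical ones
are refreshed.
"""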
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.db_util import DBUtil, DbTypes
from pyspark.sql import SparkSession
import pandas as pd
from yswg_utils.common_df import get_user_mask_type_asin_sql
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils, HdfsError
from pyspark.sql import Row
from utils.spark_util import SparkUtil
from utils.DorisHelper import DorisHelper
__es_ip__ = "192.168.10.217"
__es_port__ = "9200"
__es_user__ = "elastic"
__es_passwd__ = "selection2021.+"
__warehouse_dir__ = "hdfs://nameservice1:8020/home/big_data_selection"
__metastore_uris__ = "thrift://hadoop16:9083"


def get_es_index_name(site_name):
    engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=site_name)
    # page='流量选品' ("traffic product selection") is the literal value stored in the workflow table
    sql = f"""SELECT REPLACE(report_date, '-', '_') as date_info FROM workflow_everyday WHERE site_name='{site_name}' AND date_type='month' AND page='流量选品' AND status_val=14 ORDER BY report_date DESC;"""
    print("Querying the Elasticsearch indexes that need to be updated:")
    print("sql=", sql)
    df = pd.read_sql(sql, con=engine_mysql)
    if df.shape[0]:
        date_info_list = list(df.date_info)
        # keep months from 2024 onwards; '2024_01' >= '2024-01' holds because '_' sorts after '-'
        filter_date_info_list = [date_info for date_info in date_info_list if date_info >= '2024-01']
        if filter_date_info_list:
            return filter_date_info_list
    print("No index needs to be updated")
    sys.exit(0)


def update_es_fields(spark, df_main, date_info_list, site_name, run_type):
    if run_type == "real":
        real_date_info_list = [max(date_info_list)]
        print("Run mode targets the latest Elasticsearch index; dates to update:", real_date_info_list)
    else:
        real_date_info_list = list(set(date_info_list) - {max(date_info_list)})
        print("Run mode targets the historical Elasticsearch indexes; dates to update:", real_date_info_list)
    for date_info in real_date_info_list:
        index_name = f"{site_name}_st_detail_month_{date_info}"
        es_asin_sql = f"""
        SELECT asin from es_selection.default_db.{index_name}
        """
        # read the ASINs already present in the ES index through the Doris external catalog
        df_es = DorisHelper.spark_import_with_sql(spark, es_asin_sql)
        df_es = df_es.repartition(40)
        df_need_update = df_es.join(
            df_main, on=['asin'], how='inner'
        )
        print(f"Data to be updated in Elasticsearch for {site_name} {date_info}:")
        df_need_update.cache()
        df_need_update.show(10, truncate=False)
        es_options = {
            "es.nodes": __es_ip__,
            "es.port": __es_port__,
            "es.net.http.auth.user": __es_user__,
            "es.net.http.auth.pass": __es_passwd__,
            "es.mapping.id": "asin",
            "es.resource": f"{index_name}/_doc",
            "es.batch.write.refresh": "false",
            "es.batch.write.retry.wait": "60s",
            "es.batch.size.entries": "5000",
            "es.nodes.wan.only": "false",
            "es.batch.write.concurrency": "60",
            "es.write.operation": "upsert"
        }
        try:
            df_need_update.write.format("org.elasticsearch.spark.sql") \
                .options(**es_options) \
                .mode("append") \
                .save()
            print(f"Elasticsearch index {index_name} updated!")
        except Exception as e:
            print("An error occurred while writing to Elasticsearch:", str(e))
            CommonUtil.send_wx_msg(['wujicang', 'wangrui4'], '\u26A0 ES user-mask info update failed',
                                   f'Failed to update user-mask info in ES: {site_name}, {date_info}')
    print("All Elasticsearch data has been updated")


def get_date_info(spark, file_path, date_info, run_type):
    if run_type == 'real':
        print("Latest-index run mode, using date:", date_info)
        return date_info
    else:
        client = HdfsUtils.get_hdfs_cilent()
        try:
            status = client.status(file_path)
            if status is not None:
                df_read = spark.read.parquet(file_path)
                hdfs_date_info = str(df_read.select("date_info").first()[0])
                print("Date recorded on HDFS:", hdfs_date_info)
                return hdfs_date_info
        except HdfsError:
            print("No date recorded on HDFS, falling back to the date argument:", date_info)
            return date_info


def save_date_info_to_hdfs(spark, date_info, file_path):
    # only record dates after the initial 2024-01-01 baseline
    if date_info > '2024-01-01':
        data = [Row(date_info=date_info)]
        df_save = spark.createDataFrame(data)
        df_save.write.mode('overwrite').parquet(file_path)


def get_main_asin(spark, site_name, date_info):
    pg_con_info = DBUtil.get_connection_info("postgresql", "us")
    main_sql = get_user_mask_type_asin_sql(site_name, date_info)
    print("SQL for querying user-mask info:", main_sql)
    if pg_con_info is None:
        print("No PostgreSQL connection info available")
        sys.exit(1)
    df_main = SparkUtil.read_jdbc_query(session=spark, url=pg_con_info['url'],
                                        pwd=pg_con_info['pwd'], username=pg_con_info['username'],
                                        query=main_sql)
    df_main = df_main.repartition(40).cache()
    count = df_main.count()
    if count == 0:
        print("No data needs to be updated")
        sys.exit(0)
    else:
        print("Number of rows to update:", count)
        df_main.show(10, truncate=False)
    return df_main


def main(site_name, date_info, run_type):
    file_path = f"/tmp/wangrui/usr_mask/{site_name}/date_info.parquet"
    spark = create_spark(site_name)
    cur_date_info = get_date_info(spark, file_path, date_info, run_type)
    df_main = get_main_asin(spark, site_name, cur_date_info)
    date_info_list = get_es_index_name(site_name)
    update_es_fields(spark, df_main, date_info_list, site_name, run_type)
    save_date_info_to_hdfs(spark, date_info, file_path)
    spark.stop()


def create_spark(site_name):
    # YARN-backed session with Hive support; the metastore address is set via hive.metastore.uris
    return SparkSession.builder \
        .master("yarn") \
        .appName(f"es_update_with_usr_mask: {site_name}") \
        .config("spark.sql.warehouse.dir", __warehouse_dir__) \
        .config("hive.metastore.uris", __metastore_uris__) \
        .config("spark.network.timeout", 1000000) \
        .config("spark.sql.orc.mergeSchema", True) \
        .config("spark.sql.parquet.compression.codec", "lzo") \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .config("spark.sql.shuffle.partitions", 100) \
        .config("spark.executor.memory", "15g") \
        .config("spark.executor.cores", "4") \
        .config("spark.executor.instances", "10") \
        .config("spark.driver.memory", "15g") \
        .config("spark.yarn.queue", "spark") \
        .enableHiveSupport() \
        .getOrCreate()


if __name__ == '__main__':
    arguments = sys.argv[1:]
    site_name = sys.argv[1]  # argument 1: site name
    run_type = sys.argv[2]   # argument 2: run type ("real": latest index; "history": historical indexes)
    if run_type == 'history' and len(arguments) == 3:
        date_info = sys.argv[3]  # argument 3: start date of the data range; not needed for the latest index
    else:
        date_info = '2024-01-01'
    main(site_name, date_info, run_type)
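
# Example submission (a minimal sketch; the script file name and jar locations are assumptions,
# adjust them to the actual deployment):
#   spark-submit --master yarn es_update_usr_mask.py us real
#   spark-submit --master yarn es_update_usr_mask.py us history 2024-03-01
# The Elasticsearch writes additionally require the elasticsearch-spark (es-hadoop) connector jar
# to be available on the executors, e.g. via --jars or spark.jars.packages.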