# author : wangrui
# date : 2024/7/17 14:28
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory

import pandas as pd
from pyspark.sql import SparkSession, Row

from utils.db_util import DBUtil, DbTypes
from yswg_utils.common_df import get_user_mask_type_asin_sql
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils, HdfsError
from utils.spark_util import SparkUtil
from utils.DorisHelper import DorisHelper

__es_ip__ = "192.168.10.217"
__es_port__ = "9200"
__es_user__ = "elastic"
__es_passwd__ = "selection2021.+"
__warehouse_dir__ = "hdfs://nameservice1:8020/home/big_data_selection"
__metastore_uris__ = "thrift://hadoop16:9083"


def get_es_index_name(site_name):
    """Query MySQL for the monthly report dates whose Elasticsearch indexes should be updated."""
    engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=site_name)
    # page='流量选品' (traffic-based product selection) is kept verbatim: it is the literal value stored in the table
    sql = f"""SELECT REPLACE(report_date, '-', '_') as date_info
              FROM workflow_everyday
              WHERE site_name='{site_name}' AND date_type='month' AND page='流量选品' AND status_val=14
              ORDER BY report_date DESC;"""
    print("Querying the Elasticsearch indexes that need updating:")
    print("sql=", sql)
    df = pd.read_sql(sql, con=engine_mysql)
    if df.shape[0]:
        date_info_list = list(df.date_info)
        # date_info values use underscores (e.g. 2024_01) because of the REPLACE() above
        filter_date_info_list = [date_info for date_info in date_info_list if date_info >= '2024_01']
        if filter_date_info_list:
            return filter_date_info_list
    print("No indexes need updating")
    sys.exit(0)


def update_es_fields(spark, df_main, date_info_list, site_name, run_type):
    """Join the user-mask ASINs against each monthly index and upsert the matched rows back to Elasticsearch."""
    if run_type == "real":
        real_date_info_list = [max(date_info_list)]
        print("Update mode targets the latest Elasticsearch index; dates of indexes to update:", real_date_info_list)
    else:
        real_date_info_list = list(set(date_info_list) - {max(date_info_list)})
        print("Update mode targets historical Elasticsearch indexes; dates of indexes to update:", real_date_info_list)
    for date_info in real_date_info_list:
        index_name = f"{site_name}_st_detail_month_{date_info}"
        es_asin_sql = f"""SELECT asin FROM es_selection.default_db.{index_name}"""
        df_es = DorisHelper.spark_import_with_sql(spark, es_asin_sql)
        df_es = df_es.repartition(40)
        df_need_update = df_es.join(df_main, on=['asin'], how='inner')
        print(f"Data to update in Elasticsearch for {site_name} {date_info}:")
        df_need_update.cache()
        df_need_update.show(10, truncate=False)
        es_options = {
            "es.nodes": __es_ip__,
            "es.port": __es_port__,
            "es.net.http.auth.user": __es_user__,
            "es.net.http.auth.pass": __es_passwd__,
            "es.mapping.id": "asin",
            "es.resource": f"{index_name}/_doc",
            "es.batch.write.refresh": "false",
            "es.batch.write.retry.wait": "60s",
            "es.batch.size.entries": "5000",
            "es.nodes.wan.only": "false",
            "es.batch.write.concurrency": "60",
            "es.write.operation": "upsert"
        }
        try:
            df_need_update.write.format("org.elasticsearch.spark.sql") \
                .options(**es_options) \
                .mode("append") \
                .save()
            print(f"Elasticsearch index {index_name} updated!")
        except Exception as e:
            print("An error occurred while writing to Elasticsearch:", str(e))
            CommonUtil.send_wx_msg(['wujicang', 'wangrui4'],
                                   '\u26A0 Failed to update user-mask info in ES',
                                   f'ES user-mask update failed: {site_name}, {date_info}')
    print("All Elasticsearch data updated")


def get_date_info(spark, file_path, date_info, run_type):
    """Return the date to query with: the passed-in date for 'real' mode, otherwise the date recorded on HDFS (if any)."""
    if run_type == 'real':
        print("Date for the latest-index update mode:", date_info)
        return date_info
    client = HdfsUtils.get_hdfs_cilent()
    try:
        status = client.status(file_path)
        if status is not None:
            df_read = spark.read.parquet(file_path)
            hdfs_date_info = str(df_read.select("date_info").first()[0])
            print("Date recorded on HDFS:", hdfs_date_info)
            return hdfs_date_info
    except HdfsError:
        print("No date recorded on HDFS; using the date passed in:", date_info)
    return date_info


def save_date_info_to_hdfs(spark, date_info, file_path):
    """Persist the processed date to HDFS so the next 'history' run can resume from it."""
    # Only persist dates after 2024-01-01
    if date_info > '2024-01-01':
        data = [Row(date_info=date_info)]
        df_save = spark.createDataFrame(data)
        df_save.write.mode('overwrite').parquet(file_path)


def get_main_asin(spark, site_name, date_info):
    """Read the user-mask ASINs from PostgreSQL into a cached Spark DataFrame."""
    pg_con_info = DBUtil.get_connection_info("postgresql", "us")
    main_sql = get_user_mask_type_asin_sql(site_name, date_info)
    print("SQL for querying user-mask info:", main_sql)
    if pg_con_info is None:
        print("Failed to get PostgreSQL connection info")
        sys.exit(0)
    df_main = SparkUtil.read_jdbc_query(session=spark,
                                        url=pg_con_info['url'],
                                        pwd=pg_con_info['pwd'],
                                        username=pg_con_info['username'],
                                        query=main_sql)
    df_main = df_main.repartition(40).cache()
    count = df_main.count()
    if count == 0:
        print("No data needs updating")
        sys.exit(0)
    print("Number of rows to update:", count)
    df_main.show(10, truncate=False)
    return df_main


def main(site_name, date_info, run_type):
    file_path = f"/tmp/wangrui/usr_mask/{site_name}/date_info.parquet"
    spark = create_spark(site_name)
    cur_date_info = get_date_info(spark, file_path, date_info, run_type)
    df_main = get_main_asin(spark, site_name, cur_date_info)
    date_info_list = get_es_index_name(site_name)
    update_es_fields(spark, df_main, date_info_list, site_name, run_type)
    save_date_info_to_hdfs(spark, date_info, file_path)
    spark.stop()


def create_spark(site_name):
    return SparkSession.builder \
        .master("yarn") \
        .appName(f"es_update_with_usr_mask: {site_name}") \
        .config("spark.sql.warehouse.dir", __warehouse_dir__) \
        .config("spark.metastore.uris", __metastore_uris__) \
        .config("spark.network.timeout", 1000000) \
        .config("spark.sql.orc.mergeSchema", True) \
        .config("spark.sql.parquet.compression.codec", "lzo") \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .config("spark.sql.shuffle.partitions", 100) \
        .config("spark.executor.memory", "15g") \
        .config("spark.executor.cores", "4") \
        .config("spark.executor.instances", "10") \
        .config("spark.driver.memory", "15g") \
        .config("spark.yarn.queue", "spark") \
        .enableHiveSupport() \
        .getOrCreate()


if __name__ == '__main__':
    arguments = sys.argv[1:]
    site_name = sys.argv[1]  # arg 1: site name
    run_type = sys.argv[2]   # arg 2: run type (real: latest index; history: historical indexes)
    if run_type == 'history' and len(arguments) == 3:
        date_info = sys.argv[3]  # arg 3: data date; not needed when targeting the latest index
    else:
        date_info = '2024-01-01'
    main(site_name, date_info, run_type)
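
# Usage sketch (assumed invocation; the file name es_update_usr_mask.py and the spark-submit
# wrapping are illustrative assumptions, not stated in the source):
#   spark-submit es_update_usr_mask.py us real                 # refresh only the latest monthly index
#   spark-submit es_update_usr_mask.py us history 2024-03-01   # refresh historical indexes using the given data date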