# es_update_with_usr_mask.py
# author : wangrui
# date : 2024/7/17 14:28
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.db_util import DBUtil, DbTypes
from pyspark.sql import SparkSession
import pandas as pd
from yswg_utils.common_df import get_user_mask_type_asin_sql
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils, HdfsError
from pyspark.sql import Row
from utils.spark_util import SparkUtil
from utils.DorisHelper import DorisHelper

__es_ip__ = "192.168.10.217"
__es_port__ = "9200"
__es_user__ = "elastic"
__es_passwd__ = "selection2021.+"
__warehouse_dir__ = "hdfs://nameservice1:8020/home/big_data_selection"
__metastore_uris__ = "thrift://hadoop16:9083"



def get_es_index_name(site_name):
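    """Query MySQL workflow_everyday for monthly '流量选品' reports with status_val=14 and return
    the report dates (formatted as YYYY_MM) whose elasticsearch indexes should be updated."""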
    engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=site_name)
    sql = f"""SELECT REPLACE(report_date, '-', '_') as date_info FROM workflow_everyday WHERE site_name='{site_name}' AND date_type='month' AND page='流量选品' AND status_val=14 ORDER BY report_date DESC;"""
    print("查询需要更新的elaticsearch索引: ")
    print("sql=", sql)
    df = pd.read_sql(sql, con=engine_mysql)
    if df.shape[0]:
        date_info_list = list(df.date_info)
        # date_info uses underscores (e.g. 2024_01), so filter against the same format
        filter_date_info_list = [date_info for date_info in date_info_list if date_info >= '2024_01']
        if filter_date_info_list:
            return filter_date_info_list
        else:
            print("No index information needs to be updated")
            sys.exit(0)
    else:
        print("No index information needs to be updated")
        sys.exit(0)


def update_es_fields(spark, df_main, date_info_list, site_name, run_type):
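    """For each index date, read the asins already present in the monthly index (via Doris),
    join them with df_main, and upsert the matching rows back into elasticsearch.
    run_type 'real' updates only the latest index; any other value updates the historical ones."""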
    if run_type == "real":
        real_date_info_list = [max(date_info_list)]
        print("Update mode targets the latest elasticsearch index; dates of indexes to update:", real_date_info_list)
    else:
        real_date_info_list = list(set(date_info_list) - {max(date_info_list)})
        print("Update mode targets historical elasticsearch indexes; dates of indexes to update:", real_date_info_list)
    for date_info in real_date_info_list:
        index_name = f"{site_name}_st_detail_month_{date_info}"
        es_asin_sql = f"""
              SELECT asin from es_selection.default_db.{index_name}
          """
        df_es = DorisHelper.spark_import_with_sql(spark, es_asin_sql)
        df_es = df_es.repartition(40)
        df_need_update = df_es.join(
            df_main, on=['asin'], how='inner'
        )
        print(f"Elasticsearch上{site_name} {date_info}需要更新的数据为:")
        df_need_update.cache()
        df_need_update.show(10, truncate=False)
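        # elasticsearch-hadoop write options: upsert documents keyed by asin into the monthly index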
        es_options = {
            "es.nodes": __es_ip__,
            "es.port": __es_port__,
            "es.net.http.auth.user": __es_user__,
            "es.net.http.auth.pass": __es_passwd__,
            "es.mapping.id": "asin",
            "es.resource": f"{index_name}/_doc",
            "es.batch.write.refresh": "false",
            "es.batch.write.retry.wait": "60s",
            "es.batch.size.entries": "5000",
            "es.nodes.wan.only": "false",
            "es.batch.write.concurrency": "60",
            "es.write.operation": "upsert"
        }
        try:
            df_need_update.write.format("org.elasticsearch.spark.sql") \
                .options(**es_options) \
                .mode("append") \
                .save()
            print(f"elasticsearch {index_name} 更新完毕!")
        except Exception as e:
            print("An error occurred while writing to Elasticsearch:", str(e))
            CommonUtil.send_wx_msg(['wujicang', 'wangrui4'], '\u26A0 es用户标记信息更新失败', f'es更新用户标记信息失败:{site_name}, {date_info}')
            pass
    print("elasticsearch 所有数据全部更新完毕")


def get_date_info(spark, file_path, date_info, run_type):
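    """Resolve the date_info for this run: the passed-in date for 'real' runs, otherwise the
    date previously saved on HDFS (falling back to the passed-in date when none is recorded)."""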
    if run_type == 'real':
        print("Date used by the latest-index update mode:", date_info)
        return date_info
    else:
        client = HdfsUtils.get_hdfs_cilent()
        try:
            status = client.status(file_path)
            if status is not None:
                df_read = spark.read.parquet(file_path)
                hdfs_date_info = str(df_read.select("date_info").first()[0])
                print("Date recorded on hdfs:", hdfs_date_info)
                return hdfs_date_info
        except HdfsError:
            print("No date recorded on hdfs; using the passed-in date:", date_info)
        return date_info


def save_date_info_to_hdfs(spark, date_info, file_path):
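    """Save date_info as a one-row parquet file on HDFS; dates on or before 2024-01-01 are skipped."""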
    # only persist dates later than the initial 2024-01-01 baseline
    if date_info > '2024-01-01':
        data = [Row(date_info=date_info)]
        df_save = spark.createDataFrame(data)
        df_save.write.mode('overwrite').parquet(file_path)


def get_main_asin(spark, site_name, date_info):
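    """Load the user-mask asin records from postgresql (SQL built by get_user_mask_type_asin_sql)
    into a cached Spark DataFrame; exits when there is nothing to update."""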
    pg_con_info = DBUtil.get_connection_info("postgresql", "us")
    main_sql = get_user_mask_type_asin_sql(site_name, date_info)
    print("用户标记信息查询sql为: ", main_sql)
    if pg_con_info is not None:
        df_main = SparkUtil.read_jdbc_query(session=spark, url=pg_con_info['url'],
                                                        pwd=pg_con_info['pwd'], username=pg_con_info['username'],
                                                        query=main_sql)
        df_main = df_main.repartition(40).cache()
        count = df_main.count()
        if count == 0:
            print("No data needs to be updated")
            sys.exit(0)
        else:
            print("Number of rows to update:", count)
            df_main.show(10, truncate=False)
            return df_main
    else:
        print("Failed to get postgresql connection info")
        sys.exit(1)


def main(site_name, date_info, run_type):
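    """Resolve the run date, load the user-mask asins, update the matching elasticsearch
    indexes, then persist the date back to HDFS for later runs."""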
    file_path = f"/tmp/wangrui/usr_mask/{site_name}/date_info.parquet"
    spark = create_spark(site_name)
    cur_date_info = get_date_info(spark, file_path, date_info, run_type)
    df_main = get_main_asin(spark, site_name, cur_date_info)
    date_info_list = get_es_index_name(site_name)
    update_es_fields(spark, df_main, date_info_list, site_name, run_type)
    save_date_info_to_hdfs(spark, date_info, file_path)
    spark.stop()


def create_spark(site_name):
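    """Create the YARN SparkSession (with Hive support) shared by this job."""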
    return SparkSession.builder \
                .master("yarn") \
                .appName(f"es_update_with_usr_mask: {site_name}") \
                .config("spark.sql.warehouse.dir", __warehouse_dir__) \
                .config("spark.metastore.uris", __metastore_uris__) \
                .config("spark.network.timeout", 1000000) \
                .config("spark.sql.orc.mergeSchema", True) \
                .config("spark.sql.parquet.compression.codec", "lzo") \
                .config("spark.driver.maxResultSize", "10g") \
                .config("spark.sql.autoBroadcastJoinThreshold", -1) \
                .config("spark.sql.shuffle.partitions", 100) \
                .config("spark.executor.memory", "15g") \
                .config("spark.executor.cores", "4") \
                .config("spark.executor.instances", "10") \
                .config("spark.driver.memory", "15g") \
                .config("spark.yarn.queue", "spark") \
                .enableHiveSupport() \
                .getOrCreate()


if __name__ == '__main__':
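    # usage: es_update_with_usr_mask.py <site_name> <run_type: real|history> [date_info]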
    arguments = sys.argv[1:]
    site_name = sys.argv[1]  # arg 1: site name
    run_type = sys.argv[2]   # arg 2: run type (real: latest index; history: historical indexes)
    if run_type == 'history':
        if len(arguments) == 3:  # arg 3: data date range; not needed when targeting the latest index
            date_info = sys.argv[3]
        else:
            date_info = '2024-01-01'
    else:
        date_info = '2024-01-01'
    main(site_name, date_info, run_type)