# file   : es_update_with_usr_mask.py
# author : wangrui
# date   : 2024/7/17 14:28
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.db_util import DBUtil, DbTypes
from pyspark.sql import SparkSession
import pandas as pd
from yswg_utils.common_df import get_user_mask_type_asin_sql
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils, HdfsError
from pyspark.sql import Row
from utils.spark_util import SparkUtil
from utils.DorisHelper import DorisHelper

__es_ip__ = "192.168.10.217"
__es_port__ = "9200"
__es_user__ = "elastic"
__es_passwd__ = "selection2021.+"
__warehouse_dir__ = "hdfs://nameservice1:8020/home/big_data_selection"
__metastore_uris__ = "thrift://hadoop16:9083"



def get_es_index_name(site_name):
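    """Return the list of month date_info strings (YYYY_MM format, 2024_01 onwards)
    whose monthly ES indexes should be updated, based on completed entries
    (status_val=14, page='流量选品') in workflow_everyday."""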
    engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=site_name)
    sql = f"""SELECT REPLACE(report_date, '-', '_') as date_info FROM workflow_everyday WHERE site_name='{site_name}' AND date_type='month' AND page='流量选品' AND status_val=14 ORDER BY report_date DESC;"""
    print("查询需要更新的elaticsearch索引: ")
    print("sql=", sql)
    df = pd.read_sql(sql, con=engine_mysql)
    if df.shape[0]:
        date_info_list = list(df.date_info)
        filter_date_info_list = [date_info for date_info in date_info_list if date_info >= '2024-01']
        if filter_date_info_list:
            return filter_date_info_list
        else:
            print("没有需要更新的索引信息")
            sys.exit(0)


def update_es_fields(spark, df_main, date_info_list, site_name, run_type):
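    """Join the user-mask ASINs in df_main against each monthly ES index and upsert
    the matched documents. run_type 'real' updates only the latest index; any other
    value updates the historical indexes."""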
    if run_type == "real":
        real_date_info_list = [max(date_info_list)]
        print("更新模式针对elasticsearch上最新索引,待更新索引的日期包括:", real_date_info_list)
    else:
        real_date_info_list = list(set(date_info_list) - set([max(date_info_list)]))
        print("更新模式针对elasticsearch上历史索引,待更新索引的日期包括:", real_date_info_list)
    for date_info in real_date_info_list:
        index_name = f"{site_name}_st_detail_month_{date_info}"
        es_asin_sql = f"""
              SELECT asin from es_selection.default_db.{index_name}
          """
        df_es = DorisHelper.spark_import_with_sql(spark, es_asin_sql)
        df_es = df_es.repartition(40)
        df_need_update = df_es.join(
            df_main, on=['asin'], how='inner'
        )
        print(f"Elasticsearch上{site_name} {date_info}需要更新的数据为:")
        df_need_update.cache()
        df_need_update.show(10, truncate=False)
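        # Write options: es.mapping.id=asin together with es.write.operation=upsert
        # updates the existing _doc keyed by asin instead of appending new documents.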
        es_options = {
            "es.nodes": __es_ip__,
            "es.port": __es_port__,
            "es.net.http.auth.user": __es_user__,
            "es.net.http.auth.pass": __es_passwd__,
            "es.mapping.id": "asin",
            "es.resource": f"{index_name}/_doc",
            "es.batch.write.refresh": "false",
            "es.batch.write.retry.wait": "60s",
            "es.batch.size.entries": "5000",
            "es.nodes.wan.only": "false",
            "es.batch.write.concurrency": "60",
            "es.write.operation": "upsert"
        }
        try:
            df_need_update.write.format("org.elasticsearch.spark.sql") \
                .options(**es_options) \
                .mode("append") \
                .save()
            print(f"elasticsearch {index_name} 更新完毕!")
        except Exception as e:
            print("An error occurred while writing to Elasticsearch:", str(e))
            CommonUtil.send_wx_msg(['wujicang', 'wangrui4'], '\u26A0 es用户标记信息更新失败', f'es更新用户标记信息失败:{site_name}, {date_info}')
            pass
    print("elasticsearch 所有数据全部更新完毕")


def get_date_info(spark, file_path, date_info, run_type):
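    """Resolve the date used to query user-mask ASINs: for 'real' runs use the passed
    date_info; otherwise prefer the date previously persisted to HDFS, falling back
    to the passed date_info when no record exists."""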
    if run_type == 'real':
        print("Latest-index run type; using date:", date_info)
        return date_info
    else:
        client = HdfsUtils.get_hdfs_cilent()
        try:
            status = client.status(file_path)
            if status is not None:
                df_read = spark.read.parquet(file_path)
                hdfs_date_info = str(df_read.select("date_info").first()[0])
                print("Date recorded on HDFS:", hdfs_date_info)
                return hdfs_date_info
        except HdfsError:
            print("No date recorded on HDFS; falling back to the date argument:", date_info)
        return date_info


def save_date_info_to_hdfs(spark, date_info, file_path):
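    """Persist date_info to HDFS as a one-row parquet file so the next history run
    can resume from it; dates on or before 2024-01-01 are not recorded."""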
    if date_info > '2024-01-01':
        data = [Row(date_info=date_info)]
        df_save = spark.createDataFrame(data)
        df_save.write.mode('overwrite').parquet(file_path)


def get_main_asin(spark, site_name, date_info):
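    """Read the user-mask ASIN DataFrame from PostgreSQL (the 'us' connection) for the
    given site and date; exits early when there is nothing to update."""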
    pg_con_info = DBUtil.get_connection_info("postgresql", "us")
    main_sql = get_user_mask_type_asin_sql(site_name, date_info)
    print("User-mask query SQL:", main_sql)
    if pg_con_info is not None:
        df_main = SparkUtil.read_jdbc_query(session=spark, url=pg_con_info['url'],
                                            pwd=pg_con_info['pwd'], username=pg_con_info['username'],
                                            query=main_sql)
        df_main = df_main.repartition(40).cache()
        count = df_main.count()
        if count == 0:
            print("No data to update")
            sys.exit(0)
        else:
            print("Rows to update:", count)
            df_main.show(10, truncate=False)
            return df_main
    else:
        print("Failed to get PostgreSQL connection info")
        sys.exit(1)


def main(site_name, date_info, run_type):
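    """End-to-end flow: resolve the query date, load user-mask ASINs, update the
    relevant ES indexes, then record the run date on HDFS."""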
    file_path = f"/tmp/wangrui/usr_mask/{site_name}/date_info.parquet"
    spark = create_spark(site_name)
    cur_date_info = get_date_info(spark, file_path, date_info, run_type)
    df_main = get_main_asin(spark, site_name, cur_date_info)
    date_info_list = get_es_index_name(site_name)
    update_es_fields(spark, df_main, date_info_list, site_name, run_type)
    save_date_info_to_hdfs(spark, date_info, file_path)
    spark.stop()


def create_spark(site_name):
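    """Build a yarn SparkSession with Hive support and the resource settings used
    for the ES update job."""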
    return SparkSession.builder \
                .master("yarn") \
                .appName(f"es_update_with_usr_mask: {site_name}") \
                .config("spark.sql.warehouse.dir", __warehouse_dir__) \
                .config("spark.metastore.uris", __metastore_uris__) \
                .config("spark.network.timeout", 1000000) \
                .config("spark.sql.orc.mergeSchema", True) \
                .config("spark.sql.parquet.compression.codec", "lzo") \
                .config("spark.driver.maxResultSize", "10g") \
                .config("spark.sql.autoBroadcastJoinThreshold", -1) \
                .config("spark.sql.shuffle.partitions", 100) \
                .config("spark.executor.memory", "15g") \
                .config("spark.executor.cores", "4") \
                .config("spark.executor.instances", "10") \
                .config("spark.driver.memory", "15g") \
                .config("spark.yarn.queue", "spark") \
                .enableHiveSupport() \
                .getOrCreate()


if __name__ == '__main__':
    arguments = sys.argv[1:]
    site_name = sys.argv[1]  # arg 1: site name
    run_type = sys.argv[2]   # arg 2: run type (real: latest index; history: historical indexes)
    if run_type == 'history':
        if len(arguments) == 3:  # arg 3: data start date; not needed for the latest index
            date_info = sys.argv[3]
        else:
            date_info = '2024-01-01'
    else:
        date_info = '2024-01-01'
    main(site_name, date_info, run_type)
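
# Example invocation (a sketch only: the elasticsearch-hadoop connector coordinates and
# version are assumptions and depend on the cluster's actual ES version and deployment):
#   spark-submit --master yarn \
#       --packages org.elasticsearch:elasticsearch-spark-30_2.12:8.5.0 \
#       es_update_with_usr_mask.py us real
#   spark-submit --master yarn \
#       --packages org.elasticsearch:elasticsearch-spark-30_2.12:8.5.0 \
#       es_update_with_usr_mask.py us history 2024-06-01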