import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
# NOTE(review): HdfsUtils appears unused in this file — confirm before removing.
from utils.hdfs_utils import HdfsUtils
from pyspark.sql.types import DoubleType, StringType
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_df import get_bsr_tree_full_name_df

"""
Profit-margin related configuration table.

Builds the `dim_profit_config` Hive table by joining per-category cost,
FBA-fee configuration, advertising ratio and refund-rate data (all read
from a "us" PostgreSQL instance) against the BSR category tree, keyed on
(category_id, category_first_id).
"""


class DimProfitConfig(object):
    def __init__(self, site_name: str):
        # Spark session named after the class, via the project helper.
        app_name = f"{self.__class__.__name__}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # Register the two static helpers below as Spark UDFs.
        self.udf_parse_num_reg = F.udf(self.udf_parse_num, DoubleType())
        self.udf_lower_category_name_reg = F.udf(self.udf_lower_category_name, StringType())
        # Target Hive table.
        self.hive_tb = "dim_profit_config"
        self.site_name = site_name
        pass

    @staticmethod
    def udf_parse_num(num: str):
        """Parse a percentage string like '12.5%' into a fraction (0.125),
        rounded to 4 decimals. Returns None for None input.

        NOTE(review): raises ValueError if `num` is non-numeric after
        stripping '%' — presumably upstream values are always well-formed;
        verify against the source table.
        """
        if num is None:
            return None
        return round(float(num.replace("%", "")) / 100, 4)

    @staticmethod
    def udf_lower_category_name(category_name: str):
        """Normalize a category name for fuzzy matching: strip
        underscores, ampersands, double/single quotes, commas and spaces,
        then lowercase. Returns None for None input.
        """
        if category_name is None:
            return None
        category_name = category_name.replace("_", "")
        category_name = category_name.replace("&", "")
        category_name = category_name.replace("\"", "")
        category_name = category_name.replace(",", "")
        category_name = category_name.replace("'", "")
        category_name = category_name.replace(" ", "")
        return category_name.lower()

    def run(self):
        """Assemble and persist the profit-config dimension table.

        Pipeline: build a category-name lookup (current BSR tree union
        legacy descriptions), read four config sources over JDBC, resolve
        each to (category_id, category_first_id), full-outer-join them,
        and save to Hive partitioned by site_name.
        """
        # Current BSR category tree; full_name has spaces removed to
        # match the legacy table's normalization below.
        name_df = get_bsr_tree_full_name_df(self.site_name, self.spark) \
            .select(
                F.expr("replace(full_name,' ', '')").alias("full_name"),
                F.col("category_id"),
                F.col("en_name").alias("category_name"),
                F.col("category_first_id"),
            )
        # Legacy category descriptions, enriched with English names.
        # NOTE(review): site_name is hardcoded to 'us' here but
        # parameterized as '{self.site_name}' below — confirm this job is
        # us-only (the JDBC table names us_profit_* suggest it is).
        sql_old = f"""
            with old_tb as (
                select replace(asin_category_desc, " ", "") as full_name,
                       node_id as category_id,
                       category_first_id
                from dim_category_desc_id
                where site_name='us'
                  and asin_category_desc is not null
            ),
            name_tb as (
                select category_id, max(en_name) as en_name
                from dim_bsr_category_tree
                where site_name = '{self.site_name}'
                group by category_id
            )
            select old_tb.full_name,
                   old_tb.category_id,
                   name_tb.en_name as category_name,
                   old_tb.category_first_id
            from old_tb
                     left join name_tb on old_tb.category_id = name_tb.category_id
        """
        old_name_df = self.spark.sql(sql_old)
        # Deduplicate: current tree rows win/lose arbitrarily per
        # full_name after the union (drop_duplicates keeps one row).
        name_df = name_df.unionByName(old_name_df).drop_duplicates(['full_name'])
        # Normalized name column used for the refund-rate fuzzy join.
        name_df = name_df.withColumn("category_name_low", self.udf_lower_category_name_reg(F.col("category_name")))

        # Per-category cost data (percentage strings, parsed later).
        sql = f"""
            select name as full_name, cost, avg_cost
            from us_profit_cost_new
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        profit_cost_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()

        # FBA fee configuration per category.
        # NOTE(review): 'categoy_name' is (mis)spelled this way in the
        # source Postgres table — it must match the DB schema, so do not
        # "correct" it here without checking the table definition.
        sql = f"""
            select categoy_name as category_name,
                   calc_type,
                   config_json as fba_config_json,
                   referral_fee_formula
            from us_profit_fba_config
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        fba_config_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()

        # FBA config: resolve category ids via exact category_name join.
        fba_config_df = fba_config_df.join(name_df, on=['category_name'], how='left').select(
            F.col("category_name"),
            F.col("calc_type"),
            F.col("fba_config_json"),
            F.col("referral_fee_formula"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])

        # Advertising config items.
        sql = f"""
            select category as category_name, adv
            from us_profit_adv
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        adv_config_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()

        # Raw refund-rate data.
        # NOTE(review): year_week '2023-22' is hardcoded — presumably a
        # one-off snapshot; confirm whether this should be parameterized.
        sql = f"""
            select category as category_name,
                   round(avg(return_ratio), 4) as return_ratio
            from us_aba_profit_category_insights
            where year_week = '2023-22'
            group by category
        """
        conn_info = DBUtil.get_connection_info("postgresql", "us")
        return_config_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()
        return_config_df = return_config_df.withColumn("category_name_low", self.udf_lower_category_name_reg(F.col("category_name")))
        # Refund rate: fuzzy join on the normalized (lowercased, stripped)
        # category name, since raw names differ between sources.
        return_config_df = return_config_df.join(name_df, on=['category_name_low'], how='left').select(
            name_df["category_name"].alias("category_name"),
            F.col("return_ratio"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])
        # return_config_df.show(truncate=False)

        # Ad config: resolve category ids via exact category_name join.
        adv_config_df = adv_config_df.join(name_df, on=['category_name'], how='left').select(
            F.col("category_name"),
            F.col("adv"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])

        # Cost data: resolve category ids via full_name; percentage
        # strings parsed to fractions here.
        profit_cost_df = profit_cost_df.join(name_df, on=['full_name'], how='left').select(
            F.col("full_name"),
            name_df['category_name'].alias("category_name"),
            self.udf_parse_num_reg(F.col("cost")).alias("cost"),
            self.udf_parse_num_reg(F.col("avg_cost")).alias("avg_cost"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])

        # Combine all four sources on the category key (original had a
        # "refund todo" marker here).
        # NOTE(review): the coalesce below omits
        # return_config_df['category_name'] — rows present only in the
        # refund-rate source end up with a null category_name. Looks
        # unintentional; confirm with the data owner.
        df_save = profit_cost_df \
            .join(fba_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .join(adv_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .join(return_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .select(
                # Category
                F.col("full_name"),
                F.coalesce(
                    profit_cost_df['category_name'],
                    fba_config_df['category_name'],
                    adv_config_df['category_name']
                ).alias("category_name"),
                F.col("category_id"),
                F.col("category_first_id"),
                # Cost
                F.col("cost"),
                F.col("avg_cost"),
                # FBA calculation config type
                F.col("calc_type"),
                F.col("fba_config_json"),
                F.col("referral_fee_formula"),
                # Advertising
                F.col("adv"),
                # Refund rate
                F.col("return_ratio"),
                F.lit(self.site_name).alias("site_name")
            )

        # Small dimension table: write a single output file.
        df_save = df_save.repartition(1)
        partition_dict = {
            "site_name": self.site_name,
        }
        # Delete-or-update the Hive partition via the project helper.
        CommonUtil.save_or_update_table(self.spark, self.hive_tb, partition_dict, df_save)
        pass


if __name__ == '__main__':
    # site_name comes from argv[1]; None if absent.
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = DimProfitConfig(site_name)
    obj.run()