# dim_profit_config.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.sql.types import DoubleType, StringType
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_df import get_bsr_tree_full_name_df

"""
利润率相关配置表
"""


class DimProfitConfig(object):
    """
    Builds the profit-margin configuration table (dim_profit_config).

    Reads category cost, FBA fee config, advertising config and return-ratio
    data from the "us" PostgreSQL instance, joins each onto the BSR category
    tree (current + legacy names), and saves the merged result into the Hive
    table partitioned by site_name.
    """

    # Noise characters stripped when normalising a category name for matching:
    # underscore, ampersand, double quote, comma, single quote and space.
    _NAME_NOISE = str.maketrans("", "", "_&\",' ")

    def __init__(self, site_name):
        """Create the Spark session and register the column UDFs.

        :param site_name: target site partition value (e.g. "us")
        """
        app_name = f"{self.__class__.__name__}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.udf_parse_num_reg = F.udf(self.udf_parse_num, DoubleType())
        self.udf_lower_category_name_reg = F.udf(self.udf_lower_category_name, StringType())
        self.hive_tb = "dim_profit_config"
        self.site_name = site_name

    @staticmethod
    def udf_parse_num(num: str):
        """Convert a percentage string such as '12.5%' to a fraction rounded to 4 dp.

        Returns None for None or unparseable input (e.g. empty string) instead of
        raising, so one malformed row cannot fail the whole Spark job.
        """
        if num is None:
            return None
        try:
            return round(float(num.replace("%", "")) / 100, 4)
        except ValueError:
            return None

    @staticmethod
    def udf_lower_category_name(category_name: str):
        """Lower-case a category name after dropping separator/noise characters
        (_ & " , ' and spaces) so names from different sources can be matched."""
        if category_name is None:
            return None
        return category_name.translate(DimProfitConfig._NAME_NOISE).lower()

    def _read_pg_query(self, conn_info, query):
        """Run *query* against the shared PostgreSQL connection and cache the result."""
        return SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=query
        ).cache()

    def run(self):
        """Assemble the config dataframe and write it into ``self.hive_tb``."""
        # Current BSR category tree; spaces removed from full_name for joining.
        name_df = get_bsr_tree_full_name_df(self.site_name, self.spark) \
            .select(
            F.expr("replace(full_name,' ', '')").alias("full_name"),
            F.col("category_id"),
            F.col("en_name").alias("category_name"),
            F.col("category_first_id"),
        )

        # Legacy category descriptions joined to the tree names.
        # NOTE(review): dim_category_desc_id is filtered on site_name='us' while
        # name_tb uses self.site_name — confirm the legacy table is us-only.
        sql_old = f"""
with old_tb as (
	select replace(asin_category_desc, " ", "") as full_name,
		   node_id                              as category_id,
		   category_first_id
	from dim_category_desc_id
	where site_name='us' and asin_category_desc is not null
),
	 name_tb as (
		 select category_id, max(en_name) as en_name
		 from dim_bsr_category_tree
		 where site_name = '{self.site_name}'
		 group by category_id
	 )
select old_tb.full_name,
	   old_tb.category_id,
	   name_tb.en_name as category_name,
	   old_tb.category_first_id
from old_tb
		 left join name_tb on old_tb.category_id = name_tb.category_id
"""
        old_name_df = self.spark.sql(sql_old)
        # Merge current and legacy names, deduplicating on full_name.
        name_df = name_df.unionByName(old_name_df).drop_duplicates(['full_name'])
        name_df = name_df.withColumn("category_name_low", self.udf_lower_category_name_reg(F.col("category_name")))

        # All config sources live in the same "us" PostgreSQL instance —
        # fetch the connection info once and reuse it for every read below.
        conn_info = DBUtil.get_connection_info("postgresql", "us")

        # Cost configuration keyed by full category path.
        sql = f"""
        select name as full_name, cost, avg_cost
        from us_profit_cost_new
"""
        profit_cost_df = self._read_pg_query(conn_info, sql)

        # FBA fee configuration. ("categoy_name" typo is the actual DB column name.)
        sql = f"""
            select categoy_name as category_name,
                   calc_type,
                   config_json as fba_config_json,
                   referral_fee_formula
            from us_profit_fba_config
"""
        fba_config_df = self._read_pg_query(conn_info, sql)

        # Attach category ids to the FBA config via the (exact) category name.
        fba_config_df = fba_config_df.join(name_df, on=['category_name'], how='left').select(
            F.col("category_name"),
            F.col("calc_type"),
            F.col("fba_config_json"),
            F.col("referral_fee_formula"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])

        # Advertising configuration.
        sql = f"""
            select category as category_name,
                   adv
            from us_profit_adv
"""
        adv_config_df = self._read_pg_query(conn_info, sql)

        # Raw return-ratio data, averaged per category.
        # NOTE(review): year_week is hard-coded to '2023-22' — confirm this
        # snapshot week is intentional and should not be parameterised.
        sql = f"""
       	 select category as category_name,
				round(avg(return_ratio), 4) as return_ratio
		 from us_aba_profit_category_insights
		 where year_week = '2023-22'
		 group by category
"""
        return_config_df = self._read_pg_query(conn_info, sql)
        return_config_df = return_config_df.withColumn("category_name_low", self.udf_lower_category_name_reg(F.col("category_name")))
        # Join on the normalised (lower-cased, noise-stripped) name.
        return_config_df = return_config_df.join(name_df, on=['category_name_low'], how='left').select(
            name_df["category_name"].alias("category_name"),
            F.col("return_ratio"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])

        # Attach category ids to the advertising config via the exact name.
        adv_config_df = adv_config_df.join(name_df, on=['category_name'], how='left').select(
            F.col("category_name"),
            F.col("adv"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])

        # Attach category ids to the cost config via the full category path;
        # cost / avg_cost are '%'-strings parsed into fractions.
        profit_cost_df = profit_cost_df.join(name_df, on=['full_name'], how='left').select(
            F.col("full_name"),
            name_df['category_name'].alias("category_name"),
            self.udf_parse_num_reg(F.col("cost")).alias("cost"),
            self.udf_parse_num_reg(F.col("avg_cost")).alias("avg_cost"),
            F.col("category_id"),
            F.col("category_first_id"),
        ).drop_duplicates(['category_id', 'category_first_id'])

        # Full-outer join all four sources on the category keys so that a
        # category present in only one source still gets a row.
        df_save = profit_cost_df \
            .join(fba_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .join(adv_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .join(return_config_df, on=['category_id', 'category_first_id'], how='fullouter') \
            .select(
            # category
            F.col("full_name"),
            F.coalesce(
                profit_cost_df['category_name'],
                fba_config_df['category_name'],
                adv_config_df['category_name']
            ).alias("category_name"),

            F.col("category_id"),
            F.col("category_first_id"),
            # cost
            F.col("cost"),
            F.col("avg_cost"),
            # FBA calculation config
            F.col("calc_type"),
            F.col("fba_config_json"),
            F.col("referral_fee_formula"),
            # advertising
            F.col("adv"),
            # return ratio
            F.col("return_ratio"),
            F.lit(self.site_name).alias("site_name")
        )

        # Small dimension table — write as a single file per partition.
        df_save = df_save.repartition(1)
        partition_dict = {
            "site_name": self.site_name,
        }
        # Insert or overwrite the site_name partition.
        CommonUtil.save_or_update_table(self.spark, self.hive_tb, partition_dict, df_save)


if __name__ == '__main__':
    # CLI entry point: first positional argument selects the site (e.g. "us").
    site_name = CommonUtil.get_sys_arg(1, None)
    DimProfitConfig(site_name).run()