# common_df.py
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession
from pandas.core.frame import DataFrame as pd_DataFrame
from pyspark.sql.types import MapType, StringType
from yswg_utils.common_udf import parse_bsr_url
import pandas as pd


def asin_bsr_orders_df(df1, spark_session, site_name='us', date_info='2024-01'):
    # Compute monthly BSR orders for each asin
    """
    df1: DataFrame with 'asin', 'asin_bs_cate_1_rank', 'asin_bs_cate_1_id' columns
    # df2: DataFrame with 'asin_bsr_orders', 'asin_bs_cate_1_rank', 'asin_bs_cate_1_id' columns
    spark_session: SparkSession object
    site_name: site, defaults to us
    date_info: year-month, defaults to 2024-01
    """
    sql = f"""
        select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, ceil(orders) as asin_bsr_orders
        from ods_one_category_report
        where site_name='{site_name}' and date_type='month' and date_info='{date_info}'
    """
    df2 = spark_session.sql(sql)
    df1 = df1.join(df2, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left')
    return df1
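
# Minimal usage sketch for asin_bsr_orders_df (kept as comments; 'dim_asin_detail' is a
# hypothetical source table, any DataFrame with the three join columns works):
#     spark = SparkSession.builder.appName("common_df_demo").enableHiveSupport().getOrCreate()
#     asin_df = spark.table("dim_asin_detail").select("asin", "asin_bs_cate_1_rank", "asin_bs_cate_1_id")
#     result = asin_bsr_orders_df(asin_df, spark, site_name="us", date_info="2024-01")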


def get_bsr_tree_full_name_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    bsr tree 表获取 full_name
    :param site_name:
    :param spark_session:
    :return:
    """
    sql = f"""
            select category_id,
                   category_parent_id,
                   rel_first_id as category_first_id,
                   en_name,
                   null         as full_name
            from dim_bsr_category_tree
            where site_name = '{site_name}'
            order by nodes_num
            """

    pd_df_all = spark_session.sql(sql).toPandas()

    full_name_result_df = pd.DataFrame()

    def build_name(parent_ids: list, parent_df: pd_DataFrame):
        # Walk the category tree one level at a time, concatenating the parent's
        # full_name with the current category's en_name.
        nonlocal full_name_result_df
        child_df = pd_df_all.query(f"category_parent_id in {str(parent_ids)} ")

        if child_df.empty:
            return
        merged = pd.merge(child_df, parent_df, left_on='category_parent_id', right_on='category_id', how='left')
        if parent_ids == ["0"]:
            # Children of the virtual root "0": full_name is just their own name
            merged['full_name'] = merged['en_name_x']
        else:
            merged['full_name'] = merged['full_name_y'].fillna("") + "›" + merged['en_name_x']

        select = {
            'category_id_x': 'category_id',
            'category_first_id_x': 'category_first_id',
            'category_parent_id_x': 'category_parent_id',
            'en_name_x': 'en_name',
            'full_name': 'full_name',
        }
        merged = merged.rename(columns=select)[[*select.values()]]
        full_name_result_df = pd.concat([full_name_result_df, merged], ignore_index=True)

        # Recurse into the next level, using the rows just built as the new parents
        next_parent_ids = merged['category_id'].values.tolist()
        build_name(next_parent_ids, merged)
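
    # The walk starts from the virtual root node "0"; each level's output becomes the
    # parent frame for the next, so full_name ends up as the whole ancestor chain joined
    # by "›", e.g. (illustrative values only): Home & Kitchen›Kitchen & Dining›Coffee Makers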

    parent_ids = ["0"]
    parent_df = pd_df_all.query(f"category_id in {str(parent_ids)} ")
    if not parent_df.empty:
        build_name(parent_ids, parent_df)

    result_df = spark_session.createDataFrame(full_name_result_df)
    result_df = result_df.drop_duplicates(['full_name'])
    return result_df


def get_asin_unlanuch_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    获取全部已下架asin详情
    :param site_name:
    :param spark_session:
    :return:
    """
    sql = f"""
        select asin, asin_unlaunch_time
        from dim_asin_err_state
        where site_name = '{site_name}'
    """
    return spark_session.sql(sql).cache()


def get_self_asin_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    获取是否是公司内部asin相关信息
    """
    sql = f"""
        select distinct  asin
        from ods_self_asin
        where site_name = '{site_name}'
    """
    return spark_session.sql(sql)


def get_node_first_id_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    获取nodeid 和 bsr 一级分类id对应关系
    :param site_name:
    :param spark_session:
    """
    sql = f"""
        select node_id,
               max(category_first_id) as category_first_id
        from dim_category_desc_id
        where site_name = '{site_name}'
        group by node_id
    """
    return spark_session.sql(sql)


def get_first_id_from_category_desc_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    Get the mapping between first-level category id and category name
    """
    sql = f"""
            select category_id as category_first_id, en_name as category_first_name 
            from big_data_selection.dim_bsr_category_tree
            where site_name = '{site_name}'
            and category_parent_id = 0 and delete_time is null
        """
    return spark_session.sql(sqlQuery=sql)


def get_bsr_category_tree_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    获取bsr分类树id和一级分类id对应关系
    :param site_name:
    :param spark_session:
    :return:
    """
    sql = f"""
        select category_id as category_id,
               rel_first_id as category_first_id,
               category_name
    from (
             select category_id,
                    rel_first_id,
                    en_name as category_name,
                    row_number() over (partition by category_id order by delete_time desc nulls first ) as row_number
             from dim_bsr_category_tree
             where site_name = '{site_name}'
         ) tmp
    where row_number = 1
    """
    return spark_session.sql(sql)


def get_old_id_category_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    获取bsr旧分类id和当前分类id对应关系
    :param site_name:
    :param spark_session:
    :return:
    """
    spark_session.udf.register("parse_bsr_url", parse_bsr_url, MapType(StringType(), StringType()))
    sql = f"""
            select id                                            as cate_1_id,
                   parse_bsr_url(nodes_num, path)['category_id'] as category_id
            from ods_bs_category
            where site_name = '{site_name}'
"""
    return spark_session.sql(sql)


def get_user_mask_type_asin_sql(site_name: str, day: str) -> str:
    """
    Build the SQL that fetches the '流量选品' (traffic product selection) fields updated by users on/after a given day
    usr_mask_type       type
    usr_mask_progress   progress
    :return: SQL text
    """
    add_condition = ''
    if day is not None:
        add_condition = f"and create_time >='{day}' "

    return f"""
with df1 as (
	select edit_key_id as asin,
		   val_after   as usr_mask_type
	from (
			 select filed,
					edit_key_id,
					val_after,
					row_number() over ( partition by module,site_name, filed, edit_key_id order by id desc ) as last_row
			 from sys_edit_log
			 where val_after is not null
			   and edit_key_id is not null
			   and edit_key_id != ''
               and site_name = '{site_name}'
			   and user_id != 'admin'
			   and module in ('流量选品')
			   and filed in ('usr_mask_type')
               {add_condition}
		 ) tmp
	where last_row = 1
),
	 df2 as (
		 select edit_key_id as asin,
				val_after   as usr_mask_progress
		 from (
				  select filed,
						 edit_key_id,
						 val_after,
						 row_number() over ( partition by module,site_name, filed, edit_key_id order by id desc ) as last_row
				  from sys_edit_log
				  where val_after is not null
					and edit_key_id is not null
					and edit_key_id != ''
					and user_id != 'admin'
                    and site_name = '{site_name}'
					and module in ('流量选品')
					and filed in ('usr_mask_progress')
                   {add_condition}
			  ) tmp
		 where last_row = 1
	 )
select df1.asin, df1.usr_mask_type, df2.usr_mask_progress
from df1
		 full outer join df2 on df1.asin = df2.asin
"""

if __name__ == '__main__':
    print(get_user_mask_type_asin_sql("us", "2024-01-01"))
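
    # Sketch of wiring up the other helpers (needs a live Spark/Hive environment with
    # the big_data_selection tables, so it is left commented out):
    # spark = SparkSession.builder.appName("common_df_demo").enableHiveSupport().getOrCreate()
    # get_bsr_tree_full_name_df("us", spark).show(10)
    # get_asin_unlanuch_df("us", spark).show(10)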