from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession
from pandas.core.frame import DataFrame as pd_DataFrame
from pyspark.sql.types import MapType, StringType
from yswg_utils.common_udf import parse_bsr_url
import pandas as pd


def asin_bsr_orders_df(df1, spark_session, site_name='us', date_info='2024-01'):
    # Compute the monthly BSR order volume for each asin.
    """
    df1: DataFrame carrying the 'asin', 'asin_bs_cate_1_rank' and 'asin_bs_cate_1_id' columns
    # df2: DataFrame carrying the 'asin_bsr_orders', 'asin_bs_cate_1_rank' and 'asin_bs_cate_1_id' columns
    spark_session: SparkSession object
    site_name: site, defaults to 'us'
    date_info: year-month, defaults to '2024-01'
    """
    sql = f"""
        select category_id as asin_bs_cate_1_id,
               rank as asin_bs_cate_1_rank,
               ceil(orders) as asin_bsr_orders
        from ods_one_category_report
        where site_name = '{site_name}' and date_type = 'month' and date_info = '{date_info}'
    """
    df2 = spark_session.sql(sql)
    df1 = df1.join(df2, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left')
    return df1
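
# Usage sketch for asin_bsr_orders_df (illustrative only). It assumes a live
# SparkSession with Hive support and an existing ods_one_category_report table;
# the helper name and the sample rows below are invented for the example.
def _demo_asin_bsr_orders(spark_session: SparkSession) -> DataFrame:
    demo_df = spark_session.createDataFrame(
        [("B000000001", 123, "1000"), ("B000000002", 456, "1000")],
        ["asin", "asin_bs_cate_1_rank", "asin_bs_cate_1_id"],
    )
    # Left-joins the monthly BSR order estimates onto the demo rows.
    return asin_bsr_orders_df(demo_df, spark_session, site_name="us", date_info="2024-01")
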
def get_bsr_tree_full_name_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    Build full_name from the bsr category tree table.
    :param site_name:
    :param spark_session:
    :return:
    """
    sql = f"""
        select category_id, category_parent_id, rel_first_id as category_first_id, en_name, null as full_name
        from dim_bsr_category_tree
        where site_name = '{site_name}'
        order by nodes_num
    """
    pd_df_all = spark_session.sql(sql).toPandas()
    full_name_result_df = pd.DataFrame()

    def build_name(parent_ids: list, parent_df: pd_DataFrame):
        nonlocal full_name_result_df
        child_df = pd_df_all.query(f"category_parent_id in {str(parent_ids)}")
        if child_df.empty:
            return
        merged = pd.merge(child_df, parent_df, left_on='category_parent_id', right_on='category_id', how='left')
        if parent_ids == ["0"]:
            # Top level: the full name is the category's own name (checking for
            # the root id rather than len(parent_ids) == 1, so a deeper level
            # that happens to have a single parent keeps its prefix).
            merged['full_name'] = merged['en_name_x']
        else:
            merged['full_name'] = merged['full_name_y'].fillna("") + "›" + merged['en_name_x']
        select = {
            'category_id_x': 'category_id',
            'category_first_id_x': 'category_first_id',
            'category_parent_id_x': 'category_parent_id',
            'en_name_x': 'en_name',
            'full_name': 'full_name',
        }
        merged = merged.rename(columns=select)[[*select.values()]]
        full_name_result_df = pd.concat([full_name_result_df, merged], ignore_index=True)
        next_parent_ids = merged['category_id'].values.tolist()
        build_name(next_parent_ids, merged)

    parent_ids = ["0"]
    parent_df = pd_df_all.query(f"category_id in {str(parent_ids)}")
    if not parent_df.empty:
        build_name(parent_ids, parent_df)
    result_df = spark_session.createDataFrame(full_name_result_df)
    result_df = result_df.drop_duplicates(['full_name'])
    return result_df


def get_asin_unlanuch_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    Fetch the details of all delisted asins.
    :param site_name:
    :param spark_session:
    :return:
    """
    sql = f"""
        select asin, asin_unlaunch_time
        from dim_asin_err_state
        where site_name = '{site_name}'
    """
    return spark_session.sql(sql).cache()


def get_self_asin_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    Fetch the asins owned by the company, used to flag internal asins.
    """
    sql = f"""
        select distinct asin
        from ods_self_asin
        where site_name = '{site_name}'
    """
    return spark_session.sql(sql)


def get_node_first_id_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    Fetch the mapping between node_id and the bsr first-level category id.
    :param site_name:
    :param spark_session:
    """
    sql = f"""
        select node_id, max(category_first_id) as category_first_id
        from dim_category_desc_id
        where site_name = '{site_name}'
        group by node_id
    """
    return spark_session.sql(sql)


def get_first_id_from_category_desc_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    Fetch the mapping between first-level category ids and category names.
    """
    sql = f"""
        select category_id as category_first_id,
               en_name as category_first_name
        from big_data_selection.dim_bsr_category_tree
        where site_name = '{site_name}' and category_parent_id = 0 and delete_time is null
    """
    return spark_session.sql(sqlQuery=sql)


def get_bsr_category_tree_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    Fetch the mapping between bsr category-tree ids and first-level category ids.
    :param site_name:
    :param spark_session:
    :return:
    """
    sql = f"""
        select category_id as category_id,
               rel_first_id as category_first_id,
               category_name
        from (
            select category_id,
                   rel_first_id,
                   en_name as category_name,
                   row_number() over (partition by category_id order by delete_time desc nulls first) as row_number
            from dim_bsr_category_tree
            where site_name = '{site_name}'
        ) tmp
        where row_number = 1
    """
    return spark_session.sql(sql)


def get_old_id_category_df(site_name: str, spark_session: SparkSession) -> DataFrame:
    """
    Fetch the mapping between old bsr category ids and current category ids.
    :param site_name:
    :param spark_session:
    :return:
    """
    spark_session.udf.register("parse_bsr_url", parse_bsr_url, MapType(StringType(), StringType()))
    sql = f"""
        select id as cate_1_id,
               parse_bsr_url(nodes_num, path)['category_id'] as category_id
        from ods_bs_category
        where site_name = '{site_name}'
    """
    return spark_session.sql(sql)


def get_user_mask_type_asin_sql(site_name: str, day: str) -> str:
    """
    Build the query for the traffic-selection ('流量选品') fields updated by users on a given day.
    usr_mask_type: type
    usr_mask_progress: progress
    :return: the assembled SQL text (not a DataFrame)
    """
    add_condition = ''
    if day is not None:
        add_condition = f"and create_time >= '{day}' "
    return f"""
        with df1 as (
            select edit_key_id as asin, val_after as usr_mask_type
            from (
                select filed, edit_key_id, val_after,
                       row_number() over (
                           partition by module, site_name, filed, edit_key_id
                           order by id desc
                       ) as last_row
                from sys_edit_log
                where val_after is not null
                  and edit_key_id is not null
                  and edit_key_id != ''
                  and site_name = '{site_name}'
                  and user_id != 'admin'
                  and module in ('流量选品')
                  and filed in ('usr_mask_type')
                  {add_condition}
            ) tmp
            where last_row = 1
        ),
        df2 as (
            select edit_key_id as asin, val_after as usr_mask_progress
            from (
                select filed, edit_key_id, val_after,
                       row_number() over (
                           partition by module, site_name, filed, edit_key_id
                           order by id desc
                       ) as last_row
                from sys_edit_log
                where val_after is not null
                  and edit_key_id is not null
                  and edit_key_id != ''
                  and user_id != 'admin'
                  and site_name = '{site_name}'
                  and module in ('流量选品')
                  and filed in ('usr_mask_progress')
                  {add_condition}
            ) tmp
            where last_row = 1
        )
        select df1.asin, df1.usr_mask_type, df2.usr_mask_progress
        from df1
        full outer join df2 on df1.asin = df2.asin
    """


if __name__ == '__main__':
    print(get_user_mask_type_asin_sql("us", "2024-01-01"))
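
    # Illustration only: a pure-pandas walk that mirrors the level-by-level
    # full_name concatenation performed by get_bsr_tree_full_name_df. The toy
    # tree below is invented; real data comes from dim_bsr_category_tree.
    toy_tree = pd.DataFrame({
        "category_id": ["1", "2", "3"],
        "category_parent_id": ["0", "1", "1"],
        "en_name": ["Books", "Fiction", "Nonfiction"],
    })
    full_names = {}
    frontier = ["0"]  # build_name also starts from the virtual root "0"
    while frontier:
        level = toy_tree[toy_tree["category_parent_id"].isin(frontier)]
        for _, row in level.iterrows():
            prefix = full_names.get(row["category_parent_id"])
            full_names[row["category_id"]] = (
                row["en_name"] if prefix is None else prefix + "›" + row["en_name"]
            )
        frontier = level["category_id"].tolist()
    print(full_names)  # expected: {'1': 'Books', '2': 'Books›Fiction', '3': 'Books›Nonfiction'}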