""" @Author : HuangJian @Description : 店铺基础信息报表 @SourceTable : ①ods_seller_account_feedback ②ods_seller_asin_account ③ods_asin_detail_product ④ods_seller_asin_syn ⑤ods_asin_variat @SinkTable : dwt_fb_base_report @SinkTable : dwt_fb_asin_info @CreateTime : 2022/07/05 15:48 @UpdateTime : 2022/07/05 15:48 """ import os import sys sys.path.append(os.path.dirname(sys.path[0])) from utils.hdfs_utils import HdfsUtils from utils.common_util import CommonUtil, DateTypes from pyspark.sql.types import StringType, IntegerType, DoubleType from utils.spark_util import SparkUtil from pyspark.sql import functions as F from yswg_utils.common_udf import udf_get_package_quantity from yswg_utils.common_udf import udf_new_asin_flag from utils.db_util import DBUtil class DwtFbBaseReport(object): def __init__(self, site_name, date_type, date_info): self.site_name = site_name self.date_type = date_type self.date_info = date_info self.hive_tb = "dwt_fb_base_report" self.partition_dict = { "site_name": site_name, "date_type": date_type, "date_info": date_info } # 落表路径校验 self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict) # 创建spark_session对象相关 app_name = f"{self.__class__.__name__}:{site_name}:{date_info}" self.spark = SparkUtil.get_spark_session(app_name) # 获取不同维度日期下的计算日期YYYY-MM-DD self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info) self.last_month = CommonUtil.get_month_offset(date_info, -1) # 全局df初始化 self.df_fb_feedback = self.spark.sql(f"select 1+1;") self.df_fb_asin = self.spark.sql(f"select 1+1;") self.df_top20_asin = self.spark.sql(f"select 1+1;") self.df_asin_history = self.spark.sql(f"select 1+1;") self.df_asin_parent = self.spark.sql(f"select 1+1;") self.df_fb_agg = self.spark.sql(f"select 1+1;") self.df_fb_asin_detail = self.spark.sql(f"select 1+1;") self.df_self_seller_id = self.spark.sql(f"select 1+1;") self.df_seller_account = self.spark.sql(f"select 1+1;") # 初始化UDF函数 self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType()) self.u_judge_package_quantity = F.udf(udf_get_package_quantity, IntegerType()) def read_data(self): # ods_seller_account_feedback 月度店铺报告表主表 print("获取 ods_seller_account_feedback") sql = f"""select cur_fd.seller_id, cur_fd.fb_web_asin_num, cur_fd.fb_country_name, cur_fd.count_30_day_num, cur_fd.count_1_year_num, cur_fd.count_lifetime_num, cur_fd.fb_crawl_date, round((count_30_day_num - last_30_day_num) / last_30_day_num, 4) as count_30_day_rate, round((count_1_year_num - last_1_year_num) / last_1_year_num, 4) as count_1_year_rate, round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate from (select seller_id, num as fb_web_asin_num, count_30_day as count_30_day_num, count_1_year as count_1_year_num, count_lifetime as count_lifetime_num, country_name as fb_country_name, date_format(updated_at, 'yyyy-MM-dd HH:mm:ss') as fb_crawl_date from ods_seller_account_feedback where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}' and length(seller_id) > 2 ) cur_fd left join (select seller_id, count_30_day as last_30_day_num, count_1_year as last_1_year_num, count_lifetime as last_lifetime_num from ods_seller_account_feedback where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.last_month}' and length(seller_id) > 2 ) last_fd on cur_fd.seller_id = last_fd.seller_id""" self.df_fb_feedback = self.spark.sql(sqlQuery=sql) self.df_fb_feedback = 

    def read_data(self):
        # ods_seller_account_feedback: monthly seller feedback report (main table)
        print("Reading ods_seller_account_feedback")
        sql = f"""select cur_fd.seller_id,
                         cur_fd.fb_web_asin_num,
                         cur_fd.fb_country_name,
                         cur_fd.count_30_day_num,
                         cur_fd.count_1_year_num,
                         cur_fd.count_lifetime_num,
                         cur_fd.fb_crawl_date,
                         round((count_30_day_num - last_30_day_num) / last_30_day_num, 4)       as count_30_day_rate,
                         round((count_1_year_num - last_1_year_num) / last_1_year_num, 4)       as count_1_year_rate,
                         round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate
                  from (select seller_id,
                               num            as fb_web_asin_num,
                               count_30_day   as count_30_day_num,
                               count_1_year   as count_1_year_num,
                               count_lifetime as count_lifetime_num,
                               country_name   as fb_country_name,
                               date_format(updated_at, 'yyyy-MM-dd HH:mm:ss') as fb_crawl_date
                        from ods_seller_account_feedback
                        where site_name = '{self.site_name}'
                          and date_type = '{self.date_type}'
                          and date_info = '{self.date_info}'
                          and length(seller_id) > 2
                       ) cur_fd
                  left join (select seller_id,
                                    count_30_day   as last_30_day_num,
                                    count_1_year   as last_1_year_num,
                                    count_lifetime as last_lifetime_num
                             from ods_seller_account_feedback
                             where site_name = '{self.site_name}'
                               and date_type = '{self.date_type}'
                               and date_info = '{self.last_month}'
                               and length(seller_id) > 2
                            ) last_fd
                            on cur_fd.seller_id = last_fd.seller_id"""
        self.df_fb_feedback = self.spark.sql(sqlQuery=sql)
        self.df_fb_feedback = self.df_fb_feedback.drop_duplicates(['seller_id']).cache()
        print(sql)

        # Internal seller-to-ASIN relation table (crawled via search terms)
        print("Reading ods_seller_asin_account")
        sql = f"""
            select seller_id, asin
            from ods_seller_asin_account
            where site_name = '{self.site_name}'
              and date_format(created_at, 'yyyy-MM-dd') <= '{self.cal_date}'
        """
        self.df_fb_asin = self.spark.sql(sqlQuery=sql)
        self.df_fb_asin = self.df_fb_asin.drop_duplicates(['seller_id', 'asin'])
        print(sql)

        # Top-20 ASINs per seller, used for the top-20 metrics
        print("Reading ods_asin_detail_product")
        sql = f"""
            select seller_id, asin, price, rating, total_comments, row_num as rank
            from ods_asin_detail_product
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
              and row_num <= 20
        """
        self.df_top20_asin = self.spark.sql(sqlQuery=sql)
        self.df_top20_asin = self.df_top20_asin.drop_duplicates(['seller_id', 'asin'])
        print(sql)

        # dim_cal_asin_history_detail: launch_time is used to flag new ASINs
        print("Reading dim_cal_asin_history_detail")
        sql = f"""
            select asin,
                   asin_title,
                   asin_img_url,
                   asin_price,
                   asin_rating,
                   asin_total_comments,
                   asin_weight,
                   asin_volume,
                   asin_launch_time
            from dim_cal_asin_history_detail
            where site_name = '{self.site_name}'
        """
        self.df_asin_history = self.spark.sql(sqlQuery=sql)
        print(sql)

        # dim_asin_variation_info: parent_asin is used for the multi-variant metrics
        print("Reading dim_asin_variation_info")
        sql = f"select asin, parent_asin from dim_asin_variation_info " \
              f"where site_name = '{self.site_name}'" \
              f" and asin != parent_asin "
        self.df_asin_parent = self.spark.sql(sqlQuery=sql)
        print(sql)

        # ods_seller_account_syn: provides account_name
        print("Reading ods_seller_account_syn")
        sql = f"select seller_id, account_name, id from ods_seller_account_syn " \
              f"where site_name = '{self.site_name}'"
        self.df_seller_account = self.spark.sql(sqlQuery=sql)
        # Deduplicate by seller_id, ordered by id desc so the newest record is kept
        self.df_seller_account = self.df_seller_account.orderBy(self.df_seller_account.id.desc())
        self.df_seller_account = self.df_seller_account.drop_duplicates(['seller_id'])
        self.df_seller_account = self.df_seller_account.drop('id')
        print(sql)

        # mysql selection.accounts: used to flag (and later exclude) the company's own stores
        print("Reading selection.accounts")
        sql = f"""
            select seller_id, 1 as is_self_fb
            from (select distinct seller_id from selection.accounts) t1
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
        self.df_self_seller_id = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        )
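
    # Note on the *_rate fields produced in read_data above: each is a month-over-month change,
    # round((current - previous) / previous, 4). For example, count_30_day_num = 120 this month
    # against last_30_day_num = 100 last month gives count_30_day_rate = 0.2. With Spark's default
    # (non-ANSI) SQL mode, a missing or zero previous value yields NULL rather than an error.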

    def handle_fb_top_20(self):
        print("Computing top-20 metrics from ods_asin_detail_product")
        self.df_top20_asin = self.df_top20_asin \
            .join(self.df_asin_history.select('asin', 'asin_launch_time'), on='asin', how='left')
        self.df_top20_asin = self.df_top20_asin.withColumn(
            "is_asin_new", self.udf_new_asin_flag(F.col('asin_launch_time'), F.lit(self.cal_date))
        )
        self.df_top20_asin = self.df_top20_asin.groupby('seller_id').agg(
            F.avg('price').alias('top_20_avg_price'),
            F.avg('rating').alias('top_20_avg_rating'),
            F.avg('total_comments').alias('top_20_avg_total_comments'),
            F.sum('is_asin_new').alias('top_20_new_asin_num')
        )

    def handle_fb_cal_agg(self):
        print("Computing the aggregated seller base-report metrics")
        df_fb_join = self.df_fb_feedback.select('seller_id').join(
            self.df_fb_asin, on='seller_id', how='left'
        )
        df_fb_join = df_fb_join.join(
            self.df_asin_history, on='asin', how='left'
        ).join(
            self.df_asin_parent, on='asin', how='left'
        )

        # Flag new ASINs
        df_fb_join = df_fb_join.withColumn(
            "is_asin_new", self.udf_new_asin_flag(F.col('asin_launch_time'), F.lit(self.cal_date))
        )
        # Package quantity of each seller-ASIN, parsed from the title
        df_fb_join = df_fb_join.withColumn(
            'asin_package_quantity', self.u_judge_package_quantity(F.col('asin_title'))
        )
        # Flag ASINs whose package quantity is >= 2
        df_fb_join = df_fb_join.withColumn(
            'is_pq_flag', F.when(F.col('asin_package_quantity') >= 2, F.lit(1))
        )
        # Backfill parent_asin: a null means the ASIN is absent from the variant table,
        # so the ASIN is treated as its own parent
        df_fb_join = df_fb_join.withColumn(
            'parent_asin',
            F.when(F.col('parent_asin').isNull(), F.col('asin')).otherwise(F.col('parent_asin'))
        )
        self.df_fb_asin_detail = df_fb_join.cache()

        fb_counts_agg = self.df_fb_asin_detail.groupby(['seller_id']).agg(
            F.count('asin').alias('fb_asin_total'),
            F.sum('is_asin_new').alias('fb_new_asin_num'),
            F.sum('is_pq_flag').alias('fb_pq_num')
        )
        # New-ASIN ratio
        fb_counts_agg = fb_counts_agg.withColumn(
            'fb_new_asin_rate', F.round(F.col('fb_new_asin_num') / F.col('fb_asin_total'), 4)
        )
        # Multi-pack ratio
        fb_counts_agg = fb_counts_agg.withColumn(
            'fb_pq_rate', F.round(F.col('fb_pq_num') / F.col('fb_asin_total'), 4)
        )
        fb_counts_agg = fb_counts_agg.select(
            'seller_id', 'fb_new_asin_num', 'fb_pq_num', 'fb_asin_total', 'fb_new_asin_rate', 'fb_pq_rate'
        )

        # Multi-variant ratio per seller:
        # parent ASINs with variants / (parent ASINs with variants + single-product ASINs)
        df_variant_radio = self.df_fb_asin_detail.groupby(['seller_id', 'parent_asin']).agg(
            F.count('asin').alias('asin_son_count')
        )
        # Flag multi-variant parents: asin_son_count > 1 means the parent ASIN has multiple variants
        df_variant_radio = df_variant_radio.withColumn(
            'is_variant_flag', F.when(F.col('asin_son_count') > 1, F.lit(1))
        )
        # Re-aggregate by seller_id to get the numerator and denominator of the variant ratio
        df_variant_radio = df_variant_radio.groupby(['seller_id']).agg(
            F.sum('is_variant_flag').alias('fb_more_variant_num'),
            F.count('parent_asin').alias('fb_variant_asin_total')
        )
        # Variant ratio
        df_variant_radio = df_variant_radio.withColumn(
            'fb_variant_rate',
            F.round(F.col('fb_more_variant_num') / F.col('fb_variant_asin_total'), 4)
        )
        df_variant_radio = df_variant_radio.select(
            'seller_id', 'fb_more_variant_num', 'fb_variant_asin_total', 'fb_variant_rate'
        )

        # Merge the aggregates
        self.df_fb_agg = self.df_fb_feedback.join(
            fb_counts_agg, on='seller_id', how='left'
        ).join(
            df_variant_radio, on='seller_id', how='left'
        ).join(
            self.df_top20_asin, on='seller_id', how='left'
        )

        # Join the company-store DataFrame and mark internal stores
        self.df_fb_agg = self.df_fb_agg.join(
            self.df_self_seller_id, on='seller_id', how='left'
        )
        # Sellers without a match are not company stores; fill is_self_fb with 0
        self.df_fb_agg = self.df_fb_agg.na.fill({"is_self_fb": 0})
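
    # Illustration of the variant ratio computed in handle_fb_cal_agg above: for a seller whose
    # ASINs roll up to parent P1 (3 child ASINs) and parent P2 (a standalone ASIN, its own parent),
    # only P1 is flagged, so fb_more_variant_num = 1, fb_variant_asin_total = 2 and
    # fb_variant_rate = 0.5.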

    # Output dataset: report
    def save_data_report(self):
        # Join ods_seller_account_syn to bring back account_name; the inner join drops
        # sellers that have no store name in the database
        df_save = self.df_fb_agg.join(self.df_seller_account, on='seller_id', how='inner')
        df_save = df_save.select(
            F.col('seller_id'),
            F.col('account_name'),
            F.col('fb_country_name'),
            F.col('fb_web_asin_num'),
            F.col('is_self_fb'),
            F.round('top_20_avg_price', 4).alias('top_20_avg_price'),
            F.round('top_20_avg_rating', 4).alias('top_20_avg_rating'),
            F.ceil('top_20_avg_total_comments').alias('top_20_avg_total_comments'),
            F.col('top_20_new_asin_num'),
            F.col('count_30_day_num'),
            F.col('count_1_year_num'),
            F.col('count_lifetime_num'),
            F.col('count_30_day_rate'),
            F.col('count_1_year_rate'),
            F.col('count_life_time_rate'),
            F.col('fb_new_asin_num'),
            F.col('fb_asin_total'),
            F.col('fb_new_asin_rate'),
            F.col('fb_variant_rate'),
            F.col('fb_more_variant_num'),
            F.col('fb_variant_asin_total'),
            F.col('fb_pq_rate'),
            F.col('fb_pq_num'),
            F.col('fb_crawl_date'),
            F.when(F.col('fb_country_name').isNull(), F.lit(0))
             .when(F.col('fb_country_name') == self.site_name.upper(), F.lit(1))
             .when(F.col('fb_country_name') == 'CN', F.lit(2))
             .otherwise(F.lit(3)).alias('fb_country_name_type'),
            F.when(F.col('fb_asin_total').isNull(), F.lit(0))
             .when(F.col('fb_asin_total') <= 300, F.lit(1))
             .when(F.col('fb_asin_total') <= 1000, F.lit(2))
             .otherwise(F.lit(3)).alias('fb_account_type'),
            F.when(F.col('fb_asin_total').isNull(), F.lit(0))
             .when(F.col('fb_asin_total') == 0, F.lit(1))
             .when(F.col('fb_asin_total') <= 50, F.lit(2))
             .when(F.col('fb_asin_total') <= 200, F.lit(3))
             .when(F.col('fb_asin_total') <= 500, F.lit(4))
             .when(F.col('fb_asin_total') <= 1000, F.lit(5))
             .when(F.col('fb_asin_total') <= 5000, F.lit(6))
             .when(F.col('fb_asin_total') <= 10000, F.lit(7))
             .otherwise(F.lit(8)).alias('fb_asin_total_type'),
            F.when(F.col('fb_new_asin_num').isNull(), F.lit(0))
             .when(F.col('fb_new_asin_num') == 0, F.lit(1))
             .when(F.col('fb_new_asin_num') <= 5, F.lit(2))
             .when(F.col('fb_new_asin_num') <= 10, F.lit(3))
             .when(F.col('fb_new_asin_num') <= 20, F.lit(4))
             .when(F.col('fb_new_asin_num') <= 30, F.lit(5))
             .when(F.col('fb_new_asin_num') <= 50, F.lit(6))
             .when(F.col('fb_new_asin_num') <= 100, F.lit(7))
             .otherwise(F.lit(8)).alias('fb_new_asin_num_type'),
            F.when(F.col('fb_new_asin_rate').isNull(), F.lit(0))
             .when(F.col('fb_new_asin_rate') == 0, F.lit(1))
             .when(F.col('fb_new_asin_rate') <= 0.05, F.lit(2))
             .when(F.col('fb_new_asin_rate') <= 0.1, F.lit(3))
             .when(F.col('fb_new_asin_rate') <= 0.2, F.lit(4))
             .when(F.col('fb_new_asin_rate') <= 0.5, F.lit(5))
             .otherwise(F.lit(6)).alias('fb_new_asin_rate_type'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(None).alias('usr_mask_type'),
            F.lit(None).alias('usr_mask_progress'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

        # CommonUtil.check_schema(self.spark, df_save, self.hive_tb)
        print(f"Clearing HDFS directory: {self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)

        df_save = df_save.repartition(1)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"Saving to table {self.hive_tb}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")
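
    # Reference for the derived type codes written by save_data_report above:
    #   fb_country_name_type: 0 = missing, 1 = same as the site country, 2 = CN, 3 = other.
    #   fb_account_type:      0 = missing, 1 = <= 300 ASINs, 2 = <= 1000, 3 = > 1000.
    #   fb_asin_total_type / fb_new_asin_num_type / fb_new_asin_rate_type: finer buckets following
    #   the same pattern (0 = missing, then ascending thresholds).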

    # Output dataset: asin_info
    def save_data_asin_info(self):
        # Keep only the detail rows of new ASINs
        self.df_fb_asin_detail = self.df_fb_asin_detail.filter('is_asin_new = 1')
        # Join ods_seller_account_syn to bring back account_name; the inner join drops
        # sellers that have no store name in the database
        df_save_asin = self.df_fb_asin_detail.join(self.df_seller_account, on='seller_id', how='inner')
        df_save_asin = df_save_asin.select(
            F.col('seller_id'),
            F.col('account_name'),
            F.col('asin'),
            F.col('asin_title'),
            F.col('asin_launch_time'),
            F.col('is_asin_new'),
            F.col('asin_package_quantity'),
            F.col('is_pq_flag'),
            F.col('parent_asin'),
            F.col('asin_img_url'),
            F.col('asin_price'),
            F.col('asin_rating'),
            F.col('asin_total_comments'),
            F.col('asin_weight'),
            F.col('asin_volume'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

        hive_tb_name = 'dwt_fb_asin_info'
        hdfs_path_asin_info = CommonUtil.build_hdfs_path(hive_tb_name, partition_dict=self.partition_dict)
        print(f"Clearing HDFS directory: {hdfs_path_asin_info}")
        HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)

        df_save_asin = df_save_asin.repartition(50)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"Saving to table {hive_tb_name}, partitioned by {partition_by}")
        df_save_asin.write.saveAsTable(name=hive_tb_name, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        self.read_data()
        self.handle_fb_top_20()
        self.handle_fb_cal_agg()
        self.save_data_report()
        self.save_data_asin_info()


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    # Arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwtFbBaseReport(site_name=site_name, date_type=date_type, date_info=date_info)
    obj.run()
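
# Example invocation (the script file name and the date_type literal below are illustrative;
# the accepted values depend on CommonUtil / DateTypes conventions):
#   spark-submit dwt_fb_base_report.py us month 2022-07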