import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from pyspark.sql.types import IntegerType, DoubleType
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.storagelevel import StorageLevel
from yswg_utils.common_udf import udf_parse_amazon_orders, udf_get_package_quantity


class DwdMerchantwordsMeasure(Templates):
    """Compute DWD-layer measures for MerchantWords search terms (AO value,
    zr flow proportion, monthly sales, price/rating/reviews averages) and
    write them partitioned by site_name and batch."""

    def __init__(self, site_name='us', date_type='day', date_info='2024-01-01', batch='2024-0'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.batch = batch
        self.db_save = 'dwd_merchantwords_measure_v2'
        self.spark = self.create_spark_object(
            app_name=f"DwdMerchantwordsMeasure: {self.site_name}, {self.date_type}, {self.date_info}, {self.batch}")
        self.partitions_num = 5
        self.partitions_by = ['site_name', 'batch']
        # placeholder dataframes, populated in read_data()/handle_data()
        self.df_merchantwords_detail = self.spark.sql(f"select 1+1;")
        self.df_products_num = self.spark.sql(f"select 1+1;")
        self.df_search_term_type = self.spark.sql(f"select 1+1;")
        self.df_self_asin = self.spark.sql(f"select 1+1;")
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_asin_buy_data = self.spark.sql(f"select 1+1;")
        self.df_asin_count = self.spark.sql(f"select 1+1;")
        self.df_st_count = self.spark.sql(f"select 1+1;")
        self.df_st_buy_data = self.spark.sql(f"select 1+1;")
        self.df_st_detail = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.data_type_list = ['zr', 'sp', 'sb1', 'sb2', 'sb3', 'hr', 'bs', 'ac']
        # register shared UDFs
        self.u_parse_amazon_orders = self.spark.udf.register('u_parse_amazon_orders', udf_parse_amazon_orders, IntegerType())
        self.u_get_package_quantity = self.spark.udf.register('u_get_package_quantity', udf_get_package_quantity, IntegerType())
        # register custom UDFs defined on this class
        self.u_parse_asin_price = self.spark.udf.register('u_parse_asin_price', self.udf_parse_asin_price, DoubleType())
        self.u_parse_asin_rating = self.spark.udf.register('u_parse_asin_rating', self.udf_parse_asin_rating, DoubleType())
        self.u_parse_asin_reviews = self.spark.udf.register('u_parse_asin_reviews', self.udf_parse_asin_reviews, IntegerType())
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.db_save}/site_name={self.site_name}/batch={self.batch}"
        print(f"Clearing HDFS directory.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

    @staticmethod
    def udf_parse_asin_price(price):
        """Parse the price string from the ASIN detail page into a float."""
        if price:
            try:
                # US-style format, e.g. "$1,234.56"
                match_us = re.search(r'\$([\d,]+(?:\.\d+)?)', price)
                if match_us:
                    number_str = match_us.group(1).replace(',', '')
                    return float(number_str)
                # DE-style format, e.g. "1.234,56 €"
                match_de = re.search(r'([\d.]+),(\d+)\s*€', price)
                if match_de:
                    integer_part = match_de.group(1).replace('.', '')
                    decimal_part = match_de.group(2)
                    number_str = f"{integer_part}.{decimal_part}"
                    return float(number_str)
                else:
                    return None
            except ValueError:
                return None
        return None

    @staticmethod
    def udf_parse_asin_rating(site, rating):
        """Parse the rating from the ASIN detail page (wording varies by site)."""
        if rating:
            try:
                if site == 'de':
                    rating = re.findall(r"(.*) von", rating)[0]
                elif site == 'fr':
                    rating = re.findall(r"(.*) sur", rating)[0]
                elif site == 'it':
                    rating = re.findall(r"(.*) su", rating)[0]
                elif site == 'es':
                    rating = re.findall(r"(.*) de", rating)[0]
                else:
                    rating = re.findall(r"(.*) out", rating)[0]
                rating = rating.replace(',', '.')
                return float(rating)
            except (IndexError, ValueError):
                # guard against unexpected formats, mirroring the other parsers
                return None
        return None

    @staticmethod
    def udf_parse_asin_reviews(reviews):
        """Parse the reviews count string into an integer."""
        if reviews:
            try:
                match = re.search(r'\b(\d{1,3}(?:[,.]\d{3})*)(?:\s+\w+)?\b', reviews)
                if match:
                    number_str = match.group(1).replace(',', '').replace('.', '')
                    return int(number_str)
                else:
                    return None
            except ValueError:
                return None
        else:
            return None
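    # Illustrative parses for the three UDFs above (assumed sample inputs;
    # outputs follow directly from the regexes, not from executed tests):
    #   udf_parse_asin_price('$1,234.56')          -> 1234.56  (US format)
    #   udf_parse_asin_price('1.234,56 €')         -> 1234.56  (DE format)
    #   udf_parse_asin_rating('de', '4,5 von 5')   -> 4.5
    #   udf_parse_asin_reviews('1,234 ratings')    -> 1234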
    def read_data(self):
        print("1. Read dwt_merchantwords_st_detail")
        # note: this reads the fixed merge batch '2024-1', not self.batch
        sql = f"""
        select keyword, volume, avg_3m, avg_12m, depth, results_count, sponsored_ads_count,
               page_1_reviews, appearance, last_seen, update_time, lang, last_batch
        from dwt_merchantwords_st_detail_merge
        where site_name = '{self.site_name}'
          and batch = '2024-1';
        """
        self.df_merchantwords_detail = self.spark.sql(sqlQuery=sql)
        self.df_merchantwords_detail = self.df_merchantwords_detail.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_merchantwords_detail.show(10, truncate=True)

        print("2. Read ods_merchantwords_brand_analytics to get the total product count")
        sql = f"""
        select search_term as keyword, quantity_being_sold as asin_total_num, updated_time
        from ods_merchantwords_brand_analytics
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}';
        """
        self.df_products_num = self.spark.sql(sqlQuery=sql)
        self.df_products_num = self.df_products_num.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_products_num.show(10, truncate=True)

        print("3. Read ods_merchantwords_search_term_type to get search term, asin and type")
        for data_type in self.data_type_list:
            if data_type in ['zr', 'sp']:
                sql = f"""
                select search_term as keyword, asin, page, page_row, '{data_type}' as data_type, created_time, updated_time
                from ods_merchantwords_search_term_{data_type}
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.date_info}';
                """
                df = self.spark.sql(sqlQuery=sql)
                df = df.repartition(80)
            elif data_type in ['sb1', 'sb2', 'sb3']:
                data_type_int = int(data_type[-1])
                sql = f"""
                select search_term as keyword, asin, page, '{data_type}' as data_type, created_time, updated_time
                from ods_merchantwords_search_term_sb
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.date_info}'
                  and data_type = {data_type_int};
                """
                df = self.spark.sql(sqlQuery=sql)
                df = df.repartition(80)
            else:
                sql = f"""
                select search_term as keyword, asin, page, '{data_type}' as data_type, created_time, updated_time
                from ods_merchantwords_search_term_{data_type}
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.date_info}';
                """
                df = self.spark.sql(sqlQuery=sql)
                df = df.repartition(80)
            self.df_search_term_type = self.df_search_term_type.unionByName(df, allowMissingColumns=True)
        self.df_search_term_type = self.df_search_term_type.persist(StorageLevel.MEMORY_AND_DISK)
        self.df_search_term_type.show(10, truncate=True)

        print("4. Read ods_self_asin to get in-house products")
        sql = f"""
        select asin, 1 as is_self_asin
        from ods_self_asin
        where site_name = '{self.site_name}'
        group by asin;
        """
        df_self_asin = self.spark.sql(sqlQuery=sql)
        self.df_self_asin = F.broadcast(df_self_asin)
        self.df_self_asin.show(10, truncate=True)

        print("5. Read ods_merchantwords_asin_detail to get asin data")
        sql = f"""
        select asin, title, img, price, rating, reviews, updated_time
        from ods_merchantwords_asin_detail
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}';
        """
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        self.df_asin_detail = self.df_asin_detail.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_asin_detail.show(10, truncate=True)

        print("6. Read ods_merchantwords_other_search_term_data to get asin monthly sales")
        sql = f"""
        select search_term as keyword, asin, buy_data, page, created_time, updated_time
        from ods_merchantwords_other_search_term_data
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}';
        """
        self.df_asin_buy_data = self.spark.sql(sqlQuery=sql)
        self.df_asin_buy_data = self.df_asin_buy_data.repartition(80).persist(StorageLevel.MEMORY_ONLY)
        self.df_asin_buy_data.show(10, truncate=True)
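    # After read_data, df_search_term_type is a union of the 8 placement
    # sources (zr, sp, sb1-3, hr, bs, ac) sharing one schema:
    #   keyword, asin, page, page_row (zr/sp only), data_type,
    #   created_time, updated_time.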
    def handle_data(self):
        # total product count
        self.handle_products_num()
        # dedupe df_search_term_type
        self.handle_search_term_asin_type()
        # per-asin counts by placement type
        self.df_asin_count = self.handle_st_asin_counts(cal_type="asin")
        # per-search-term asin_count + in-house asin_count
        self.df_st_count = self.handle_st_asin_counts(cal_type="st")
        # AO value and zr flow proportion
        self.handle_ao_and_zr_flow_proportion()
        # monthly sales
        self.handle_monthly_sales()
        # asin_detail: price, rating, etc.
        self.handle_asin_detail()
        # final column handling before save
        self.handle_save()

    def handle_products_num(self):
        print("Handling total product count:")
        # 1. dedupe, keeping the latest record per keyword
        products_num_window = Window.partitionBy('keyword').orderBy(
            F.desc_nulls_last('updated_time')
        )
        self.df_products_num = self.df_products_num.withColumn(
            "u_rank", F.row_number().over(window=products_num_window)
        )
        self.df_products_num = self.df_products_num.filter('u_rank=1').drop('u_rank', 'updated_time')
        # keep only keywords with asin_total_num greater than 0
        self.df_products_num = self.df_products_num.filter('asin_total_num > 0')
        # 2. join back onto df_save
        self.df_save = self.df_merchantwords_detail.join(
            self.df_products_num, on=['keyword'], how='inner'
        ).persist(StorageLevel.MEMORY_ONLY)
        # 3. release resources
        self.df_merchantwords_detail.unpersist()
        self.df_products_num.unpersist()

    def handle_search_term_asin_type(self):
        print("Deduplicating df_search_term_type:")
        # 1. dedupe to guard against duplicate crawler fetches; F.rank keeps
        # every row of the latest fetch per (data_type, keyword, page)
        st_asin_window = Window.partitionBy(['data_type', 'keyword', 'page']).orderBy(
            F.desc_nulls_last('created_time'), F.desc_nulls_last('updated_time')
        )
        self.df_search_term_type = self.df_search_term_type.withColumn(
            "u_rank", F.rank().over(window=st_asin_window)
        )
        self.df_search_term_type = self.df_search_term_type.filter('u_rank=1')\
            .drop('u_rank', 'created_time', 'updated_time')

    def handle_st_asin_counts(self, cal_type="asin"):
        print(f"Calculating {cal_type}_counts")
        cal_type_complete = "keyword" if cal_type == "st" else cal_type
        self.df_search_term_type = self.df_search_term_type.withColumn(
            f"{cal_type}_data_type",
            F.concat(F.lit(f"{cal_type}_"), self.df_search_term_type.data_type, F.lit(f"_counts"))
        )
        df = self.df_search_term_type.groupby([f'{cal_type_complete}'])\
            .pivot(f"{cal_type}_data_type").count()
        df = df.fillna(0)
        df = df.withColumn(
            f"{cal_type}_sb_counts",
            df[f"{cal_type}_sb1_counts"] + df[f"{cal_type}_sb2_counts"] + df[f"{cal_type}_sb3_counts"]
        )
        df = df.withColumn(
            f"{cal_type}_adv_counts",
            df[f"{cal_type}_sb_counts"] + df[f"{cal_type}_sp_counts"]
        )
        df = df.repartition(80)
        if cal_type == "asin":
            df_asin_agg = self.df_search_term_type.groupby(['asin']).agg(
                F.count('keyword').alias("asin_st_counts")
            )
            df = df.join(
                df_asin_agg, on=['asin'], how='left'
            )
        if cal_type == "st":
            # per keyword: asin count, in-house asin count and their ratio
            df_st_agg = self.df_search_term_type\
                .select("keyword", "asin")\
                .join(self.df_self_asin, on=['asin'], how='left')\
                .withColumn("is_self_asin", F.when(F.col("is_self_asin").isNotNull(), F.lit(1)).otherwise(F.lit(0)))\
                .groupby(['keyword'])\
                .agg(F.sum('is_self_asin').alias("self_asin_num"), F.count('asin').alias("asin_num"))
            df = df.join(
                df_st_agg, on=['keyword'], how='left'
            ).withColumn(
                "self_asin_proportion", F.round(F.col('self_asin_num')/F.col('asin_num'), 4)
            )
        df = df.persist(StorageLevel.MEMORY_AND_DISK)
        return df
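    # Shape of the pivot above for cal_type="st" (illustrative column layout):
    #   keyword | st_zr_counts | st_sp_counts | st_sb1_counts | ... | st_ac_counts
    # after which st_sb_counts = sb1 + sb2 + sb3 and st_adv_counts = sb + sp.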
    def handle_ao_and_zr_flow_proportion(self):
        print("Calculating AO value + zr flow proportion:")
        # 1. per-asin AO value and zr flow proportion
        self.df_asin_count = self.df_asin_count.withColumn(
            "asin_ao_val",
            F.round(self.df_asin_count.asin_adv_counts / self.df_asin_count.asin_zr_counts, 3)
        ).withColumn(
            "asin_zr_flow_proportion",
            F.when(F.col("asin_st_counts").isNotNull(),
                   F.round(F.col("asin_zr_counts") / F.col("asin_st_counts"), 3))
        )
        # 2. per-search-term AO value and zr flow proportion: mean over the
        # keyword's page-1 zr asins
        df_asin_ao_and_zr_flow_proportion = self.df_asin_count.select("asin", "asin_ao_val", "asin_zr_flow_proportion")
        df_st_ao_and_zr_flow_proportion = self.df_search_term_type.filter("data_type='zr' and page=1").select("keyword", "asin").join(
            df_asin_ao_and_zr_flow_proportion, on=['asin'], how='left'
        )
        df_st_ao = df_st_ao_and_zr_flow_proportion\
            .filter("asin_ao_val is not null")\
            .groupby(["keyword"])\
            .agg(F.round(F.mean("asin_ao_val"), 3).alias("st_ao_val"))
        df_st_ao = df_st_ao.repartition(80)
        df_st_zr_flow_proportion = df_st_ao_and_zr_flow_proportion\
            .filter("asin_zr_flow_proportion is not null")\
            .groupby(["keyword"])\
            .agg(F.round(F.mean("asin_zr_flow_proportion"), 3).alias("st_zr_flow_proportion"))
        df_st_zr_flow_proportion = df_st_zr_flow_proportion.repartition(80)
        self.df_st_count = self.df_st_count.join(
            df_st_ao, on=['keyword'], how='left'
        ).join(
            df_st_zr_flow_proportion, on=['keyword'], how='left'
        )
        # 3. join back onto df_save
        self.df_save = self.df_save.join(
            self.df_st_count, on=['keyword'], how='left'
        ).persist(StorageLevel.MEMORY_ONLY)
        # 4. release resources
        self.df_asin_count.unpersist()
        self.df_st_count.unpersist()

    def handle_monthly_sales(self):
        print("Calculating search-term monthly sales:")
        # dedupe: keep all rows of the latest crawl per (keyword, page)
        asin_buy_data_window = Window.partitionBy(['keyword', 'page']).orderBy(
            F.desc_nulls_last('created_time'), F.desc_nulls_last('updated_time')
        )
        self.df_asin_buy_data = self.df_asin_buy_data.withColumn(
            "u_rank", F.rank().over(window=asin_buy_data_window)
        )
        self.df_asin_buy_data = self.df_asin_buy_data.filter('u_rank=1')\
            .drop('u_rank', 'created_time', 'updated_time', 'page')
        # monthly sales: sum of per-asin parsed order counts per keyword
        self.df_st_buy_data = self.df_asin_buy_data.withColumn(
            'asin_monthly_sales', self.u_parse_amazon_orders('buy_data')
        ).groupby(['keyword']).agg(
            F.sum('asin_monthly_sales').alias("st_monthly_sales")
        )
        # join back onto df_save
        self.df_save = self.df_save.join(
            self.df_st_buy_data, on=['keyword'], how='left'
        ).persist(StorageLevel.MEMORY_ONLY)
        # release resources
        self.df_asin_buy_data.unpersist()

    def handle_asin_detail(self):
        print("Handling asin_detail:")
        # 1. dedupe, keeping the latest record per asin
        asin_detail_window = Window.partitionBy('asin').orderBy(
            F.desc_nulls_last('updated_time')
        )
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "u_rank", F.row_number().over(window=asin_detail_window)
        )
        self.df_asin_detail = self.df_asin_detail.filter('u_rank=1').drop('u_rank', 'updated_time')
        # 2. parse and clean fields
        self.df_asin_detail = self.df_asin_detail.withColumn(
            'site', F.lit(self.site_name)
        ).withColumn(
            'price', self.u_parse_asin_price('price')
        ).withColumn(
            'rating', self.u_parse_asin_rating('site', 'rating')
        ).withColumn(
            'reviews', self.u_parse_asin_reviews('reviews')
        ).withColumn(
            'package_quantity',
            F.when(
                F.col('title').isNotNull(), self.u_get_package_quantity('title')
            ).otherwise(1)
        )
        df_st_asin_detail = self.df_search_term_type.select("keyword", "asin").join(
            self.df_asin_detail, on=['asin'], how='left'
        )
        self.df_st_detail = df_st_asin_detail.groupby(['keyword']).agg(
            F.round(F.avg('rating'), 2).alias("rating_avg"),
            F.round(F.avg('price'), 2).alias("price_avg"),
            F.round(F.avg('reviews'), 0).alias("reviews_avg"),
            # share of asins sold in multi-packs; count only the matching rows
            # (F.count over a bare boolean column would count every non-null row)
            F.round(F.count(F.when(F.col('package_quantity') > 1, True)) / F.count('asin'), 4).alias("package_quantity"),
        )
        # 3. join back onto df_save
        self.df_save = self.df_save.join(
            self.df_st_detail, on=['keyword'], how='left'
        ).persist(StorageLevel.MEMORY_ONLY)
        # 4. release resources
        self.df_asin_detail.unpersist()
        self.df_search_term_type.unpersist()
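    # Metric definitions as implemented above, for reference:
    #   asin_ao_val             = asin_adv_counts / asin_zr_counts
    #   asin_zr_flow_proportion = asin_zr_counts / asin_st_counts
    #   st_ao_val, st_zr_flow_proportion = means of those per-asin values
    #   over each keyword's page-1 zr asins.
    # handle_save below fills every missing metric with -1 as an explicit
    # "no data" sentinel before projecting the final column order.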
    def handle_save(self):
        # add the remaining columns before saving
        self.df_save = self.df_save.withColumn(
            'listing_sales_avg', F.round(F.col("st_monthly_sales")/F.col("asin_num"), 0)
        ).withColumn(
            'site_name', F.lit(self.site_name)
        ).withColumn(
            'batch', F.lit(self.batch)
        )
        # fill nulls
        self.df_save = self.df_save.na.fill({
            "st_ao_val": -1, "st_zr_flow_proportion": -1, "asin_total_num": -1,
            "asin_num": -1, "self_asin_num": -1, "self_asin_proportion": -1,
            "st_sp_counts": -1, "st_zr_counts": -1, "st_monthly_sales": -1,
            "listing_sales_avg": -1, "reviews_avg": -1, "rating_avg": -1,
            "price_avg": -1, "package_quantity": -1
        })
        self.df_save = self.df_save.select(
            "keyword", "lang", "st_ao_val", "st_zr_flow_proportion", "volume", "avg_3m",
            "avg_12m", "asin_total_num", "asin_num", "self_asin_num", "self_asin_proportion",
            "st_sp_counts", "st_zr_counts", "st_monthly_sales", "listing_sales_avg",
            "reviews_avg", "rating_avg", "price_avg", "depth", "results_count",
            "sponsored_ads_count", "page_1_reviews", "appearance", "last_seen",
            "update_time", "last_batch", "package_quantity", "site_name", "batch"
        )


if __name__ == '__main__':
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    batch = sys.argv[4]
    handle_obj = DwdMerchantwordsMeasure(site_name=site_name, date_type=date_type,
                                         date_info=date_info, batch=batch)
    handle_obj.run()
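# Example invocation (script name is hypothetical; the four positional
# arguments are site_name, date_type, date_info and batch, per sys.argv above):
#   spark-submit dwd_merchantwords_measure.py us day 2024-01-01 2024-1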