Commit a2c1c284 by chenyuanjie

fix

parent f11c3848
...@@ -30,7 +30,6 @@ class KafkaFlowAsinDetail(Templates): ...@@ -30,7 +30,6 @@ class KafkaFlowAsinDetail(Templates):
self.date_info = date_info self.date_info = date_info
self.consumer_type = consumer_type # 消费实时还是消费历史 self.consumer_type = consumer_type # 消费实时还是消费历史
self.test_flag = test_flag # 正式环境跟测试环境 self.test_flag = test_flag # 正式环境跟测试环境
self.year = str(self.date_info).split('-')[0]
self.year_month = str(self.date_info).replace("-", "_") self.year_month = str(self.date_info).replace("-", "_")
self.repartition_num = 80 self.repartition_num = 80
# kafka相关参数 # kafka相关参数
...@@ -47,9 +46,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -47,9 +46,9 @@ class KafkaFlowAsinDetail(Templates):
self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail" self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail"
# elasticsearch相关参数 # elasticsearch相关参数
self.client = EsUtils.get_es_client() self.client = EsUtils.get_es_client()
self.es_index_name = f"{self.topic_name}_test" if self.test_flag == 'test' else f"{self.topic_name}" self.es_index_name = f"{self.site_name}_flow_asin_30day_test" if self.test_flag == 'test' else f"{self.site_name}_flow_asin_30day"
self.es_index_alias_name = f"{self.site_name}_st_detail_last_4_week_test" if self.test_flag == 'test' else f"{self.site_name}_st_detail_last_4_week" # self.es_index_alias_name = f"{self.site_name}_st_detail_last_4_week_test" if self.test_flag == 'test' else f"{self.site_name}_st_detail_last_4_week"
self.es_index_body = EsUtils.get_es_body() self.es_index_body = EsUtils.get_es_30day_body()
# 富集策略相关配置,用于更新 usr_mask_type 字段 # 富集策略相关配置,用于更新 usr_mask_type 字段
self.policy_name1 = "user_mask_asin_policy" self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy" self.policy_name2 = "user_mask_category_policy"
...@@ -60,9 +59,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -60,9 +59,9 @@ class KafkaFlowAsinDetail(Templates):
print(f"任务名称:{self.app_name}") print(f"任务名称:{self.app_name}")
# Spark实时消费相关参数 # Spark实时消费相关参数
self.spark = SparkUtil.get_stream_spark(app_name=self.app_name) self.spark = SparkUtil.get_stream_spark(app_name=self.app_name)
self.check_path = f"/tmp/wangrui/{self.topic_name}_{self.consumer_type}_test" if self.test_flag == 'test' else f"/tmp/wangrui/{self.topic_name}_{self.consumer_type}" self.check_path = f"/tmp/chenyuanjie/{self.topic_name}_{self.consumer_type}_test" if self.test_flag == 'test' else f"/tmp/chenyuanjie/{self.topic_name}_{self.consumer_type}"
self.previous_date = self.get_previous_date(self) # self.previous_date = self.get_previous_date(self)
self.previous_two_date = self.get_previous_two_date(self) # self.previous_two_date = self.get_previous_two_date(self)
self.launch_time_interval_dict = self.get_launch_time_interval_dict() self.launch_time_interval_dict = self.get_launch_time_interval_dict()
print("日期字典:", self.launch_time_interval_dict) print("日期字典:", self.launch_time_interval_dict)
self.initial_batch_id = self.get_initial_batch_id(self) self.initial_batch_id = self.get_initial_batch_id(self)
...@@ -100,6 +99,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -100,6 +99,7 @@ class KafkaFlowAsinDetail(Templates):
self.df_user_package_num = self.spark.sql("select 1+1;") self.df_user_package_num = self.spark.sql("select 1+1;")
self.df_asin_category = self.spark.sql("select 1+1;") self.df_asin_category = self.spark.sql("select 1+1;")
self.df_max_bought_month_info_update = self.spark.sql("select 1+1;") self.df_max_bought_month_info_update = self.spark.sql("select 1+1;")
self.df_asin_source_flag = self.spark.sql("select 1+1;")
# udf函数注册 # udf函数注册
package_schema = StructType([ package_schema = StructType([
StructField("parse_package_quantity", IntegerType(), True), StructField("parse_package_quantity", IntegerType(), True),
...@@ -188,27 +188,27 @@ class KafkaFlowAsinDetail(Templates): ...@@ -188,27 +188,27 @@ class KafkaFlowAsinDetail(Templates):
]) ])
return schema return schema
@staticmethod # @staticmethod
def get_previous_date(self): # def get_previous_date(self):
self.df_date = self.spark.sql(f"select * from dim_date_20_to_30") # self.df_date = self.spark.sql(f"select * from dim_date_20_to_30")
df = self.df_date.toPandas() # df = self.df_date.toPandas()
df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)] # df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)]
cur_month_id = int(list(df_loc.id)[0]) # cur_month_id = int(list(df_loc.id)[0])
previous_date_id = cur_month_id - 1 # previous_date_id = cur_month_id - 1
df_loc = df.loc[df.id == previous_date_id] # df_loc = df.loc[df.id == previous_date_id]
previous_date = str(list(df_loc.year_month)[0]) # previous_date = str(list(df_loc.year_month)[0])
return previous_date # return previous_date
@staticmethod # @staticmethod
def get_previous_two_date(self): # def get_previous_two_date(self):
self.df_date = self.spark.sql(f"select * from dim_date_20_to_30") # self.df_date = self.spark.sql(f"select * from dim_date_20_to_30")
df = self.df_date.toPandas() # df = self.df_date.toPandas()
df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)] # df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)]
cur_month_id = int(list(df_loc.id)[0]) # cur_month_id = int(list(df_loc.id)[0])
previous_two_date_id = cur_month_id - 40 # previous_two_date_id = cur_month_id - 40
df_loc = df.loc[df.id == previous_two_date_id] # df_loc = df.loc[df.id == previous_two_date_id]
prvious_two_date = str(list(df_loc.year_month)[0]) # prvious_two_date = str(list(df_loc.year_month)[0])
return prvious_two_date # return prvious_two_date
@staticmethod @staticmethod
def get_launch_time_interval_dict(): def get_launch_time_interval_dict():
...@@ -352,6 +352,8 @@ class KafkaFlowAsinDetail(Templates): ...@@ -352,6 +352,8 @@ class KafkaFlowAsinDetail(Templates):
WHEN asin_length > 0 AND asin_length <= 150 AND asin_length + asin_length + (asin_width + asin_height) <= 300 THEN 6 WHEN asin_length > 0 AND asin_length <= 150 AND asin_length + asin_length + (asin_width + asin_height) <= 300 THEN 6
WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END""" WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END"""
df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height") df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height")
# 6.处理五点描述长度
df = df.withColumn("describe_len", F.length(F.col("describe").replace("|-|", "")))
return df return df
# 7. 处理asin图片信息 # 7. 处理asin图片信息
...@@ -586,7 +588,16 @@ class KafkaFlowAsinDetail(Templates): ...@@ -586,7 +588,16 @@ class KafkaFlowAsinDetail(Templates):
df = df.drop(previous_col) df = df.drop(previous_col)
return df return df
# 13. 字段标准化 # 13. 处理不同来源asin
def handle_asin_different_source(self, df):
df = df.join(self.df_asin_source_flag, on=['asin'], how='left').fillna({"asin_source_flag": "0"}).withColumn(
"asin_source_flag", F.split(F.col("asin_source_flag"), ",")
).withColumn(
"asin_source_flag", F.expr("transform(asin_source_flag, x -> cast(x as int))")
)
return df
# 14. 字段标准化
def handle_column_name(self, df): def handle_column_name(self, df):
df = df.withColumnRenamed("asin_bs_cate_1_id", "category_first_id")\ df = df.withColumnRenamed("asin_bs_cate_1_id", "category_first_id")\
.withColumnRenamed("asin_bs_cate_current_id", "category_id") \ .withColumnRenamed("asin_bs_cate_current_id", "category_id") \
...@@ -597,7 +608,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -597,7 +608,9 @@ class KafkaFlowAsinDetail(Templates):
.withColumnRenamed("asinUpdateTime", "asin_crawl_date")\ .withColumnRenamed("asinUpdateTime", "asin_crawl_date")\
.withColumnRenamed("customer_reviews_json", "product_features")\ .withColumnRenamed("customer_reviews_json", "product_features")\
.withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\ .withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\
.withColumn("bsr_best_orders_type", F.lit(-1)) .withColumn("bsr_best_orders_type", F.lit(-1))\
.withColumn("img_type", F.split(F.col("img_type"), ","))\
.withColumn("img_type", F.expr("transform(img_type, x -> cast(x as int))"))
df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts", df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts",
"tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price", "tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price",
"rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color", "rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color",
...@@ -616,7 +629,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -616,7 +629,9 @@ class KafkaFlowAsinDetail(Templates):
"asin_lob_info", "is_contains_lob_info", "is_package_quantity_abnormal", "category", "asin_lob_info", "is_contains_lob_info", "is_package_quantity_abnormal", "category",
"zr_flow_proportion", "matrix_flow_proportion", "matrix_ao_val", "product_features", "img_info", "zr_flow_proportion", "matrix_flow_proportion", "matrix_ao_val", "product_features", "img_info",
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json", "collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json",
F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price")) F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price"),
"asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d",
"describe_len")
df_save = df_save.na.fill( df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0, {"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0, "tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
...@@ -624,7 +639,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -624,7 +639,9 @@ class KafkaFlowAsinDetail(Templates):
"one_star": 0, "two_star": 0, "three_star": 0, "four_star": 0, "five_star": 0, "low_star": 0, "one_star": 0, "two_star": 0, "three_star": 0, "four_star": 0, "five_star": 0, "low_star": 0,
"size_type": 0, "rating_type": 0, "site_name_type": 0, "weight_type": 0, "launch_time_type": 0, "size_type": 0, "rating_type": 0, "site_name_type": 0, "weight_type": 0, "launch_time_type": 0,
"ao_val_type": 0, "rank_type": 0, "price_type": 0, "quantity_variation_type": 0, "package_quantity": 1, "ao_val_type": 0, "rank_type": 0, "price_type": 0, "quantity_variation_type": 0, "package_quantity": 1,
"is_movie_label": 0, "is_brand_label": 0, "is_alarm_brand": 0, "asin_lqs_rating": 0.0, "follow_sellers_count": -1} "is_movie_label": 0, "is_brand_label": 0, "is_alarm_brand": 0, "asin_lqs_rating": 0.0, "follow_sellers_count": -1,
"bsr_last_seen_at": "1970-01-01", "bsr_seen_count_30d": 0, "nsr_last_seen_at": "1970-01-01", "nsr_seen_count_30d": 0,
"describe_len": 0}
) )
print("asin的标准信息:") print("asin的标准信息:")
df_save.show(10, truncate=False) df_save.show(10, truncate=False)
...@@ -637,30 +654,16 @@ class KafkaFlowAsinDetail(Templates): ...@@ -637,30 +654,16 @@ class KafkaFlowAsinDetail(Templates):
variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating, variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank, asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank,
bsr_orders as previous_asin_bsr_orders, sales as previous_sales bsr_orders as previous_asin_bsr_orders, sales as previous_sales
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '{self.date_type}' from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '30day'
and date_info = '{self.previous_date}'
""" """
print("sql=", sql) print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql) self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
if self.df_previous_flow_asin.count() <= 1:
print("该历史节点数据不全,调整到上上个月")
sql = f"""
select asin, first_category_rank as previous_first_category_rank,
round(asin_ao_val, 3) as previous_asin_ao_val, asin_price as previous_asin_price,
bsr_orders as previous_bsr_orders, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, sales as previous_sales,
variation_num as previous_variation_num
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '{self.date_type}'
and date_info = '{self.previous_two_date}'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_previous_flow_asin.show(10, truncate=False) self.df_previous_flow_asin.show(10, truncate=False)
print("2. 获取卖家相关信息") print("2. 获取卖家相关信息")
sql = f""" sql = f"""
select fd_unique as seller_id, upper(fd_country_name) as seller_country_name from dim_fd_asin_info select fd_unique as seller_id, upper(fd_country_name) as seller_country_name from dim_fd_asin_info_30day
where site_name='{self.site_name}' and fd_unique is not null group by fd_unique, fd_country_name""" where site_name='{self.site_name}' and date_type = '30day' and fd_unique is not null group by fd_unique, fd_country_name"""
print("sql=", sql) print("sql=", sql)
self.df_seller_info = self.spark.sql(sqlQuery=sql) self.df_seller_info = self.spark.sql(sqlQuery=sql)
self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
...@@ -711,22 +714,10 @@ class KafkaFlowAsinDetail(Templates): ...@@ -711,22 +714,10 @@ class KafkaFlowAsinDetail(Templates):
sql = f""" sql = f"""
select asin, label from select asin, label from
(select asin, lower(label) as label, created_time,row_number() over(partition by asin,label order by updated_time desc) as crank (select asin, lower(label) as label, created_time,row_number() over(partition by asin,label order by updated_time desc) as crank
from ods_other_search_term_data where site_name='{self.site_name}' and date_type='{self.date_type}' and from ods_other_search_term_data where site_name='{self.site_name}' and date_type='30day' and trim(label) not in ('null','') and label is not null) t where t.crank=1
date_info='{self.date_info}' and trim(label) not in ('null','') and label is not null) t where t.crank=1
""" """
print("sql=", sql) print("sql=", sql)
self.df_asin_label_info = self.spark.sql(sqlQuery=sql) self.df_asin_label_info = self.spark.sql(sqlQuery=sql)
if self.df_asin_label_info.count() <= 1:
print("该历史节点数据不全,调整到上上个月")
sql = f"""
select asin, label from
(select asin, lower(label) as label, created_time,row_number()
over(partition by asin,label order by updated_time desc) as crank
from ods_other_search_term_data where site_name='{self.site_name}' and date_type='{self.date_type}' and
date_info='{self.previous_date}' and trim(label) not in ('null','') and label is not null) t where t.crank=1
"""
print("sql=", sql)
self.df_asin_label_info = self.spark.sql(sqlQuery=sql)
self.df_asin_label_info = self.df_asin_label_info.groupby(['asin']).agg( self.df_asin_label_info = self.df_asin_label_info.groupby(['asin']).agg(
F.collect_set("label").alias("asin_label_list")) F.collect_set("label").alias("asin_label_list"))
self.df_asin_label_info = self.df_asin_label_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_asin_label_info = self.df_asin_label_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
...@@ -737,19 +728,15 @@ class KafkaFlowAsinDetail(Templates): ...@@ -737,19 +728,15 @@ class KafkaFlowAsinDetail(Templates):
asin_bs_counts as bs_counts, asin_ac_counts as ac_counts, asin_tr_counts as tr_counts, asin_er_counts as er_counts, asin_bs_counts as bs_counts, asin_ac_counts as ac_counts, asin_tr_counts as tr_counts, asin_er_counts as er_counts,
asin_st_counts, asin_zr_counts, asin_adv_counts, round(asin_zr_flow_proportion, 3) as asin_zr_flow_proportion, asin_st_counts, asin_zr_counts, asin_adv_counts, round(asin_zr_flow_proportion, 3) as asin_zr_flow_proportion,
round(asin_ao_val, 3) as asin_ao_val, asin_amazon_orders round(asin_ao_val, 3) as asin_ao_val, asin_amazon_orders
from dwd_asin_measure where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' from dwd_asin_measure where site_name='{self.site_name}' and date_type='30day'
""" """
print("sql=", sql) print("sql=", sql)
self.df_asin_measure = self.spark.sql(sqlQuery=sql) self.df_asin_measure = self.spark.sql(sqlQuery=sql)
self.df_asin_measure = self.df_asin_measure.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_asin_measure = self.df_asin_measure.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_measure.show(10, truncate=False) self.df_asin_measure.show(10, truncate=False)
print("8. 读取one_category_report表") print("8. 读取one_category_report表")
if int(self.year) == 2022 and int(self.month) < 3: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as bsr_orders from ods_one_category_report " \
sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='30day'"
f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='2022-12'"
else:
sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as bsr_orders from ods_one_category_report " \
f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
print("sql=", sql) print("sql=", sql)
self.df_bs_report = self.spark.sql(sqlQuery=sql) self.df_bs_report = self.spark.sql(sqlQuery=sql)
self.df_bs_report = self.df_bs_report.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_bs_report = self.df_bs_report.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
...@@ -759,7 +746,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -759,7 +746,7 @@ class KafkaFlowAsinDetail(Templates):
select asin, new_launch_time from select asin, new_launch_time from
(select asin, launch_time as new_launch_time, (select asin, launch_time as new_launch_time,
row_number() over(partition by asin order by updated_at desc) as trank row_number() over(partition by asin order by updated_at desc) as trank
from ods_asin_keep_date where site_name='{self.site_name}' and state=3) t where t.trank=1 from ods_asin_keep_date_30day where site_name='{self.site_name}' and date_type='30day' and state=3) t where t.trank=1
""" """
print("sql=", sql) print("sql=", sql)
self.df_asin_keep_date = self.spark.sql(sqlQuery=sql) self.df_asin_keep_date = self.spark.sql(sqlQuery=sql)
...@@ -799,6 +786,16 @@ class KafkaFlowAsinDetail(Templates): ...@@ -799,6 +786,16 @@ class KafkaFlowAsinDetail(Templates):
).repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) ).repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_category.show(10, truncate=False) self.df_asin_category.show(10, truncate=False)
print("15. 获取asin不同来源标识")
sql = f"""
select asin, asin_cate_flag as asin_source_flag, bsr_latest_date as bsr_last_seen_at, bsr_30day_count as bsr_seen_count_30d,
nsr_latest_date as nsr_last_seen_at, nsr_30day_count as nsr_seen_count_30d from dwd_asin_cate_flag
where site_name='{self.site_name}' and date_type='30day'
"""
self.df_asin_source_flag = self.spark.sql(sqlQuery=sql)
self.df_asin_source_flag = self.df_asin_source_flag.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_source_flag.show(10, truncate=False)
# 字段处理逻辑综合 # 字段处理逻辑综合
def handle_all_field(self, df): def handle_all_field(self, df):
# 1. 处理asin分类及排名以及排名类型字段 # 1. 处理asin分类及排名以及排名类型字段
...@@ -825,7 +822,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -825,7 +822,9 @@ class KafkaFlowAsinDetail(Templates):
df = self.handle_asin_detail_all_type(df) df = self.handle_asin_detail_all_type(df)
# 12. 处理变化率相关字段 # 12. 处理变化率相关字段
df = self.handle_asin_attribute_change(df) df = self.handle_asin_attribute_change(df)
# 13. 字段标准化 # 13. 处理不同来源asin
df = self.handle_asin_different_source(df)
# 14. 字段标准化
df_save = self.handle_column_name(df) df_save = self.handle_column_name(df)
return df_save return df_save
...@@ -838,15 +837,15 @@ class KafkaFlowAsinDetail(Templates): ...@@ -838,15 +837,15 @@ class KafkaFlowAsinDetail(Templates):
self.client.enrich.execute_policy(name=self.policy_name1) self.client.enrich.execute_policy(name=self.policy_name1)
self.client.enrich.execute_policy(name=self.policy_name2) self.client.enrich.execute_policy(name=self.policy_name2)
# EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2) # EsUtils.user_enrich_pipeline(self.client, self.pipeline_id, self.policy_name1, self.policy_name2)
if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client): # if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client):
EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client) # EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client)
else: # else:
index_name_list = EsUtils.get_index_names_associated_alias(self.es_index_alias_name, self.client) # index_name_list = EsUtils.get_index_names_associated_alias(self.es_index_alias_name, self.client)
if self.es_index_name not in index_name_list: # if self.es_index_name not in index_name_list:
EsUtils.delete_index_alias(self.es_index_alias_name, self.client) # EsUtils.delete_index_alias(self.es_index_alias_name, self.client)
EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client) # EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client)
else: # else:
pass # pass
# 写入elasticsearch逻辑 # 写入elasticsearch逻辑
def save_to_es(self, df, batch_num): def save_to_es(self, df, batch_num):
...@@ -867,7 +866,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -867,7 +866,9 @@ class KafkaFlowAsinDetail(Templates):
F.col("first_category_rank").alias("category_first_rank"), F.col("first_category_rank").alias("category_first_rank"),
F.col("current_category_rank").alias("category_current_rank"), "asin_type", F.col("current_category_rank").alias("category_current_rank"), "asin_type",
"bsr_orders", "bsr_orders_sale", "page_inventory", "asin_bought_month", "seller_json", "bsr_orders", "bsr_orders_sale", "page_inventory", "asin_bought_month", "seller_json",
"buy_box_seller_type", "asin_describe", F.col("fbm_price").alias("asin_fbm_price")) "buy_box_seller_type", "asin_describe", F.col("fbm_price").alias("asin_fbm_price"),
F.col("describe_len").alias("asin_describe_len")
)
df = df.drop("category", "seller_json") df = df.drop("category", "seller_json")
df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save() df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save()
end_time = time.time() end_time = time.time()
...@@ -881,7 +882,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -881,7 +882,7 @@ class KafkaFlowAsinDetail(Templates):
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info, account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion, asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale, date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale,
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe, asin_fbm_price""" page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe, asin_fbm_price, asin_describe_len"""
DorisHelper.spark_export_with_columns(df_save=df_asin_latest_detail, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns) DorisHelper.spark_export_with_columns(df_save=df_asin_latest_detail, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns)
df_asin_latest_detail.unpersist() df_asin_latest_detail.unpersist()
......
import os
import sys
import time
import traceback
sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from datetime import datetime, timedelta
from functools import reduce
from utils.es_util import EsUtils
from pyspark.sql import Window
from pyspark.storagelevel import StorageLevel
from utils.DorisHelper import DorisHelper
from yswg_utils.common_df import get_node_first_id_df, get_first_id_from_category_desc_df
from yswg_utils.common_udf import udf_parse_bs_category, parse_weight_str, udf_extract_volume_dimensions, udf_get_package_quantity_with_flag as udf_get_package_quantity, udf_parse_seller_json
class KafkaFlowAsinDetail(Templates):
def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='latest', test_flag='normal', batch_size=100000):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.consumer_type = consumer_type # 消费实时还是消费历史
self.test_flag = test_flag # 正式环境跟测试环境
self.year = str(self.date_info).split('-')[0]
self.year_month = str(self.date_info).replace("-", "_")
self.repartition_num = 80
# kafka相关参数
self.topic_name = f"{self.site_name}_asin_detail_day_2026_02_04"
self.batch_size = batch_size
self.schema = self.init_schema()
self.batch_size_history = 20000
self.processing_time = 900 if self.site_name == 'us' else 600
self.history_batch_id = 0
# doris相关参数
self.doris_db = "test" if self.test_flag == "test" else "selection"
self.max_bought_month_table = f"{self.site_name}_asin_max_bought_month_info"
self.parent_asin_latest_detail_table = f"{self.site_name}_parent_asin_latest_detail"
self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail"
# elasticsearch相关参数
self.client = EsUtils.get_es_client()
self.es_index_name = f"{self.site_name}_flow_asin_30day_test" if self.test_flag == 'test' else f"{self.site_name}_flow_asin_30day"
self.es_index_body = self.get_es_body()
# 富集策略相关配置,用于更新 usr_mask_type 字段
self.policy_name1 = "user_mask_asin_policy"
self.policy_name2 = "user_mask_category_policy"
self.pipeline_id = f"{self.site_name}_user_mask_and_profit_rate_pipeline"
self.es_options = EsUtils.get_es_options(self.es_index_name, self.pipeline_id)
self.db_save = 'kafka_flow_asin_detail'
self.app_name = self.get_app_name()
print(f"任务名称:{self.app_name}")
# Spark实时消费相关参数
self.spark = SparkUtil.get_stream_spark(app_name=self.app_name)
self.check_path = f"/tmp/wangrui/{self.topic_name}_{self.consumer_type}_test" if self.test_flag == 'test' else f"/tmp/wangrui/{self.topic_name}_{self.consumer_type}"
self.previous_date = self.get_previous_date(self)
self.previous_two_date = self.get_previous_two_date(self)
self.launch_time_interval_dict = self.get_launch_time_interval_dict()
print("日期字典:", self.launch_time_interval_dict)
self.initial_batch_id = self.get_initial_batch_id(self)
print("当前消费的起始批次为: ", self.initial_batch_id)
self.history_batch_id = self.initial_batch_id + 1
# BSR分类解析模板
self.pattern1_dict = {
"us": "See Top 100 in ".lower(),
"uk": "See Top 100 in ".lower(),
"de": "Siehe Top 100 in ".lower(),
"es": "Ver el Top 100 en ".lower(),
"fr": "Voir les 100 premiers en ".lower(),
"it": "Visualizza i Top 100 nella categoria ".lower(),
}
self.pattern_current_dict = {
"us": "#(\d+) ",
"uk": "(\d+) in ",
"de": "(\d+) in ",
"es": "(\d+) en ",
"fr": "(\d+) en ",
"it": "(\d+) in ",
}
# DataFrame初始化
self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_seller_info = self.spark.sql("select 1+1;")
self.df_self_asin_info = self.spark.sql("select 1+1;")
self.df_alarm_brand_info = self.spark.sql("select 1+1;")
self.df_asin_label_info = self.spark.sql("select 1+1;")
self.df_asin_measure = self.spark.sql("select 1+1;")
self.df_bs_report = self.spark.sql("select 1+1;")
self.df_asin_keep_date = self.spark.sql("select 1+1;")
self.df_asin_bsr_end = self.spark.sql("select 1+1;")
self.df_hide_category = self.spark.sql("select 1+1;")
self.df_asin_new_cate = self.spark.sql("select 1+ 1;")
self.df_user_package_num = self.spark.sql("select 1+1;")
self.df_asin_category = self.spark.sql("select 1+1;")
self.df_max_bought_month_info_update = self.spark.sql("select 1+1;")
self.df_asin_source_flag = self.spark.sql("select 1+1;")
# udf函数注册
package_schema = StructType([
StructField("parse_package_quantity", IntegerType(), True),
StructField("is_package_quantity_abnormal", IntegerType(), True),
])
self.u_parse_package_quantity = self.spark.udf.register('u_parse_package_quantity', udf_get_package_quantity, package_schema)
bs_category_schema = StructType([
StructField('asin_bs_cate_1_id', StringType(), True),
StructField('asin_bs_cate_current_id', StringType(), True),
StructField('asin_bs_cate_1_rank', IntegerType(), True),
StructField('asin_bs_cate_current_rank', IntegerType(), True),
])
self.u_parse_bs_category = self.spark.udf.register('u_parse_bs_category', udf_parse_bs_category, bs_category_schema)
weight_schema = StructType([
StructField('weight', FloatType(), True),
StructField('weight_type', StringType(), True)
])
self.u_parse_weight = self.spark.udf.register('u_parse_weight', parse_weight_str, weight_schema)
volume_schema = StructType([
StructField("length", FloatType(), True),
StructField("width", FloatType(), True),
StructField("height", FloatType(), True),
StructField("asin_volume_type", StringType(), True)
])
self.u_parse_volume = self.spark.udf.register('u_parse_volume', udf_extract_volume_dimensions, volume_schema)
seller_schema = StructType([
StructField("buy_box_seller_type", IntegerType(), True),
StructField("account_name", StringType(), True),
StructField("account_id", StringType(), True)
])
self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema)
@staticmethod
def init_schema():
schema = StructType([
StructField("asin", StringType(), True),
StructField("title", StringType(), True),
StructField("img_url", StringType(), True),
StructField("rating", DoubleType(), True),
StructField("total_comments", IntegerType(), True),
StructField("price", FloatType(), True),
StructField("category", StringType(), True),
StructField("launch_time", StringType(), True),
StructField("volume", StringType(), True),
StructField("page_inventory", IntegerType(), True),
StructField("asin_vartion_list", ArrayType(ArrayType(StringType()), True), True),
StructField("title_len", IntegerType(), True),
StructField("img_num", IntegerType(), True),
StructField("img_type", StringType(), True),
StructField("activity_type", StringType(), True),
StructField("one_two_val", StringType(), True),
StructField("three_four_val", StringType(), True),
StructField("five_six_val", StringType(), True),
StructField("eight_val", StringType(), True),
StructField("node_id", StringType(), True),
StructField("five_star", IntegerType(), True),
StructField("four_star", IntegerType(), True),
StructField("three_star", IntegerType(), True),
StructField("two_star", IntegerType(), True),
StructField("one_star", IntegerType(), True),
StructField("low_star", IntegerType(), True),
StructField("together_asin", StringType(), True),
StructField("brand", StringType(), True),
StructField("ac_name", StringType(), True),
StructField("material", StringType(), True),
StructField("data_type", IntegerType(), True),
StructField("weight_str", StringType(), True),
StructField("seller_id", StringType(), True),
StructField("variat_num", IntegerType(), True),
StructField("best_sellers_rank", StringType(), True),
StructField("best_sellers_herf", StringType(), True),
StructField("account_name", StringType(), True),
StructField("parentAsin", StringType(), True),
StructField("asinUpdateTime", StringType(), True),
StructField("all_best_sellers_herf", StringType(), True),
StructField("image_view", IntegerType(), True),
StructField("product_description", StringType(), True),
StructField("describe", StringType(), True),
StructField("buy_sales", StringType(), True),
StructField("lob_asin_json", StringType(), True),
StructField("seller_json", StringType(), True),
StructField("customer_reviews_json", StringType(), True),
StructField("img_list", StringType(), True),
StructField("follow_sellers", IntegerType(), True),
StructField("fbm_delivery_price", FloatType(), True)
])
return schema
@staticmethod
def get_es_body():
return {
"settings": {
"number_of_shards": "3",
"number_of_replicas": "1",
"analysis": {
"filter": {
"en_snowball": {
"type": "snowball",
"language": "English"
},
"en_synonym": {
"type": "synonym_graph",
"synonyms_path": "analysis/synonyms_en.txt",
"updateable": "true"
}
},
"analyzer": {
"en_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"en_snowball"
]
},
"en_search_analyzer": {
"tokenizer": "standard",
"filter": [
"lowercase",
"en_synonym",
"en_snowball"
]
}
},
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
}
}
},
"mappings": {
"properties": {
"asin": {
"type": "keyword"
},
"img_url": {
"type": "keyword"
},
"title": {
"type": "text",
"analyzer": "en_analyzer",
"search_analyzer": "en_search_analyzer",
"fields": {
"keyword": {
"ignore_above": 32766,
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"title_len": {
"type": "integer"
},
"parent_asin": {
"type": "keyword"
},
"rating": {
"type": "float"
},
"price": {
"type": "float"
},
"total_comments": {
"type": "integer"
},
"buy_box_seller_type": {
"type": "short"
},
"page_inventory": {
"type": "short"
},
"weight": {
"type": "float"
},
"volume": {
"type": "keyword"
},
"ao_val": {
"type": "float"
},
"launch_time": {
"type": "date"
},
"bsr_orders": {
"type": "integer"
},
"bsr_orders_sale": {
"type": "double"
},
"low_star": {
"type": "short"
},
"one_star": {
"type": "short"
},
"two_star": {
"type": "short"
},
"three_star": {
"type": "short"
},
"four_star": {
"type": "short"
},
"five_star": {
"type": "short"
},
"zr_counts": {
"type": "integer"
},
"sp_counts": {
"type": "integer"
},
"sb_counts": {
"type": "integer"
},
"vi_counts": {
"type": "integer"
},
"bs_counts": {
"type": "integer"
},
"ac_counts": {
"type": "integer"
},
"tr_counts": {
"type": "integer"
},
"er_counts": {
"type": "integer"
},
"color": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"size": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"style": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"img_num": {
"type": "short"
},
"img_type": {
"type": "keyword"
},
"activity_type": {
"type": "keyword"
},
"one_two_val": {
"type": "float"
},
"three_four_val": {
"type": "float"
},
"five_six_val": {
"type": "float"
},
"eight_val": {
"type": "float"
},
"together_asin": {
"type": "keyword"
},
"brand": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"rank_rise": {
"type": "integer"
},
"rank_change": {
"type": "float"
},
"ao_rise": {
"type": "float"
},
"ao_change": {
"type": "float"
},
"price_rise": {
"type": "float"
},
"price_change": {
"type": "float"
},
"rating_rise": {
"type": "float"
},
"rating_change": {
"type": "float"
},
"comments_rise": {
"type": "integer"
},
"comments_change": {
"type": "float"
},
"bsr_orders_rise": {
"type": "integer"
},
"bsr_orders_change": {
"type": "float"
},
"variation_num": {
"type": "integer"
},
"variation_rise": {
"type": "integer"
},
"variation_change": {
"type": "float"
},
"sales_rise": {
"type": "float"
},
"sales_change": {
"type": "float"
},
"account_id": {
"type": "keyword"
},
"account_name": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"site_name": {
"type": "keyword"
},
"site_name_type": {
"type": "short"
},
"launch_time_type": {
"type": "short"
},
"size_type": {
"type": "short"
},
"rank_type": {
"type": "short"
},
"rating_type": {
"type": "short"
},
"price_type": {
"type": "short"
},
"ao_val_type": {
"type": "short"
},
"weight_type": {
"type": "short"
},
"package_quantity": {
"type": "long"
},
"quantity_variation_type": {
"type": "short"
},
"bsr_type": {
"type": "short"
},
"bsr_best_orders_type": {
"type": "short"
},
"is_movie_label": {
"type": "short"
},
"is_brand_label": {
"type": "short"
},
"is_alarm_brand": {
"type": "short"
},
"asin_type": {
"type": "short"
},
"material": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"asin_crawl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"category_first_id": {
"type": "keyword"
},
"category_id": {
"type": "keyword"
},
"first_category_rank": {
"type": "integer"
},
"current_category_rank": {
"type": "integer"
},
"asin_weight_ratio": {
"type": "float"
},
"asin_bought_month": {
"type": "integer"
},
"asin_lqs_rating": {
"type": "float"
},
"asin_lqs_rating_detail": {
"type": "keyword"
},
"asin_lob_info": {
"type": "keyword"
},
"is_contains_lob_info": {
"type": "short"
},
"is_package_quantity_abnormal": {
"type": "short"
},
"auctions_num": {
"type": "integer"
},
"auctions_num_all": {
"type": "integer"
},
"skus_num_creat": {
"type": "integer"
},
"skus_num_creat_all": {
"type": "integer"
},
"zr_flow_proportion": {
"type": "float"
},
"matrix_flow_proportion": {
"type": "float"
},
"matrix_ao_val": {
"type": "float"
},
"title_matching_degree": {
"type": "float"
},
"usr_mask_type": {
"type": "keyword"
},
"usr_mask_progress": {
"type": "keyword"
},
"product_features": {
"type": "keyword"
},
"img_info": {
"type": "keyword"
},
"collapse_asin": {
"type": "keyword"
},
"follow_sellers_count": {
"type": "integer"
},
"asin_describe": {
"type": "text"
},
"fbm_price": {
"type": "float"
},
"asin_source_flag": {
"type": "integer"
},
"bsr_last_seen_at": {
"type": "date"
},
"bsr_seen_count_30d": {
"type": "integer"
},
"nsr_last_seen_at": {
"type": "date"
},
"nsr_seen_count_30d": {
"type": "integer"
}
}
}
}
@staticmethod
def get_previous_date(self):
self.df_date = self.spark.sql(f"select * from dim_date_20_to_30")
df = self.df_date.toPandas()
df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)]
cur_month_id = int(list(df_loc.id)[0])
previous_date_id = cur_month_id - 1
df_loc = df.loc[df.id == previous_date_id]
previous_date = str(list(df_loc.year_month)[0])
return previous_date
@staticmethod
def get_previous_two_date(self):
self.df_date = self.spark.sql(f"select * from dim_date_20_to_30")
df = self.df_date.toPandas()
df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)]
cur_month_id = int(list(df_loc.id)[0])
previous_two_date_id = cur_month_id - 40
df_loc = df.loc[df.id == previous_two_date_id]
prvious_two_date = str(list(df_loc.year_month)[0])
return prvious_two_date
@staticmethod
def get_launch_time_interval_dict():
cur_date = datetime.now().date()
return {
"one_month": (cur_date + timedelta(days=-30)).strftime('%Y-%m-%d'),
"three_month": (cur_date + timedelta(days=-90)).strftime('%Y-%m-%d'),
"six_month": (cur_date + timedelta(days=-180)).strftime('%Y-%m-%d'),
"twelve_month": (cur_date + timedelta(days=-360)).strftime('%Y-%m-%d'),
"twenty_four_month": (cur_date + timedelta(days=-720)).strftime('%Y-%m-%d'),
"thirty_six_month": (cur_date + timedelta(days=-1080)).strftime('%Y-%m-%d')
}
@staticmethod
def get_initial_batch_id(self):
max_bought_month_batch_id_sql = f"""
SELECT MAX(batch_id) as initial_batch_id from {self.doris_db}.{self.max_bought_month_table} WHERE date_info='{self.date_info}' AND consumer_type='{self.consumer_type}'
"""
df_max_bought_month_batch_id = DorisHelper.spark_import_with_sql(
self.spark, query=max_bought_month_batch_id_sql)
max_bought_month_batch_id = 0 if df_max_bought_month_batch_id.take(1)[0]['initial_batch_id'] is None else \
df_max_bought_month_batch_id.take(1)[0]['initial_batch_id']
return max_bought_month_batch_id
# 1. 处理asin分类及排名以及排名类型字段
def handle_asin_bs_category_info(self, df):
df = df.withColumnRenamed("parentAsin", "parent_asin")
cate_current_pattern = self.pattern_current_dict[self.site_name]
cate_1_pattern = self.pattern1_dict[self.site_name]
df = df.withColumn("asin_bs_sellers_rank_lower", F.lower("best_sellers_rank"))
df = df.withColumn("asin_bs", self.u_parse_bs_category(
"asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", F.lit(cate_current_pattern), F.lit(cate_1_pattern), "node_id"))
df = df.withColumn("asin_bs_cate_1_id", df.asin_bs.getField("asin_bs_cate_1_id")) \
.withColumn("asin_bs_cate_current_id", df.asin_bs.getField("asin_bs_cate_current_id")) \
.withColumn("asin_bs_cate_1_rank", df.asin_bs.getField("asin_bs_cate_1_rank")) \
.withColumn("asin_bs_cate_current_rank", df.asin_bs.getField("asin_bs_cate_current_rank")) \
.drop("asin_bs", "asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf",
"best_sellers_rank")
df = df.withColumn("rank_type", F.expr("""
CASE WHEN asin_bs_cate_1_rank IS NOT NULL AND asin_bs_cate_1_rank BETWEEN 0 AND 1000 THEN 1
WHEN asin_bs_cate_1_rank BETWEEN 1000 AND 5000 THEN 2 WHEN asin_bs_cate_1_rank BETWEEN 5000 AND 10000 THEN 3
WHEN asin_bs_cate_1_rank BETWEEN 10000 AND 20000 THEN 4 WHEN asin_bs_cate_1_rank BETWEEN 20000 AND 30000 THEN 5
WHEN asin_bs_cate_1_rank BETWEEN 30000 AND 50000 THEN 6 WHEN asin_bs_cate_1_rank BETWEEN 50000 AND 70000 THEN 7
WHEN asin_bs_cate_1_rank >= 70000 THEN 8 ELSE 0 END"""))
return df
# 2. 利用node_id以及分类描述进行分类补充(此时无排名信息)
def handle_asin_category_supplement(self, df):
df = df.join(self.df_asin_new_cate, on=['node_id'], how='left')
df = df.withColumn("asin_bs_cate_current_id", F.coalesce(F.col("asin_bs_cate_current_id"), F.col("node_id"))). \
withColumn("asin_bs_cate_1_id", F.coalesce(F.col("asin_bs_cate_1_id"), F.col("category_first_id"))). \
drop("category_first_id", "node_id")
df_with_category = df.filter("asin_bs_cate_1_id is null and category is not null").select("asin", "category")
df_with_category = df_with_category.withColumn(
"category_split", F.split(F.col("category"), "›")
).withColumn(
"category_first_name", F.lower(F.col("category_split").getItem(0))
).drop("category_split", "category")
df_with_category = df_with_category.join(self.df_asin_category, on=['category_first_name'], how='inner')
df_with_category = df_with_category.withColumnRenamed("category_first_id", "category_first_id_with_name").drop("category_first_name")
df = df.join(df_with_category, on=['asin'], how='left')
df = df.withColumn("asin_bs_cate_1_id", F.coalesce(F.col("asin_bs_cate_1_id"), F.col("category_first_id_with_name"))).drop("category_first_id_with_name")
return df
# 3. 处理bsr销量、价格类型字段以及BSR销售额信息
def handle_asin_bsr_orders(self, df):
df = df.join(self.df_bs_report, on=['asin_bs_cate_1_id', 'asin_bs_cate_1_rank'], how='left')
df = df.withColumn("price_type", F.expr("""
CASE WHEN price IS NOT NULL AND price > 0 AND price < 10 THEN 1 WHEN price >= 10 AND price < 15 THEN 2
WHEN price >= 15 AND price < 20 THEN 3 WHEN price >= 20 AND price < 30 THEN 4
WHEN price >= 30 AND price < 50 THEN 5 WHEN price >= 50 THEN 6 ELSE 0 END""")).\
withColumn("bsr_orders_sale", F.round(F.col("bsr_orders") * F.col("price"), 2))
return df
# 4.解析Make-It-A-Bundle信息
def handle_asin_lob_info(self, df):
df = df.withColumn("is_contains_lob_info",
F.when(F.col("lob_asin_json").isNotNull(), F.lit(1)).otherwise(F.lit(0)))
df_parsed = df.withColumn("parse_asin_lob",
F.when(F.col("is_contains_lob_info") == 1, F.from_json("lob_asin_json", "array<struct<lob_asin:string>>")))
df_result = df_parsed.withColumn("asin_lob_info", F.expr("transform(parse_asin_lob, x -> x.lob_asin)"))
df = df_result.withColumn(
"asin_lob_info", F.regexp_replace(F.concat_ws(",", "asin_lob_info"), "[{}]", "")).drop(
"parse_asin_lob", "lob_asin_json")
return df
# 5. 处理配送方式、卖家所在地以及卖家所在地类型
def handle_asin_buy_box_seller_type(self, df):
df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json))
df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn(
"account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed")
df = df.join(self.df_seller_info, on=['seller_id'], how='left')
df = df.withColumn("site_name_type", F.expr("""
CASE WHEN buy_box_seller_type = 1 THEN 4
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2
ELSE 3 END"""))
return df
# 6. 处理asin基础属性信息(长宽高重量等)
def handle_asin_basic_attribute_info(self, df):
# 1.解析ASIN重量相关信息
df = df.withColumn("weight_str", F.lower(F.col("weight_str"))).withColumn("asin_weight", self.u_parse_weight("weight_str", F.lit(self.site_name))).drop("weight_str")
df = df.withColumn(
"weight", F.when(df.asin_weight.getField("weight_type") == 'pounds', df.asin_weight.getField("weight")).otherwise(F.lit(0))).drop("asin_weight")
# 2.处理重量类型
df = df.withColumn("weight_type", F.expr("""
CASE WHEN weight BETWEEN 0 AND 0.2 THEN 1 WHEN weight BETWEEN 0.2 AND 0.4 THEN 2
WHEN weight BETWEEN 0.4 AND 0.6 THEN 3 WHEN weight BETWEEN 0.6 AND 1 THEN 4
WHEN weight BETWEEN 1 AND 2 THEN 5 WHEN weight >= 2 THEN 6 ELSE 0 END"""))
# 3.解析ASIN体积相关信息
df = df.withColumn("asin_volume", self.u_parse_volume("volume"))
df = df.withColumn("asin_volume_type", df.asin_volume.getField("asin_volume_type")) \
.withColumn("asin_length", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("length"))) \
.withColumn("asin_width", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("width"))) \
.withColumn("asin_height", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("height"))) \
.drop("asin_volume", "asin_volume_type")
# 4.获取体积重/毛重相关信息
df = df.withColumn(
"asin_weight_ratio", F.when(
F.col("asin_length").isNotNull() & (F.col("asin_width").isNotNull()) &
(F.col("asin_height").isNotNull()) &
(F.col("weight") > 0), F.round(F.col("asin_length") * F.col("asin_width") * F.col("asin_height") * 3.2774128 / (F.col("weight") * 453.59), 3))
.otherwise(F.lit(-1)))
# 5.处理尺寸类型
if self.site_name == 'us':
expr_str = f"""
CASE WHEN weight > 0 AND weight * 16 <= 16 AND asin_length > 0 AND asin_length <= 15 AND asin_width > 0 AND asin_width <= 12 AND asin_height > 0 AND asin_height <= 0.75 THEN 1
WHEN weight > 0 AND weight <= 20 AND asin_length > 0 AND asin_length <= 18 AND asin_width > 0 AND asin_width <= 14 AND asin_height > 0 AND asin_height <= 8 THEN 2
WHEN weight > 0 AND weight <= 70 AND asin_length > 0 AND asin_length <= 60 AND asin_width > 0 AND asin_width <= 30 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130 THEN 3
WHEN weight > 0 AND weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130 THEN 4
WHEN weight > 0 AND weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 165 THEN 5
WHEN weight > 150 AND asin_length > 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 > 165 THEN 6 ELSE 0 END"""
else:
expr_str = f"""
CASE WHEN weight > 0 AND weight <= 100 AND asin_length > 0 AND asin_length <= 20 AND asin_width > 0 AND asin_width <= 15 AND asin_height > 0 AND asin_height <= 1 THEN 1
WHEN weight > 0 AND weight <= 500 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 2.5 THEN 2
WHEN weight > 0 AND weight <= 1000 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 5 THEN 3
WHEN weight > 0 AND weight <= 12000 AND asin_length > 0 AND asin_length <= 45 AND asin_width > 0 AND asin_width <= 34 AND asin_height > 0 AND asin_height <= 26 THEN 4
WHEN weight > 0 AND weight <= 2000 AND asin_length > 0 AND asin_length <= 61 AND asin_width > 0 AND asin_width <= 46 AND asin_height > 0 AND asin_height <= 46 THEN 5
WHEN asin_length > 0 AND asin_length <= 150 AND asin_length + asin_length + (asin_width + asin_height) <= 300 THEN 6
WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END"""
df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height")
return df
# 7. 处理asin图片信息
def handle_asin_img_info(self, df):
img_schema = ArrayType(ArrayType(StringType()))
df = df.withColumn("img_list", F.from_json(F.col("img_list"), img_schema))
df_with_img = df.filter(F.size("img_list") > 0).select("asin", "img_list")
df_with_img_attribute = df_with_img.select(
"asin", F.explode("img_list").alias("img_attributes")
).select(
"asin", F.col("img_attributes")[1].alias("img_url"), F.col("img_attributes")[2].alias("img_order_by"),
F.col("img_attributes")[3].alias("data_type")
)
df_with_img_attribute_agg = df_with_img_attribute.groupby("asin").agg(
F.to_json(F.collect_list(F.struct(F.col("img_url"), F.col("img_order_by"), F.col("data_type")))).alias(
"img_info")
)
df = df.drop("img_list")
df = df.join(df_with_img_attribute_agg, on=['asin'], how='left')
return df
# 8. 处理变体相关(ao及母体相关,自然占比及母体自然占比,各类型数量,月销信息等)
def handle_asin_measure(self, df):
df = CommonUtil.get_asin_variant_attribute(df_asin_detail=df, df_asin_measure=self.df_asin_measure,
partition_num=self.repartition_num, use_type=1)
# 是否数量变体类型和ao的类型
df = df.withColumn("quantity_variation_type", F.expr("""
CASE WHEN size is not null and size != '' and lower(size) like '%quantity%' THEN 1 ELSE 0 END""")).withColumn(
"ao_val_type", F.expr("""
CASE WHEN asin_ao_val BETWEEN 0 AND 0.1 THEN 1 WHEN asin_ao_val BETWEEN 0.1 AND 0.2 THEN 2
WHEN asin_ao_val BETWEEN 0.2 AND 0.4 THEN 3 WHEN asin_ao_val BETWEEN 0.4 AND 0.8 THEN 4
WHEN asin_ao_val BETWEEN 0.8 AND 1.2 THEN 5 WHEN asin_ao_val BETWEEN 1.2 AND 2 THEN 6
WHEN asin_ao_val >= 2 THEN 7 ELSE 0 END"""))
df = df.withColumnRenamed("asin_zr_counts", "zr_counts").withColumnRenamed("asin_ao_val", "ao_val") \
.withColumnRenamed("asin_zr_flow_proportion", "zr_flow_proportion") \
.withColumnRenamed("asin_amazon_orders", "asin_bought_month").drop("asin_st_counts", "asin_adv_counts")
return df
# 9. 提取打包数量字段
def handle_asin_package_quantity(self, df):
df = df.withColumn(
"variat_attribute", F.concat_ws("&&&%", F.col("color"), F.col("style"), F.col("size"), F.col("material")))
df = df.withColumn("title_parse", self.u_parse_package_quantity(df.title)).withColumn(
"variat_parse", self.u_parse_package_quantity(df.variat_attribute))
df = df.withColumn("title_package_quantity", df.title_parse.getField("parse_package_quantity")). \
withColumn("variat_package_quantity", df.variat_parse.getField("parse_package_quantity")). \
withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \
withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \
drop("title_parse", "variat_parse", "variat_attribute")
df = df.withColumn(
"package_quantity", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity
WHEN title_package_quantity is not null THEN title_package_quantity ELSE 1 END""")
).withColumn(
"is_package_quantity_abnormal", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity_is_abnormal
WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal ELSE 2 END""")
).drop("title_package_quantity", "variat_package_quantity", "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
df = df.withColumn("title", F.lower(F.col("title")))
df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left')
df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \
withColumn(
"is_package_quantity_abnormal", F.coalesce(F.col("user_is_package_quantity_abnormal"), F.col("is_package_quantity_abnormal"))
).drop("user_package_num", "user_is_package_quantity_abnormal")
return df
# 10. 处理品牌标签、是否告警品牌、处理asin_lqs_rating信息
def handle_asin_lqs_and_brand(self, df):
# 1.品牌标签以及是否告警品牌
df = df.withColumn("is_brand_label", F.expr("""CASE WHEN brand is not null THEN 1 ELSE 0 END"""))
df = df.withColumn("brand", F.lower("brand"))
df = df.join(self.df_alarm_brand_info, on=['brand'], how='left')
df = df.withColumn("is_alarm_brand",
F.when(F.col("is_alarm_brand").isNotNull(), F.col("is_alarm_brand")).otherwise(F.lit(0)))
# 2. lqs评分
df = df.withColumn("category_node_rating",
F.expr(f"""CASE WHEN asin_bs_cate_current_id is not null THEN 1 ELSE 0 END""")) \
.withColumn("zr_rating", F.expr(f"""CASE WHEN zr_counts > 0 THEN 0.5 ELSE 0 END""")) \
.withColumn("sp_rating", F.expr(f"""CASE WHEN sp_counts > 0 THEN 1 ELSE 0 END""")) \
.withColumn("a_add_rating", F.expr(f"""CASE WHEN img_type like '%3%' THEN 1 ELSE 0 END""")) \
.withColumn("video_rating", F.expr(f"""CASE WHEN img_type like '%2%' THEN 0.5 ELSE 0 END""")) \
.withColumn("brand_rating", F.expr(f"""CASE WHEN is_brand_label = 1 THEN 0.2 ELSE 0 END""")) \
.withColumn("product_describe_rating",
F.expr(f"""CASE WHEN product_description is not null THEN 0.2 ELSE 0 END""")) \
.withColumn("highlight_rating", F.expr(f"""
CASE WHEN describe is not null AND size(split(describe, '\\|-\\|')) <= 4 THEN size(split(describe, '\\|-\\|')) * 0.4
WHEN describe is not null AND size(split(describe, '\\|-\\|')) > 4 THEN 1.6 ELSE 0 END""")) \
.withColumn("title_len_rating", F.expr(f"""CASE WHEN title_len >= 50 AND title_len <=200 THEN 0.5 ELSE 0 END""")) \
.withColumn("title_brand_rating", F.expr(f"""
CASE WHEN brand is not null AND lower(regexp_replace(title, '[^a-zA-Z0-9\\s]', '')) LIKE lower(regexp_replace(brand, '[^a-zA-Z0-9\\s]', '')) || '%' THEN 0.5
ELSE 0 END""")) \
.withColumn("img_num_rating", F.expr(f"""
CASE WHEN img_num <= 4 THEN img_num * 0.5 WHEN img_num >4 THEN 2 ELSE 0 END""")) \
.withColumn("img_enlarge_rating", F.expr(f"""CASE WHEN image_view = 1 THEN 0.5 ELSE 0 END"""))
df = df.withColumn(
"asin_lqs_rating",
(F.col("category_node_rating") + F.col("zr_rating") + F.col("sp_rating") + F.col("a_add_rating") +
F.col("video_rating") + F.col("brand_rating") + F.col("product_describe_rating") +
F.col("highlight_rating") + F.col("title_len_rating") + F.col("title_brand_rating") +
F.col("img_num_rating") + F.col("img_enlarge_rating")).cast("double")).withColumn(
"asin_lqs_rating_detail", F.to_json(
F.struct(F.col("category_node_rating"), F.col("zr_rating"), F.col("sp_rating"), F.col("a_add_rating"),
F.col("video_rating"), F.col("brand_rating"), F.col("product_describe_rating"),
F.col("highlight_rating"), F.col("title_len_rating"), F.col("title_brand_rating"),
F.col("img_num_rating"), F.col("img_enlarge_rating")))
)
df = df.drop("product_description", "image_view", "category_node_rating", "zr_rating", "sp_rating",
"a_add_rating", "video_rating", "brand_rating", "product_describe_rating", "highlight_rating",
"title_len_rating", "title_brand_rating", "img_num_rating", "img_enlarge_rating")
return df
# 11. 通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、是否内部asin、是否隐藏分类、有效类型、必需ASIN、asin_type)
def handle_asin_detail_all_type(self, df):
# 1. 评分类型
df = df.withColumn("rating_type", F.expr("""
CASE WHEN rating >= 4.5 THEN 1 WHEN rating >= 4 AND rating < 4.5 THEN 2 WHEN rating >= 3.5 AND rating < 4 THEN 3
WHEN rating >= 3 AND rating < 3.5 THEN 4 WHEN rating < 3 AND rating >= 0 THEN 5 ELSE 0 END"""))
# 2. 上架时间类型
df = df.join(self.df_asin_keep_date, on=['asin'], how='left')
df = df.withColumn("launch_time", F.when(F.col("launch_time").isNull(), F.col("new_launch_time")).otherwise(
F.col("launch_time")))
one_month = self.launch_time_interval_dict['one_month']
three_month = self.launch_time_interval_dict['three_month']
six_month = self.launch_time_interval_dict['six_month']
twelve_month = self.launch_time_interval_dict['twelve_month']
twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
expr_str = f"""
CASE WHEN launch_time >= '{one_month}' THEN 1
WHEN launch_time >= '{three_month}' AND launch_time < '{one_month}' THEN 2
WHEN launch_time >= '{six_month}' AND launch_time < '{three_month}' THEN 3
WHEN launch_time >= '{twelve_month}' AND launch_time < '{six_month}' THEN 4
WHEN launch_time >= '{twenty_four_month}' AND launch_time < '{twelve_month}' THEN 5
WHEN launch_time >= '{thirty_six_month}' AND launch_time < '{twenty_four_month}' THEN 6
WHEN launch_time < '{thirty_six_month}' THEN 7 ELSE 0 END"""
df = df.withColumn("launch_time_type", F.expr(expr_str))
# 3. 电影标签
movie_label_list = ['prime video', 'dvd', 'blu-ray', 'kindle', 'app', 'paperback', 'audible audiobook',
'kindle edition', 'kindle & comixology', 'hardcover', 'comic', 'multi-format', '4k',
'library binding', 'vinyl', 'audio cd', 'mp3 music', 'single issue magazine',
'print magazine', 'unknown binding']
df = df.join(self.df_asin_label_info, on=['asin'], how='left')
condition = reduce(
lambda acc, keyword: acc | F.expr(f"exists(asin_label_list, x -> x like '%{keyword}%')"),
movie_label_list,
F.lit(False)
)
df = df.withColumn("is_movie_label", condition.cast("int")).drop("asin_label_list")
# 4. 是否内部asin、是否隐藏分类
df = df.join(self.df_self_asin_info, on=['asin'], how='left')
df = df.withColumn(
"is_self_asin", F.when(F.col("is_self_asin").isNotNull(), F.col("is_self_asin")).otherwise(F.lit(0)))
df = df.join(self.df_hide_category, on=['asin_bs_cate_current_id'], how='left')
df = df.na.fill({"hide_flag": 0})
df = df.withColumn("is_hide_asin", F.expr("""
CASE WHEN hide_flag = 1 THEN 1 WHEN asin_bs_cate_1_id = 'grocery' and asin_bs_cate_current_id != '6492272011' THEN 1
WHEN asin_bs_cate_current_id in ('21393128011', '21377129011', '21377127011', '21377130011', '21388218011', '21377132011') THEN 1
ELSE 0 END""")).drop("hide_flag")
# 5. 有效类型
df = df.join(self.df_asin_bsr_end, on=['asin_bs_cate_1_id'], how='left')
df = df.withColumn("bsr_type", F.expr("""
CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END"""
)).drop("limit_rank")
# 5. 是否必需ASIN
df = df.withColumn("is_need_asin", F.expr("""
CASE WHEN asin_bs_cate_1_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END"""))
# 6. asin_type
df = df.withColumn("asin_type", F.expr("""
CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END"""
)).drop("is_self_asin", "is_need_asin", "is_hide_asin")
return df
# 12. 处理变化率相关字段
def handle_asin_attribute_change(self, df):
# 处理ASIN维度的变化率信息
df = df.join(self.df_previous_flow_asin, on=['asin'], how='left')
columns_to_change = [
("ao_val", "previous_asin_ao_val", "ao"),
("price", "previous_asin_price", "price"),
("asin_bs_cate_1_rank", "previous_first_category_rank", "rank"),
("bsr_orders", "previous_asin_bsr_orders", "bsr_orders"),
("rating", "previous_asin_rating", "rating"),
("total_comments", "previous_asin_total_comments", "comments"),
("variat_num", "previous_asin_variation_num", "variation"),
("bsr_orders_sale", "previous_sales", "sales")
]
def calculate_change(current_col, previous_col):
rise_col = F.col(current_col) - F.col(previous_col)
change_col = F.when((F.col(previous_col).isNotNull()) & (F.col(previous_col) != 0),
F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
).otherwise(None)
return rise_col, change_col
for current_col, previous_col, suffix in columns_to_change:
rise_col, change_col = calculate_change(current_col, previous_col)
if suffix == 'ao':
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 3))
elif suffix in ['price', 'sales']:
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 2))
elif suffix == 'rating':
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 1))
else:
df = df.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType()))
df = df.withColumn(f"{suffix}_change", F.round(change_col, 4))
df = df.drop(previous_col)
return df
# 13. 处理不同来源asin
def handle_asin_different_source(self, df):
df = df.join(self.df_asin_source_flag, on=['asin'], how='left').fillna({"asin_source_flag": "0"}).withColumn(
"asin_source_flag", F.split(F.col("asin_source_flag"), ",")
).withColumn(
"asin_source_flag", F.expr("transform(asin_source_flag, x -> cast(x as int))")
)
return df
# 14. 字段标准化
def handle_column_name(self, df):
df = df.withColumnRenamed("asin_bs_cate_1_id", "category_first_id")\
.withColumnRenamed("asin_bs_cate_current_id", "category_id") \
.withColumnRenamed("asin_bs_cate_1_rank", "first_category_rank")\
.withColumnRenamed("asin_bs_cate_current_rank", "current_category_rank") \
.withColumnRenamed("variat_num", "variation_num")\
.withColumnRenamed("seller_id", "account_id").withColumnRenamed("seller_country_name", "site_name") \
.withColumnRenamed("asinUpdateTime", "asin_crawl_date")\
.withColumnRenamed("customer_reviews_json", "product_features")\
.withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\
.withColumn("bsr_best_orders_type", F.lit(-1))
df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts",
"tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price",
"rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color",
"size", "style", "material", "launch_time", "img_num", "parent_asin", "img_type", "img_url",
"activity_type", "one_two_val", "three_four_val", "five_six_val", "eight_val", "brand",
"variation_num", "one_star", "two_star", "three_star", "four_star", "five_star", "low_star",
"together_asin", "account_name", "account_id", "rank_rise", "rank_change", "ao_rise",
"ao_change", "price_rise", "price_change", "rating_rise", "rating_change", "comments_rise",
"comments_change", "bsr_orders_rise", "bsr_orders_change", "sales_rise", "sales_change",
"variation_rise", "variation_change", "size_type", "rating_type", "site_name_type",
"weight_type", "launch_time_type", "ao_val_type", "rank_type", "price_type", "bsr_type",
"bsr_best_orders_type", "quantity_variation_type", "package_quantity", "is_movie_label",
"is_brand_label", "is_alarm_brand", "asin_type", "asin_crawl_date", "category_first_id",
"category_id", "first_category_rank", "current_category_rank", "asin_weight_ratio",
"site_name", "asin_bought_month", "asin_lqs_rating", "asin_lqs_rating_detail",
"asin_lob_info", "is_contains_lob_info", "is_package_quantity_abnormal", "category",
"zr_flow_proportion", "matrix_flow_proportion", "matrix_ao_val", "product_features", "img_info",
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json",
F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price"),
"asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d")
df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
"one_two_val": 0.0, "three_four_val": 0.0, "five_six_val": 0.0, "eight_val": 0.0,
"one_star": 0, "two_star": 0, "three_star": 0, "four_star": 0, "five_star": 0, "low_star": 0,
"size_type": 0, "rating_type": 0, "site_name_type": 0, "weight_type": 0, "launch_time_type": 0,
"ao_val_type": 0, "rank_type": 0, "price_type": 0, "quantity_variation_type": 0, "package_quantity": 1,
"is_movie_label": 0, "is_brand_label": 0, "is_alarm_brand": 0, "asin_lqs_rating": 0.0, "follow_sellers_count": -1,
"bsr_last_seen_at": "1970-01-01", "bsr_seen_count_30d": 0, "nsr_last_seen_at": "1970-01-01", "nsr_seen_count_30d": 0
}
)
print("asin的标准信息:")
df_save.show(10, truncate=False)
return df_save
def read_data(self):
print("1. 读取上个维度的flow_asin")
sql = f"""
select asin, asin_ao_val as previous_asin_ao_val, asin_price as previous_asin_price,
variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank,
bsr_orders as previous_asin_bsr_orders, sales as previous_sales
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '{self.date_type}'
and date_info = '{self.previous_date}'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
if self.df_previous_flow_asin.count() <= 1:
print("该历史节点数据不全,调整到上上个月")
sql = f"""
select asin, first_category_rank as previous_first_category_rank,
round(asin_ao_val, 3) as previous_asin_ao_val, asin_price as previous_asin_price,
bsr_orders as previous_asin_bsr_orders, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, sales as previous_sales,
variation_num as previous_asin_variation_num
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '{self.date_type}'
and date_info = '{self.previous_two_date}'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_previous_flow_asin.show(10, truncate=False)
print("2. 获取卖家相关信息")
sql = f"""
select fd_unique as seller_id, upper(fd_country_name) as seller_country_name from dim_fd_asin_info
where site_name='{self.site_name}' and fd_unique is not null group by fd_unique, fd_country_name"""
print("sql=", sql)
self.df_seller_info = self.spark.sql(sqlQuery=sql)
self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_seller_info.show(10, truncate=False)
print("3. 读取内部asin信息")
sql = f"""select asin, 1 as is_self_asin from {self.site_name}_self_asin group by asin"""
print("sql=", sql)
mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name=self.site_name)
if mysql_con_info is not None:
df_self_asin_info = SparkUtil.read_jdbc_query(
session=self.spark, url=mysql_con_info['url'], pwd=mysql_con_info['pwd'],
username=mysql_con_info['username'], query=sql)
self.df_self_asin_info = F.broadcast(df_self_asin_info)
self.df_self_asin_info.show(10, truncate=False)
print("4. 读取告警品牌信息")
sql = f"""
select brand, 1 as is_alarm_brand
from (select lower(trim(brand_name)) as brand
from brand_alert_erp where brand_name is not null) t group by brand"""
print("sql=", sql)
if self.site_name == 'us':
pg_cluster_con_info = DBUtil.get_connection_info(db_type="postgresql_cluster", site_name=self.site_name)
if pg_cluster_con_info is not None:
df_alarm_brand_info = SparkUtil.read_jdbc_query(
session=self.spark, url=pg_cluster_con_info['url'], pwd=pg_cluster_con_info['pwd'],
username=pg_cluster_con_info['username'], query=sql)
self.df_alarm_brand_info = F.broadcast(df_alarm_brand_info)
self.df_alarm_brand_info.show(10, truncate=False)
else:
schema = StructType([
StructField("brand", StringType(), True),
StructField("is_alarm_brand", IntegerType(), True)
])
self.df_alarm_brand_info = self.spark.createDataFrame([], schema)
print("5. 读取隐藏分类信息")
sql = f"""
select category_id_base as asin_bs_cate_current_id, 1 as hide_flag from us_bs_category_hide group by category_id_base
"""
print("sql=", sql)
us_mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name='us')
if us_mysql_con_info is not None:
df_hide_category = SparkUtil.read_jdbc_query(
session=self.spark, url=us_mysql_con_info['url'], pwd=us_mysql_con_info['pwd'],
username=us_mysql_con_info['username'], query=sql)
self.df_hide_category = F.broadcast(df_hide_category)
self.df_hide_category.show(10, truncate=False)
print("6. 读取asin_label信息")
sql = f"""
select asin, label from
(select asin, lower(label) as label, created_time,row_number() over(partition by asin,label order by updated_time desc) as crank
from ods_other_search_term_data where site_name='{self.site_name}' and date_type='{self.date_type}' and
date_info='{self.date_info}' and trim(label) not in ('null','') and label is not null) t where t.crank=1
"""
print("sql=", sql)
self.df_asin_label_info = self.spark.sql(sqlQuery=sql)
if self.df_asin_label_info.count() <= 1:
print("该历史节点数据不全,调整到上上个月")
sql = f"""
select asin, label from
(select asin, lower(label) as label, created_time,row_number()
over(partition by asin,label order by updated_time desc) as crank
from ods_other_search_term_data where site_name='{self.site_name}' and date_type='{self.date_type}' and
date_info='{self.previous_date}' and trim(label) not in ('null','') and label is not null) t where t.crank=1
"""
print("sql=", sql)
self.df_asin_label_info = self.spark.sql(sqlQuery=sql)
self.df_asin_label_info = self.df_asin_label_info.groupby(['asin']).agg(
F.collect_set("label").alias("asin_label_list"))
self.df_asin_label_info = self.df_asin_label_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_label_info.show(10, truncate=False)
print("7. 读取dwd_asin_measure拿取ao及各类型数量")
sql = f"""
select asin, asin_sp_counts as sp_counts, (asin_sb1_counts + asin_sb2_counts) as sb_counts, asin_sb3_counts as vi_counts,
asin_bs_counts as bs_counts, asin_ac_counts as ac_counts, asin_tr_counts as tr_counts, asin_er_counts as er_counts,
asin_st_counts, asin_zr_counts, asin_adv_counts, round(asin_zr_flow_proportion, 3) as asin_zr_flow_proportion,
round(asin_ao_val, 3) as asin_ao_val, asin_amazon_orders
from dwd_asin_measure where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print("sql=", sql)
self.df_asin_measure = self.spark.sql(sqlQuery=sql)
self.df_asin_measure = self.df_asin_measure.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_measure.show(10, truncate=False)
print("8. 读取one_category_report表")
if int(self.year) == 2022 and int(self.month) < 3:
sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as bsr_orders from ods_one_category_report " \
f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='2022-12'"
else:
sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as bsr_orders from ods_one_category_report " \
f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
print("sql=", sql)
self.df_bs_report = self.spark.sql(sqlQuery=sql)
self.df_bs_report = self.df_bs_report.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_bs_report.show(10, truncate=False)
print("9. 读取keep_date获取上架时间")
sql = f"""
select asin, new_launch_time from
(select asin, launch_time as new_launch_time,
row_number() over(partition by asin order by updated_at desc) as trank
from ods_asin_keep_date where site_name='{self.site_name}' and state=3) t where t.trank=1
"""
print("sql=", sql)
self.df_asin_keep_date = self.spark.sql(sqlQuery=sql)
self.df_asin_keep_date = self.df_asin_keep_date.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_keep_date.show(10, truncate=False)
print("10. 读取bsr有效排名信息")
sql = f"""select rank as limit_rank, category_id as asin_bs_cate_1_id from {self.site_name}_bsr_end"""
print("sql=", sql)
if mysql_con_info is not None:
df_asin_bsr_end = SparkUtil.read_jdbc_query(
session=self.spark, url=mysql_con_info['url'], pwd=mysql_con_info['pwd'],
username=mysql_con_info['username'], query=sql)
self.df_asin_bsr_end = F.broadcast(df_asin_bsr_end)
self.df_asin_bsr_end.show(10, truncate=False)
print("11. 通过node_id获取一级分类进行分类补充")
df_asin_new_cate = get_node_first_id_df(self.site_name, self.spark)
self.df_asin_new_cate = F.broadcast(df_asin_new_cate)
self.df_asin_new_cate.show(10, truncate=False)
print("12. 获取用户修改打包数量信息")
pg_con_info = DBUtil.get_connection_info("postgresql", "us")
sql = f"""
WITH ranked_edit_logs AS (SELECT edit_key_id, lower(val_related_info) as val_related_info, val_after,
ROW_NUMBER() OVER (PARTITION BY edit_key_id ORDER BY create_time DESC) AS rn FROM sys_edit_log
WHERE module = '流量选品' AND filed = 'package_quantity' AND site_name='{self.site_name}')
SELECT edit_key_id as asin, val_related_info as title, cast(val_after as int) as user_package_num,
0 as user_is_package_quantity_abnormal FROM ranked_edit_logs WHERE rn = 1"""
if pg_con_info is not None:
df_user_package_num = SparkUtil.read_jdbc_query(
session=self.spark, url=pg_con_info['url'], pwd=pg_con_info['pwd'],
username=pg_con_info['username'], query=sql)
self.df_user_package_num = F.broadcast(df_user_package_num)
self.df_user_package_num.show(10, truncate=False)
print("14. 获取分类ID与分类名称的对应关系")
self.df_asin_category = get_first_id_from_category_desc_df(self.site_name, self.spark)
self.df_asin_category = self.df_asin_category.withColumn(
"category_first_name", F.lower("category_first_name")
).repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_category.show(10, truncate=False)
print("15. 获取asin不同来源标识")
sql = f"""
select asin, asin_cate_flag as asin_source_flag, bsr_latest_date as bsr_last_seen_at, bsr_30day_count as bsr_seen_count_30d,
nsr_latest_date as nsr_last_seen_at, nsr_30day_count as nsr_seen_count_30d from dwd_asin_cate_flag where site_name='{self.site_name}'
"""
self.df_asin_source_flag = self.spark.sql(sqlQuery=sql)
self.df_asin_source_flag = self.df_asin_source_flag.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_source_flag.show(10, truncate=False)
# 字段处理逻辑综合
def handle_all_field(self, df):
# 1. 处理asin分类及排名以及排名类型字段
df = self.handle_asin_bs_category_info(df)
# 2. 利用node_id进行分类补充
df = self.handle_asin_category_supplement(df)
# 3. 处理bsr销量及销售额信息以及价格类型字段
df = self.handle_asin_bsr_orders(df)
# 4. 解析Make-It-A-Bundle信息
df = self.handle_asin_lob_info(df)
# 5. 处理配送方式、卖家所在地以及卖家所在地类型
df = self.handle_asin_buy_box_seller_type(df)
# 6. 处理asin基础属性信息(长宽高重量等)
df = self.handle_asin_basic_attribute_info(df)
# 7. 处理asin图片信息
df = self.handle_asin_img_info(df)
# 8. 处理变体相关(ao及母体相关,自然占比及母体自然占比,各类型数量,月销信息等)
df = self.handle_asin_measure(df)
# 9. 提取打包数量字段
df = self.handle_asin_package_quantity(df)
# 10. 处理品牌标签、是否告警品牌、处理asin_lqs_rating信息
df = self.handle_asin_lqs_and_brand(df)
# 11.通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、ASIN类型、有效类型)
df = self.handle_asin_detail_all_type(df)
# 12. 处理变化率相关字段
df = self.handle_asin_attribute_change(df)
# 13. 处理不同来源asin
df = self.handle_asin_different_source(df)
# 14. 字段标准化
df_save = self.handle_column_name(df)
return df_save
# 写入es前的准备工作
def es_prepare(self):
# 创建对应es索引
EsUtils.create_index(self.es_index_name, self.client, self.es_index_body)
print("索引名称为:", self.es_index_name)
# 执行富集策略
# self.client.enrich.execute_policy(name=self.policy_name1)
# self.client.enrich.execute_policy(name=self.policy_name2)
# 写入elasticsearch逻辑
def save_to_es(self, df, batch_num):
print("插入当前批次数据, 插入的数量量为: " + str(batch_num))
start_time = time.time()
df = df.drop("category", "seller_json")
df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save()
end_time = time.time()
elapsed_time = end_time - start_time
print("当前插入时长为:" + str(elapsed_time))
# 实时消费中批次数据的处理逻辑
def handle_kafka_stream(self, df, batch_id):
try:
batch_num = df.count()
if batch_num > 0:
start_time = time.time()
print("当前批次:" + str(batch_id) + "; 该批次数据量为:" + str(batch_num))
df = df.repartition(self.repartition_num)
batch_id = int(batch_id) + self.initial_batch_id
df_save = self.handle_all_field(df)
self.es_prepare()
self.save_to_es(df_save, batch_num)
df_save.unpersist()
end_time = time.time()
print("当前批次:" + str(batch_id) + "执行完毕, 执行时长为:" + str(end_time - start_time))
else:
print("当前批次没有数据")
except Exception as e:
print(e, traceback.format_exc())
# 消费主题下的所有历史数据
def handle_kafka_history(self, kafka_df):
print("处理kafka历史数据")
batch_num = kafka_df.count()
if batch_num > 0:
self.history_batch_id = self.history_batch_id + 1
start_time = time.time()
kafka_df = kafka_df.repartition(self.repartition_num)
kafka_df = self.handle_all_field(kafka_df)
self.es_prepare()
self.save_to_es(kafka_df, batch_num)
end_time = time.time()
print("该批次数据处理完毕, 执行时长为:" + str(end_time - start_time))
else:
raise ValueError("当前主题中没有数据,请注意检查!")
if __name__ == '__main__':
arguments = sys.argv[1:]
site_name = sys.argv[1]
date_type = sys.argv[2]
date_info = sys.argv[3]
consumer_type = sys.argv[4]
if len(arguments) == 5:
test_flag = sys.argv[5]
else:
test_flag = 'normal'
handle_obj = KafkaFlowAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info, consumer_type=consumer_type, test_flag=test_flag, batch_size=200000)
handle_obj.run_kafka()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment