Commit 8d3c2240 by chenyuanjie

流量选品-月流程-增加字段

parent a0bfbc17
...@@ -38,6 +38,11 @@ class DwtFlowAsin(Templates): ...@@ -38,6 +38,11 @@ class DwtFlowAsin(Templates):
self.site_name = site_name self.site_name = site_name
self.date_type = date_type self.date_type = date_type
self.date_info = date_info self.date_info = date_info
# 获取历史date_info
# 环比上月
self.date_info_last_month = CommonUtil.get_month_offset(self.date_info, -1)
# 同比去年
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info, -12)
self.hive_tb = f"dwt_flow_asin" self.hive_tb = f"dwt_flow_asin"
self.partition_dict = { self.partition_dict = {
"site_name": site_name, "site_name": site_name,
...@@ -69,6 +74,8 @@ class DwtFlowAsin(Templates): ...@@ -69,6 +74,8 @@ class DwtFlowAsin(Templates):
self.df_fd_asin_info = self.spark.sql(f"select 1+1;") self.df_fd_asin_info = self.spark.sql(f"select 1+1;")
self.df_flow_asin_last = self.spark.sql(f"select 1+1;") self.df_flow_asin_last = self.spark.sql(f"select 1+1;")
self.df_title_matching_degree = self.spark.sql(f"select 1+1;") self.df_title_matching_degree = self.spark.sql(f"select 1+1;")
self.df_asin_bought_history = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;")
@staticmethod @staticmethod
def udf_get_previous_last_30_day(self): def udf_get_previous_last_30_day(self):
...@@ -228,6 +235,23 @@ class DwtFlowAsin(Templates): ...@@ -228,6 +235,23 @@ class DwtFlowAsin(Templates):
self.df_title_matching_degree = self.spark.sql(sqlQuery=sql) self.df_title_matching_degree = self.spark.sql(sqlQuery=sql)
self.df_title_matching_degree = self.df_title_matching_degree.repartition(60).persist(StorageLevel.DISK_ONLY) self.df_title_matching_degree = self.df_title_matching_degree.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_title_matching_degree.show(10, truncate=False) self.df_title_matching_degree.show(10, truncate=False)
print("8.读取历史月销数据")
sql = f"""
select asin, asin_bought_month, date_info from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info in ('{self.date_info_last_month}', '{self.date_info_last_year}')
"""
print("sql:" + sql)
self.df_asin_bought_history = self.spark.sql(sqlQuery=sql)
self.df_asin_bought_history = self.df_asin_bought_history.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_asin_bought_history.show(10, truncate=False)
print("9.读取最新keepa数据")
sql = f"""
select asin, tracking_since, keepa_launch_time from dim_keepa_asin_info where site_name = '{self.site_name}'
"""
print("sql:" + sql)
self.df_keepa_asin = self.spark.sql(sqlQuery=sql)
self.df_keepa_asin = self.df_keepa_asin.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_keepa_asin.show(10, truncate=False)
# 处理asin基础属性信息(体积重量相关)及bsr销售额相关信息 # 处理asin基础属性信息(体积重量相关)及bsr销售额相关信息
def handle_asin_basic_attribute(self): def handle_asin_basic_attribute(self):
...@@ -276,6 +300,13 @@ class DwtFlowAsin(Templates): ...@@ -276,6 +300,13 @@ class DwtFlowAsin(Templates):
self.df_asin_detail = self.df_asin_detail.withColumn( self.df_asin_detail = self.df_asin_detail.withColumn(
"asin_rating_type", F.expr("""CASE WHEN asin_rating >= 4.5 THEN 1 WHEN asin_rating >= 4 AND asin_rating < 4.5 THEN 2 WHEN asin_rating >= 3.5 AND asin_rating < 4 THEN 3 "asin_rating_type", F.expr("""CASE WHEN asin_rating >= 4.5 THEN 1 WHEN asin_rating >= 4 AND asin_rating < 4.5 THEN 2 WHEN asin_rating >= 3.5 AND asin_rating < 4 THEN 3
WHEN asin_rating >= 3 AND asin_rating < 3.5 THEN 4 WHEN asin_rating < 3 AND asin_rating >= 0 THEN 5 ELSE 0 END""")) WHEN asin_rating >= 3 AND asin_rating < 3.5 THEN 4 WHEN asin_rating < 3 AND asin_rating >= 0 THEN 5 ELSE 0 END"""))
# 关联keepa上架时间
self.df_asin_detail = self.df_asin_detail.join(
self.df_keepa_asin, on=['asin'], how='left'
)
self.df_asin_detail = self.df_asin_detail.withColumn(
"asin_launch_time", F.coalesce(F.col("asin_launch_time"), F.col("keepa_launch_time"))
)
# 上架时间类型 # 上架时间类型
one_month = self.launch_time_interval_dict['one_month'] one_month = self.launch_time_interval_dict['one_month']
three_month = self.launch_time_interval_dict['three_month'] three_month = self.launch_time_interval_dict['three_month']
...@@ -303,6 +334,7 @@ class DwtFlowAsin(Templates): ...@@ -303,6 +334,7 @@ class DwtFlowAsin(Templates):
WHEN asin_price >= 70 AND asin_price < 100 THEN 7 WHEN asin_price >= 70 AND asin_price < 100 THEN 7
WHEN asin_price >= 100 AND asin_price < 150 THEN 8 WHEN asin_price >= 100 AND asin_price < 150 THEN 8
WHEN asin_price >= 150 THEN 9 ELSE 0 END""")) WHEN asin_price >= 150 THEN 9 ELSE 0 END"""))
self.df_keepa_asin.unpersist()
# 处理asin分类、排名、排名类型字段、是否有效排名信息 # 处理asin分类、排名、排名类型字段、是否有效排名信息
def handle_asin_category_info(self): def handle_asin_category_info(self):
...@@ -479,6 +511,74 @@ class DwtFlowAsin(Templates): ...@@ -479,6 +511,74 @@ class DwtFlowAsin(Templates):
self.df_asin_detail.drop(previous_col) self.df_asin_detail.drop(previous_col)
self.df_flow_asin_last.unpersist() self.df_flow_asin_last.unpersist()
# 处理一些新增字段
def handle_other_new_col(self):
    """Append the newly added indicator columns to ``self.df_asin_detail``.

    Columns produced, in this order:

    * ``asin_bought_yoy`` - change of ``asin_bought_month`` relative to the
      partition ``self.date_info_last_year``.
    * ``asin_bought_mom`` - change of ``asin_bought_month`` relative to the
      partition ``self.date_info_last_month``.
    * ``describe_len`` - length of ``asin_describe`` once every ``|-|``
      separator has been removed.
    * ``tracking_since`` - rewritten in place as a ``yyyy-MM-dd`` string.
    * ``tracking_since_type`` - bucket 1-7 derived from the formatted
      ``tracking_since`` and ``self.launch_time_interval_dict``; 0 when
      no branch matches (e.g. the value is null).

    Side effects: reassigns ``self.df_asin_detail`` and unpersists
    ``self.df_asin_bought_history``.
    """

    def growth_rate(current_name, baseline_name):
        """Build the rate-of-change column expression for two columns.

        Result is null when both sides are null, -1000.0 when only the
        current value is null, 1000.0 when only the baseline is null, and
        otherwise ``round((current - baseline) / baseline, 4)``.
        """
        current = F.col(current_name)
        baseline = F.col(baseline_name)
        ratio = F.round((current - baseline) / baseline, 4)
        both_missing = current.isNull() & baseline.isNull()
        return (
            F.when(both_missing, F.lit(None))
            .when(current.isNull(), F.lit(-1000.0000))
            .when(baseline.isNull(), F.lit(1000.0000))
            .otherwise(ratio)
        )

    # --- month-over-month / year-over-year monthly sales ---
    # Split the history frame into one slice per reference period and give
    # each slice's sales column its own name so both can be joined by asin.
    history = self.df_asin_bought_history
    prev_month_sales = (
        history.filter(f"date_info = '{self.date_info_last_month}'")
        .withColumnRenamed('asin_bought_month', 'asin_bought_last_month')
        .drop('date_info')
    )
    prev_year_sales = (
        history.filter(f"date_info = '{self.date_info_last_year}'")
        .withColumnRenamed('asin_bought_month', 'asin_bought_last_year')
        .drop('date_info')
    )

    enriched = self.df_asin_detail.join(prev_month_sales, 'asin', 'left')
    enriched = enriched.join(prev_year_sales, 'asin', 'left')
    enriched = enriched.withColumn(
        'asin_bought_yoy', growth_rate("asin_bought_month", "asin_bought_last_year")
    )
    enriched = enriched.withColumn(
        'asin_bought_mom', growth_rate("asin_bought_month", "asin_bought_last_month")
    )
    # The baseline columns were only needed to compute the two rates.
    self.df_asin_detail = enriched.drop('asin_bought_last_month', 'asin_bought_last_year')
    self.df_asin_bought_history.unpersist()

    # --- length of the description text, ignoring "|-|" separators ---
    describe_without_sep = F.regexp_replace(F.col("asin_describe"), "\\|-\\|", "")
    self.df_asin_detail = self.df_asin_detail.withColumn(
        "describe_len", F.length(describe_without_sep)
    )

    # --- tracking_since: formatted date plus age bucket ---
    intervals = self.launch_time_interval_dict
    m1 = intervals['one_month']
    m3 = intervals['three_month']
    m6 = intervals['six_month']
    m12 = intervals['twelve_month']
    m24 = intervals['twenty_four_month']
    m36 = intervals['thirty_six_month']
    expr_str = f"""
    CASE WHEN tracking_since >= '{m1}' THEN 1
    WHEN tracking_since >= '{m3}' AND tracking_since < '{m1}' THEN 2
    WHEN tracking_since >= '{m6}' AND tracking_since < '{m3}' THEN 3
    WHEN tracking_since >= '{m12}' AND tracking_since < '{m6}' THEN 4
    WHEN tracking_since >= '{m24}' AND tracking_since < '{m12}' THEN 5
    WHEN tracking_since >= '{m36}' AND tracking_since < '{m24}' THEN 6
    WHEN tracking_since < '{m36}' THEN 7 ELSE 0 END
    """

    # The raw value is shifted by 21564000 and multiplied by 60 before being
    # handed to from_unixtime, i.e. it is treated as a minute counter with a
    # fixed offset from the Unix epoch.
    # NOTE(review): presumably Keepa's minute-based timestamp - confirm.
    raw_tracking = F.col("tracking_since")
    epoch_seconds = (raw_tracking + F.lit(21564000)) * 60
    tracking_date = F.when(raw_tracking.isNull(), F.lit(None)).otherwise(
        F.date_format(F.from_unixtime(epoch_seconds), "yyyy-MM-dd")
    )
    with_date = self.df_asin_detail.withColumn("tracking_since", tracking_date)
    # The bucket expression must be applied after the conversion so it
    # compares the formatted date string against the interval boundaries.
    self.df_asin_detail = with_date.withColumn("tracking_since_type", F.expr(expr_str))
# 字段标准化 # 字段标准化
def handle_column(self): def handle_column(self):
self.df_save = self.df_asin_detail.\ self.df_save = self.df_asin_detail.\
...@@ -517,6 +617,7 @@ class DwtFlowAsin(Templates): ...@@ -517,6 +617,7 @@ class DwtFlowAsin(Templates):
F.lit(None).alias("theme_en"), F.lit(None).alias("theme_label_en"), "asin_lqs_rating", F.lit(None).alias("theme_en"), F.lit(None).alias("theme_label_en"), "asin_lqs_rating",
"asin_lqs_rating_detail", "title_matching_degree", "zr_flow_proportion", "matrix_flow_proportion", "asin_lqs_rating_detail", "title_matching_degree", "zr_flow_proportion", "matrix_flow_proportion",
"matrix_ao_val", "follow_sellers_count", "seller_json", "asin_describe", "asin_fbm_price", "matrix_ao_val", "follow_sellers_count", "seller_json", "asin_describe", "asin_fbm_price",
"asin_bought_mom", "asin_bought_yoy", "describe_len", "tracking_since", "tracking_since_type",
F.lit(self.site_name).alias("site_name"), F.lit(self.date_type).alias("date_type"), F.lit(self.site_name).alias("site_name"), F.lit(self.date_type).alias("date_type"),
F.lit(self.date_info).alias("date_info")) F.lit(self.date_info).alias("date_info"))
self.df_save = self.df_save.na.fill( self.df_save = self.df_save.na.fill(
...@@ -528,7 +629,7 @@ class DwtFlowAsin(Templates): ...@@ -528,7 +629,7 @@ class DwtFlowAsin(Templates):
"asin_rating_type": 0, "asin_site_name_type": 0, "asin_weight_type": 0, "asin_launch_time_type": 0, "asin_rating_type": 0, "asin_site_name_type": 0, "asin_weight_type": 0, "asin_launch_time_type": 0,
"asin_ao_val_type": 0, "asin_rank_type": 0, "asin_price_type": 0, "asin_quantity_variation_type": 0, "asin_ao_val_type": 0, "asin_rank_type": 0, "asin_price_type": 0, "asin_quantity_variation_type": 0,
"package_quantity": 1, "is_movie_label": 0, "is_brand_label": 0, "is_alarm_brand": 0, "package_quantity": 1, "is_movie_label": 0, "is_brand_label": 0, "is_alarm_brand": 0,
"title_matching_degree": 0.0, "asin_lqs_rating": 0.0, "follow_sellers_count": -1}) "title_matching_degree": 0.0, "asin_lqs_rating": 0.0, "follow_sellers_count": -1, "describe_len": 0})
self.df_save = self.df_save.repartition(60).persist(StorageLevel.DISK_ONLY) self.df_save = self.df_save.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_save = self.df_save.drop_duplicates(['asin']).filter((F.col("asin").isNotNull()) & (F.col("asin") != "") & (F.length(F.col("asin")) <= 10)) self.df_save = self.df_save.drop_duplicates(['asin']).filter((F.col("asin").isNotNull()) & (F.col("asin") != "") & (F.length(F.col("asin")) <= 10))
print("数据量为:", self.df_save.count()) print("数据量为:", self.df_save.count())
...@@ -566,14 +667,15 @@ class DwtFlowAsin(Templates): ...@@ -566,14 +667,15 @@ class DwtFlowAsin(Templates):
F.col("current_category_rank").alias("category_current_rank"), "asin_type", F.col("current_category_rank").alias("category_current_rank"), "asin_type",
"bsr_orders", F.col("sales").alias("bsr_orders_sale"), "bsr_orders", F.col("sales").alias("bsr_orders_sale"),
F.col("asin_page_inventory").alias("page_inventory"), "asin_bought_month", "seller_json", F.col("asin_page_inventory").alias("page_inventory"), "asin_bought_month", "seller_json",
F.col("asin_buy_box_seller_type").alias("buy_box_seller_type"), "asin_describe", "asin_fbm_price" F.col("asin_buy_box_seller_type").alias("buy_box_seller_type"), "asin_describe", "asin_fbm_price",
F.col("describe_len").alias("asin_describe_len")
) )
table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume, table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume,
asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star, asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star,
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info, account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion, asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale, date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale,
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe, asin_fbm_price""" page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe, asin_fbm_price, asin_describe_len"""
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns) DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns)
print("save asin_latest_detail success") print("save asin_latest_detail success")
else: else:
...@@ -590,6 +692,7 @@ class DwtFlowAsin(Templates): ...@@ -590,6 +692,7 @@ class DwtFlowAsin(Templates):
self.handle_asin_is_hide() self.handle_asin_is_hide()
self.handle_title_matching_degree() self.handle_title_matching_degree()
self.handle_asin_attribute_change() self.handle_asin_attribute_change()
self.handle_other_new_col()
self.handle_column() self.handle_column()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment