Commit 3e4fe845 by chenyuanjie

流量选品-新增ao值、价格等字段同比计算

parent 252e1e6b
......@@ -74,7 +74,7 @@ class DwtFlowAsin(Templates):
self.df_fd_asin_info = self.spark.sql(f"select 1+1;")
self.df_flow_asin_last = self.spark.sql(f"select 1+1;")
self.df_title_matching_degree = self.spark.sql(f"select 1+1;")
self.df_asin_bought_history = self.spark.sql(f"select 1+1;")
self.df_flow_asin_last_year = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;")
@staticmethod
......@@ -146,11 +146,40 @@ class DwtFlowAsin(Templates):
}
@staticmethod
def calculate_change(current_col, previous_col):
def build_time_interval_type_expr(col_name, interval_dict):
    """Build a Spark SQL CASE WHEN expression that buckets *col_name* into a
    launch/tracking-time interval type.

    :param col_name: name of the date column to classify; compared as a quoted
        string literal, so its format must match the values in *interval_dict*
        (presumably 'yyyy-MM-dd' — TODO confirm against callers)
    :param interval_dict: dict holding boundary dates under the keys
        'one_month', 'three_month', 'six_month', 'twelve_month',
        'twenty_four_month', 'thirty_six_month'
    :return: SQL expression string mapping the column to 1..7
        (1 = newest bucket, 7 = oldest, 0 when no branch matches)
    """
    one_month = interval_dict['one_month']
    three_month = interval_dict['three_month']
    six_month = interval_dict['six_month']
    twelve_month = interval_dict['twelve_month']
    twenty_four_month = interval_dict['twenty_four_month']
    thirty_six_month = interval_dict['thirty_six_month']
    # Lexicographic string comparison is relied on here: with consistently
    # formatted dates, a later date compares greater than an earlier one.
    return f"""CASE WHEN {col_name} >= '{one_month}' THEN 1
                WHEN {col_name} >= '{three_month}' AND {col_name} < '{one_month}' THEN 2
                WHEN {col_name} >= '{six_month}' AND {col_name} < '{three_month}' THEN 3
                WHEN {col_name} >= '{twelve_month}' AND {col_name} < '{six_month}' THEN 4
                WHEN {col_name} >= '{twenty_four_month}' AND {col_name} < '{twelve_month}' THEN 5
                WHEN {col_name} >= '{thirty_six_month}' AND {col_name} < '{twenty_four_month}' THEN 6
                WHEN {col_name} < '{thirty_six_month}' THEN 7 ELSE 0 END"""
@staticmethod
def calculate_change(current_col, previous_col, use_sentinel=False):
    """Return a (rise_col, change_col) pair of Spark column expressions.

    rise_col is the absolute difference current - previous; change_col is the
    relative change rounded to 4 decimals.

    :param current_col: name of the current-period column
    :param previous_col: name of the comparison-period column
    :param use_sentinel: when True, NULL-only-on-one-side rows get sentinel
        values (-1000.0 if current is NULL, 1000.0 if previous is NULL)
        instead of NULL
    :return: tuple (rise_col, change_col) of pyspark Column expressions
    """
    # Fix: the original built change_col once here and then unconditionally
    # rebuilt it in the if/else below — the first expression was dead work.
    rise_col = F.col(current_col) - F.col(previous_col)
    if use_sentinel:
        change_col = F.when(
            F.col(current_col).isNull() & F.col(previous_col).isNull(), F.lit(None)
        ).when(
            F.col(current_col).isNull(), F.lit(-1000.0)
        ).when(
            F.col(previous_col).isNull(), F.lit(1000.0)
        ).otherwise(
            # NOTE(review): this branch does not guard previous == 0; Spark
            # division by zero yields NULL rather than raising — confirm that
            # is the intended result for sentinel-mode columns.
            F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
        )
    else:
        change_col = F.when(
            (F.col(previous_col).isNotNull()) & (F.col(previous_col) != 0),
            F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
        ).otherwise(None)
    return rise_col, change_col
def read_data(self):
......@@ -215,19 +244,31 @@ class DwtFlowAsin(Templates):
self.df_fd_asin_info = self.spark.sql(sqlQuery=sql)
self.df_fd_asin_info = self.df_fd_asin_info.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_fd_asin_info.show(10, truncate=False)
print("6.获取上一个最近30天的整合结果")
print("6.获取环比上期整合结果")
sql = f"""
select asin, round(asin_ao_val, 3) as previous_asin_ao_val, asin_price as previous_asin_price,
sales as pervious_sales, variation_num as previous_variation_num, asin_rating as previous_asin_rating,
bsr_orders as previous_bsr_orders, asin_total_comments as previous_asin_total_comments,
first_category_rank as previous_first_category_rank from dwt_flow_asin
select asin, round(asin_ao_val, 3) as previous_asin_ao_val, asin_price as previous_asin_price,
sales as pervious_sales, variation_num as previous_variation_num, asin_rating as previous_asin_rating,
bsr_orders as previous_bsr_orders, asin_total_comments as previous_asin_total_comments,
first_category_rank as previous_first_category_rank, asin_bought_month as previous_asin_bought_month from dwt_flow_asin
where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.previous_date}'
"""
print("sql:" + sql)
self.df_flow_asin_last = self.spark.sql(sqlQuery=sql)
self.df_flow_asin_last = self.df_flow_asin_last.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_flow_asin_last.show(10, truncate=False)
print("7.获取asin的标题匹配度")
print("7.获取同比去年整合结果")
sql = f"""
select asin, round(asin_ao_val, 3) as lastyear_asin_ao_val, asin_price as lastyear_asin_price,
sales as lastyear_sales, variation_num as lastyear_variation_num, asin_rating as lastyear_asin_rating,
bsr_orders as lastyear_bsr_orders, asin_total_comments as lastyear_asin_total_comments,
first_category_rank as lastyear_first_category_rank, asin_bought_month as lastyear_asin_bought_month from dwt_flow_asin
where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info_last_year}'
"""
print("sql:" + sql)
self.df_flow_asin_last_year = self.spark.sql(sqlQuery=sql)
self.df_flow_asin_last_year = self.df_flow_asin_last_year.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_flow_asin_last_year.show(10, truncate=False)
print("8.获取asin的标题匹配度")
sql = f"""
select asin, contains_flag from dwd_title_matching_degree where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'
"""
......@@ -235,15 +276,6 @@ class DwtFlowAsin(Templates):
self.df_title_matching_degree = self.spark.sql(sqlQuery=sql)
self.df_title_matching_degree = self.df_title_matching_degree.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_title_matching_degree.show(10, truncate=False)
print("8.读取历史月销数据")
sql = f"""
select asin, asin_bought_month, date_info from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info in ('{self.date_info_last_month}', '{self.date_info_last_year}')
"""
print("sql:" + sql)
self.df_asin_bought_history = self.spark.sql(sqlQuery=sql)
self.df_asin_bought_history = self.df_asin_bought_history.repartition(60).persist(StorageLevel.DISK_ONLY)
self.df_asin_bought_history.show(10, truncate=False)
print("9.读取最新keepa数据")
sql = f"""
select asin, tracking_since, keepa_launch_time from dim_keepa_asin_info where site_name = '{self.site_name}'
......@@ -308,20 +340,10 @@ class DwtFlowAsin(Templates):
"asin_launch_time", F.coalesce(F.col("asin_launch_time"), F.col("keepa_launch_time"))
)
# 上架时间类型
one_month = self.launch_time_interval_dict['one_month']
three_month = self.launch_time_interval_dict['three_month']
six_month = self.launch_time_interval_dict['six_month']
twelve_month = self.launch_time_interval_dict['twelve_month']
twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
expr_str = f"""CASE WHEN asin_launch_time >= '{one_month}' THEN 1
WHEN asin_launch_time >= '{three_month}' AND asin_launch_time < '{one_month}' THEN 2
WHEN asin_launch_time >= '{six_month}' AND asin_launch_time < '{three_month}' THEN 3
WHEN asin_launch_time >= '{twelve_month}' AND asin_launch_time < '{six_month}' THEN 4
WHEN asin_launch_time >= '{twenty_four_month}' AND asin_launch_time < '{twelve_month}' THEN 5
WHEN asin_launch_time >= '{thirty_six_month}' AND asin_launch_time < '{twenty_four_month}' THEN 6
WHEN asin_launch_time < '{thirty_six_month}' THEN 7 ELSE 0 END"""
self.df_asin_detail = self.df_asin_detail.withColumn("asin_launch_time_type", F.expr(expr_str))
self.df_asin_detail = self.df_asin_detail.withColumn(
"asin_launch_time_type",
F.expr(self.build_time_interval_type_expr("asin_launch_time", self.launch_time_interval_dict))
)
# 价格类型
self.df_asin_detail = self.df_asin_detail.withColumn(
"asin_price_type", F.expr("""
......@@ -510,99 +532,55 @@ class DwtFlowAsin(Templates):
self.df_asin_detail = self.df_asin_detail.join(self.df_title_matching_degree, on=['asin'], how='left')
self.df_title_matching_degree.unpersist()
# 处理变体ASIN属性(asin维度)的变化率相关信息
def handle_asin_attribute_change(self):
# 处理所有变化率相关指标(环比_mom / 同比_yoy 统一处理)
def handle_change_rate(self):
self.df_asin_detail = self.df_asin_detail.join(self.df_flow_asin_last, on=['asin'], how='left')
self.df_asin_detail = self.df_asin_detail.join(self.df_flow_asin_last_year, on=['asin'], how='left')
# (current_col, prev_col, lastyear_col, suffix, rise_round, use_sentinel)
# rise_round: None 表示不计算 _rise;整数表示保留小数位;'int' 表示转 IntegerType
columns_to_change = [
("first_category_rank", "previous_first_category_rank", "asin_rank"),
("bsr_orders", "previous_bsr_orders", "asin_bsr_orders"),
("asin_rating", "previous_asin_rating", "asin_rating"),
("asin_total_comments", "previous_asin_total_comments", "asin_comments"),
("variation_num", "previous_variation_num", "asin_variation"),
("asin_ao_val", "previous_asin_ao_val", "asin_ao"),
("asin_price", "previous_asin_price", "asin_price"),
("sales", "pervious_sales", "asin_sales")
("first_category_rank", "previous_first_category_rank", "lastyear_first_category_rank", "asin_rank", "int", False),
("bsr_orders", "previous_bsr_orders", "lastyear_bsr_orders", "asin_bsr_orders", "int", False),
("asin_rating", "previous_asin_rating", "lastyear_asin_rating", "asin_rating", 1, False),
("asin_total_comments", "previous_asin_total_comments", "lastyear_asin_total_comments", "asin_comments", "int", False),
("variation_num", "previous_variation_num", "lastyear_variation_num", "asin_variation", "int", False),
("asin_ao_val", "previous_asin_ao_val", "lastyear_asin_ao_val", "asin_ao", 3, False),
("asin_price", "previous_asin_price", "lastyear_asin_price", "asin_price", 2, False),
("sales", "pervious_sales", "lastyear_sales", "asin_sales", 2, False),
("asin_bought_month", "previous_asin_bought_month", "lastyear_asin_bought_month", "asin_bought", None, True),
]
for current_col, previous_col, suffix in columns_to_change:
rise_col, change_col = self.calculate_change(current_col, previous_col)
if suffix == 'asin_ao':
self.df_asin_detail = self.df_asin_detail.withColumn(f"{suffix}_rise", F.round(rise_col, 3))
elif suffix in ['asin_price', 'sales']:
self.df_asin_detail = self.df_asin_detail.withColumn(f"{suffix}_rise", F.round(rise_col, 2))
elif suffix == 'asin_rating':
self.df_asin_detail = self.df_asin_detail.withColumn(f"{suffix}_rise", F.round(rise_col, 1))
else:
self.df_asin_detail = self.df_asin_detail.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType()))
self.df_asin_detail = self.df_asin_detail.withColumn(f"{suffix}_change", F.round(change_col, 4))
self.df_asin_detail.drop(previous_col)
for current_col, prev_col, lastyear_col, suffix, rise_round, use_sentinel in columns_to_change:
rise_col, mom_col = self.calculate_change(current_col, prev_col, use_sentinel=use_sentinel)
_, yoy_col = self.calculate_change(current_col, lastyear_col, use_sentinel=use_sentinel)
if rise_round is not None:
if rise_round == "int":
self.df_asin_detail = self.df_asin_detail.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType()))
else:
self.df_asin_detail = self.df_asin_detail.withColumn(f"{suffix}_rise", F.round(rise_col, rise_round))
self.df_asin_detail = self.df_asin_detail \
.withColumn(f"{suffix}_mom", F.round(mom_col, 4)) \
.withColumn(f"{suffix}_yoy", F.round(yoy_col, 4))
self.df_asin_detail = self.df_asin_detail.drop('previous_asin_bought_month', 'lastyear_asin_bought_month')
self.df_flow_asin_last.unpersist()
self.df_flow_asin_last_year.unpersist()
# 处理一些新增字段
def handle_other_new_col(self):
# 处理月销同比环比
df_flow_asin_last_month = self.df_asin_bought_history.filter(f"date_info = '{self.date_info_last_month}'").withColumnRenamed(
'asin_bought_month', 'asin_bought_last_month'
).drop('date_info')
df_flow_asin_last_year = self.df_asin_bought_history.filter(f"date_info = '{self.date_info_last_year}'").withColumnRenamed(
'asin_bought_month', 'asin_bought_last_year'
).drop('date_info')
self.df_asin_detail = self.df_asin_detail.join(
df_flow_asin_last_month, 'asin', 'left'
).join(
df_flow_asin_last_year, 'asin', 'left'
).withColumn(
'asin_bought_yoy',
F.when(
F.col("asin_bought_month").isNull() & F.col("asin_bought_last_year").isNull(), F.lit(None)
).when(
F.col("asin_bought_month").isNull(), F.lit(-1000.0000)
).when(
F.col("asin_bought_last_year").isNull(), F.lit(1000.0000)
).otherwise(
F.round((F.col("asin_bought_month") - F.col("asin_bought_last_year")) / F.col("asin_bought_last_year"), 4)
)
).withColumn(
'asin_bought_mom',
F.when(
F.col("asin_bought_month").isNull() & F.col("asin_bought_last_month").isNull(), F.lit(None)
).when(
F.col("asin_bought_month").isNull(), F.lit(-1000.0000)
).when(
F.col("asin_bought_last_month").isNull(), F.lit(1000.0000)
).otherwise(
F.round((F.col("asin_bought_month") - F.col("asin_bought_last_month")) / F.col("asin_bought_last_month"), 4)
)
).drop('asin_bought_last_month', 'asin_bought_last_year')
self.df_asin_bought_history.unpersist()
# 处理五点描述长度
self.df_asin_detail = self.df_asin_detail.withColumn(
"describe_len", F.length(F.regexp_replace(F.col("asin_describe"), "\\|-\\|", ""))
)
# 处理keepa追踪时间
one_month = self.launch_time_interval_dict['one_month']
three_month = self.launch_time_interval_dict['three_month']
six_month = self.launch_time_interval_dict['six_month']
twelve_month = self.launch_time_interval_dict['twelve_month']
twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
expr_str = f"""
CASE WHEN tracking_since >= '{one_month}' THEN 1
WHEN tracking_since >= '{three_month}' AND tracking_since < '{one_month}' THEN 2
WHEN tracking_since >= '{six_month}' AND tracking_since < '{three_month}' THEN 3
WHEN tracking_since >= '{twelve_month}' AND tracking_since < '{six_month}' THEN 4
WHEN tracking_since >= '{twenty_four_month}' AND tracking_since < '{twelve_month}' THEN 5
WHEN tracking_since >= '{thirty_six_month}' AND tracking_since < '{twenty_four_month}' THEN 6
WHEN tracking_since < '{thirty_six_month}' THEN 7 ELSE 0 END
"""
self.df_asin_detail = self.df_asin_detail.withColumn(
"tracking_since",
"tracking_since",
F.when(
F.col("tracking_since").isNull(), F.lit(None)
).otherwise(
F.date_format(F.from_unixtime((F.col("tracking_since") + F.lit(21564000)) * 60), "yyyy-MM-dd")
)
).withColumn(
"tracking_since_type", F.expr(expr_str)
"tracking_since_type",
F.expr(self.build_time_interval_type_expr("tracking_since", self.launch_time_interval_dict))
)
# 字段标准化
......@@ -620,11 +598,15 @@ class DwtFlowAsin(Templates):
"two_star", "three_star", "four_star", "five_star", "low_star", "together_asin", "ac_name",
"variation_num", "account_name", "account_id", "seller_country_name", "bsr_type",
F.lit(-1).alias("bsr_best_orders_type"), F.lit(None).alias("zr_best_orders_type"), "parent_asin", "asin_rank_rise",
"asin_rank_change", "asin_ao_rise", "asin_ao_change", "asin_price_rise", "asin_price_change",
F.lit(None).alias("asin_orders_rise"), F.lit(None).alias("asin_orders_change"), "asin_rating_rise",
"asin_rating_change", "asin_comments_rise", "asin_comments_change", "asin_bsr_orders_rise",
"asin_bsr_orders_change", "asin_sales_rise", "asin_sales_change", "asin_variation_rise",
"asin_variation_change", "asin_size_type", "asin_rating_type", "asin_site_name_type",
"asin_rank_mom", "asin_rank_yoy", "asin_ao_rise", "asin_ao_mom", "asin_ao_yoy",
"asin_price_rise", "asin_price_mom", "asin_price_yoy",
F.lit(None).alias("asin_orders_rise"), F.lit(None).alias("asin_orders_mom"), F.lit(None).alias("asin_orders_yoy"),
"asin_rating_rise", "asin_rating_mom", "asin_rating_yoy",
"asin_comments_rise", "asin_comments_mom", "asin_comments_yoy",
"asin_bsr_orders_rise", "asin_bsr_orders_mom", "asin_bsr_orders_yoy",
"asin_sales_rise", "asin_sales_mom", "asin_sales_yoy",
"asin_variation_rise", "asin_variation_mom", "asin_variation_yoy",
"asin_size_type", "asin_rating_type", "asin_site_name_type",
"asin_weight_type", "asin_launch_time_type", "asin_ao_val_type", "asin_rank_type", "asin_price_type",
F.lit(None).alias("created_time"), F.lit(None).alias("updated_time"), "asin_lob_info",
"customer_reviews_json", "img_info", "is_contains_lob_info", "is_package_quantity_abnormal",
......@@ -717,7 +699,7 @@ class DwtFlowAsin(Templates):
self.handle_asin_lqs_rating()
self.handle_asin_is_hide()
self.handle_title_matching_degree()
self.handle_asin_attribute_change()
self.handle_change_rate()
self.handle_other_new_col()
self.handle_column()
......
......@@ -84,13 +84,13 @@ class EsStDetail(TemplatesMysql):
asin_activity_type as activity_type, act_one_two_val as one_two_val, act_three_four_val as three_four_val,
act_five_six_val as five_six_val, act_eight_val as eight_val, asin_brand_name as brand, variation_num,
one_star, two_star, three_star, four_star, five_star, low_star, together_asin, account_name, account_id,
seller_country_name as site_name, asin_rank_rise as rank_rise, asin_rank_change as rank_change,
asin_ao_rise as ao_rise, asin_ao_change as ao_change, asin_price_rise as price_rise,
asin_price_change as price_change, asin_rating_rise as rating_rise, asin_rating_change as rating_change,
asin_comments_rise as comments_rise, asin_comments_change as comments_change,
asin_bsr_orders_rise as bsr_orders_rise, asin_bsr_orders_change as bsr_orders_change,
asin_sales_rise as sales_rise, asin_sales_change as sales_change, asin_variation_rise as variation_rise,
asin_variation_change as variation_change, asin_size_type as size_type, asin_rating_type as rating_type,
seller_country_name as site_name, asin_rank_rise as rank_rise, asin_rank_mom as rank_change,
asin_ao_rise as ao_rise, asin_ao_mom as ao_change, asin_price_rise as price_rise,
asin_price_mom as price_change, asin_rating_rise as rating_rise, asin_rating_mom as rating_change,
asin_comments_rise as comments_rise, asin_comments_mom as comments_change,
asin_bsr_orders_rise as bsr_orders_rise, asin_bsr_orders_mom as bsr_orders_change,
asin_sales_rise as sales_rise, asin_sales_mom as sales_change, asin_variation_rise as variation_rise,
asin_variation_mom as variation_change, asin_size_type as size_type, asin_rating_type as rating_type,
asin_site_name_type as site_name_type, asin_weight_type as weight_type, asin_launch_time_type as launch_time_type,
asin_ao_val_type as ao_val_type, asin_rank_type as rank_type, asin_price_type as price_type, bsr_type,
bsr_best_orders_type, asin_quantity_variation_type as quantity_variation_type, package_quantity, is_movie_label,
......@@ -99,7 +99,9 @@ class EsStDetail(TemplatesMysql):
title_matching_degree, asin_lob_info, is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion,
matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info,
coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count, asin_describe, asin_fbm_price as fbm_price,
describe_len, asin_bought_mom as bought_month_mom, asin_bought_yoy as bought_month_yoy, tracking_since, tracking_since_type
describe_len, asin_bought_mom as bought_month_mom, asin_bought_yoy as bought_month_yoy, tracking_since, tracking_since_type,
asin_rank_yoy as rank_yoy, asin_ao_yoy as ao_yoy, asin_price_yoy as price_yoy, asin_rating_yoy as rating_yoy,
asin_comments_yoy as comments_yoy, asin_bsr_orders_yoy as bsr_orders_yoy, asin_sales_yoy as sales_yoy, asin_variation_yoy as variation_yoy
from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
"""
print("sql:", sql)
......
......@@ -85,7 +85,9 @@ class KafkaFlowAsinDetail(Templates):
"it": "(\d+) in ",
}
# DataFrame初始化
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info, -12)
self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_previous_flow_asin_lastyear = self.spark.sql("select 1+1;")
self.df_seller_info = self.spark.sql("select 1+1;")
self.df_seller_country = self.spark.sql("select 1+1;")
self.df_asin_seller = self.spark.sql("select 1+1;")
......@@ -508,6 +510,22 @@ class KafkaFlowAsinDetail(Templates):
"title_len_rating", "title_brand_rating", "img_num_rating", "img_enlarge_rating")
return df
@staticmethod
def build_time_interval_type_expr(col_name, interval_dict):
    """Return a single-line Spark SQL CASE WHEN string that classifies
    *col_name* into time-interval buckets 1-7 (0 when no branch matches),
    using the boundary dates supplied in *interval_dict*.
    """
    ordered_keys = (
        'one_month', 'three_month', 'six_month',
        'twelve_month', 'twenty_four_month', 'thirty_six_month',
    )
    bounds = [interval_dict[key] for key in ordered_keys]
    # Bucket 1: on or after the most recent boundary.
    pieces = [f"CASE WHEN {col_name} >= '{bounds[0]}' THEN 1"]
    # Buckets 2-6: between two consecutive boundaries (older, newer).
    for pos in range(1, len(bounds)):
        pieces.append(
            f"WHEN {col_name} >= '{bounds[pos]}' AND {col_name} < '{bounds[pos - 1]}' THEN {pos + 1}"
        )
    # Bucket 7: older than the oldest boundary; anything else falls to 0.
    pieces.append(f"WHEN {col_name} < '{bounds[-1]}' THEN 7 ELSE 0 END")
    return " ".join(pieces)
# 11. 通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、是否内部asin、是否隐藏分类、有效类型、必需ASIN、asin_type)
def handle_asin_detail_all_type(self, df):
# 1. 评分类型
......@@ -518,21 +536,10 @@ class KafkaFlowAsinDetail(Templates):
df = df.join(self.df_asin_keep_date, on=['asin'], how='left')
df = df.withColumn("launch_time", F.when(F.col("launch_time").isNull(), F.col("new_launch_time")).otherwise(
F.col("launch_time")))
one_month = self.launch_time_interval_dict['one_month']
three_month = self.launch_time_interval_dict['three_month']
six_month = self.launch_time_interval_dict['six_month']
twelve_month = self.launch_time_interval_dict['twelve_month']
twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
expr_str = f"""
CASE WHEN launch_time >= '{one_month}' THEN 1
WHEN launch_time >= '{three_month}' AND launch_time < '{one_month}' THEN 2
WHEN launch_time >= '{six_month}' AND launch_time < '{three_month}' THEN 3
WHEN launch_time >= '{twelve_month}' AND launch_time < '{six_month}' THEN 4
WHEN launch_time >= '{twenty_four_month}' AND launch_time < '{twelve_month}' THEN 5
WHEN launch_time >= '{thirty_six_month}' AND launch_time < '{twenty_four_month}' THEN 6
WHEN launch_time < '{thirty_six_month}' THEN 7 ELSE 0 END"""
df = df.withColumn("launch_time_type", F.expr(expr_str))
df = df.withColumn(
"launch_time_type",
F.expr(self.build_time_interval_type_expr("launch_time", self.launch_time_interval_dict))
)
# 3. 电影标签
movie_label_list = ['prime video', 'dvd', 'blu-ray', 'kindle', 'app', 'paperback', 'audible audiobook',
'kindle edition', 'kindle & comixology', 'hardcover', 'comic', 'multi-format', '4k',
......@@ -571,40 +578,55 @@ class KafkaFlowAsinDetail(Templates):
)).drop("is_self_asin", "is_need_asin", "is_hide_asin")
return df
# 12. 处理变化率相关字段
# 12. 处理变化率相关字段(环比_mom / 同比_yoy 统一处理)
def handle_asin_attribute_change(self, df):
# 处理ASIN维度的变化率信息
df = df.join(self.df_previous_flow_asin, on=['asin'], how='left')
df = df.join(self.df_previous_flow_asin_lastyear, on=['asin'], how='left')
# (current_col, prev_col, lastyear_col, suffix, rise_round, use_sentinel)
# rise_round: None=不计算_rise; 整数=round精度; 'int'=IntegerType
columns_to_change = [
("ao_val", "previous_asin_ao_val", "ao"),
("price", "previous_asin_price", "price"),
("asin_bs_cate_1_rank", "previous_first_category_rank", "rank"),
("bsr_orders", "previous_asin_bsr_orders", "bsr_orders"),
("rating", "previous_asin_rating", "rating"),
("total_comments", "previous_asin_total_comments", "comments"),
("variat_num", "previous_asin_variation_num", "variation"),
("bsr_orders_sale", "previous_sales", "sales")
("ao_val", "previous_asin_ao_val", "lastyear_asin_ao_val", "ao", 3, False),
("price", "previous_asin_price", "lastyear_asin_price", "price", 2, False),
("asin_bs_cate_1_rank", "previous_first_category_rank", "lastyear_first_category_rank", "rank", "int", False),
("bsr_orders", "previous_asin_bsr_orders", "lastyear_asin_bsr_orders", "bsr_orders", "int", False),
("rating", "previous_asin_rating", "lastyear_asin_rating", "rating", 1, False),
("total_comments", "previous_asin_total_comments", "lastyear_asin_total_comments", "comments", "int", False),
("variat_num", "previous_asin_variation_num", "lastyear_asin_variation_num", "variation", "int", False),
("bsr_orders_sale", "previous_sales", "lastyear_sales", "sales", 2, False),
("asin_bought_month", "previous_asin_bought_month", "lastyear_asin_bought_month", "bought_month", None, True),
]
def calculate_change(current_col, previous_col):
def calculate_change(current_col, previous_col, use_sentinel=False):
    """Return (rise_col, change_col): absolute and 4-decimal relative change
    between two columns.

    :param current_col: name of the current-period column
    :param previous_col: name of the comparison-period column
    :param use_sentinel: when True, map one-sided NULLs to sentinels
        (-1000.0 if current is NULL, 1000.0 if previous is NULL)
    :return: tuple of pyspark Column expressions
    """
    # Fix: the original assigned change_col once here and then rebuilt it
    # unconditionally in the if/else — the first expression was dead work.
    rise_col = F.col(current_col) - F.col(previous_col)
    if use_sentinel:
        change_col = F.when(
            F.col(current_col).isNull() & F.col(previous_col).isNull(), F.lit(None)
        ).when(
            F.col(current_col).isNull(), F.lit(-1000.0)
        ).when(
            F.col(previous_col).isNull(), F.lit(1000.0)
        ).otherwise(
            # NOTE(review): previous == 0 is not guarded here; Spark division
            # by zero yields NULL — confirm that is acceptable in sentinel mode.
            F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
        )
    else:
        change_col = F.when(
            (F.col(previous_col).isNotNull()) & (F.col(previous_col) != 0),
            F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
        ).otherwise(None)
    return rise_col, change_col
for current_col, previous_col, suffix in columns_to_change:
rise_col, change_col = calculate_change(current_col, previous_col)
if suffix == 'ao':
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 3))
elif suffix in ['price', 'sales']:
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 2))
elif suffix == 'rating':
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 1))
else:
df = df.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType()))
df = df.withColumn(f"{suffix}_change", F.round(change_col, 4))
df = df.drop(previous_col)
for current_col, prev_col, lastyear_col, suffix, rise_round, use_sentinel in columns_to_change:
rise_col, mom_col = calculate_change(current_col, prev_col, use_sentinel=use_sentinel)
_, yoy_col = calculate_change(current_col, lastyear_col, use_sentinel=use_sentinel)
if rise_round is not None:
if rise_round == "int":
df = df.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType()))
else:
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, rise_round))
mom_field = f"{suffix}_mom" if suffix == "bought_month" else f"{suffix}_change"
df = df.withColumn(mom_field, F.round(mom_col, 4)) \
.withColumn(f"{suffix}_yoy", F.round(yoy_col, 4))
df = df.drop('previous_asin_bought_month', 'lastyear_asin_bought_month')
return df
# 13. 处理不同来源asin
......@@ -623,25 +645,14 @@ class KafkaFlowAsinDetail(Templates):
).join(
self.df_asin_profit_rate, on=['asin', 'price'], how='left'
)
one_month = self.launch_time_interval_dict['one_month']
three_month = self.launch_time_interval_dict['three_month']
six_month = self.launch_time_interval_dict['six_month']
twelve_month = self.launch_time_interval_dict['twelve_month']
twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
expr_str = f"""
CASE WHEN tracking_since >= '{one_month}' THEN 1
WHEN tracking_since >= '{three_month}' AND tracking_since < '{one_month}' THEN 2
WHEN tracking_since >= '{six_month}' AND tracking_since < '{three_month}' THEN 3
WHEN tracking_since >= '{twelve_month}' AND tracking_since < '{six_month}' THEN 4
WHEN tracking_since >= '{twenty_four_month}' AND tracking_since < '{twelve_month}' THEN 5
WHEN tracking_since >= '{thirty_six_month}' AND tracking_since < '{twenty_four_month}' THEN 6
WHEN tracking_since < '{thirty_six_month}' THEN 7 ELSE 0 END"""
df = df.withColumn(
"tracking_since",
F.when(F.col("tracking_since").isNull(), F.lit(None)).otherwise(F.date_format(F.from_unixtime((F.col("tracking_since") + F.lit(21564000)) * 60), "yyyy-MM-dd"))
"tracking_since",
F.when(F.col("tracking_since").isNull(), F.lit(None)).otherwise(
F.date_format(F.from_unixtime((F.col("tracking_since") + F.lit(21564000)) * 60), "yyyy-MM-dd")
)
).withColumn(
"tracking_since_type", F.expr(expr_str)
"tracking_since_type",
F.expr(self.build_time_interval_type_expr("tracking_since", self.launch_time_interval_dict))
).withColumn(
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).withColumn(
......@@ -673,15 +684,15 @@ class KafkaFlowAsinDetail(Templates):
"size", "style", "material", "launch_time", "img_num", "parent_asin", "img_type", "img_url",
"activity_type", "one_two_val", "three_four_val", "five_six_val", "eight_val", "brand",
"variation_num", "one_star", "two_star", "three_star", "four_star", "five_star", "low_star",
"together_asin", "account_name", "account_id", "rank_rise", "rank_change", "ao_rise",
"ao_change", "price_rise", "price_change", "rating_rise", "rating_change", "comments_rise",
"comments_change", "bsr_orders_rise", "bsr_orders_change", "sales_rise", "sales_change",
"variation_rise", "variation_change", "size_type", "rating_type", "site_name_type",
"together_asin", "account_name", "account_id", "rank_rise", "rank_change", "rank_yoy", "ao_rise",
"ao_change", "ao_yoy", "price_rise", "price_change", "price_yoy", "rating_rise", "rating_change", "rating_yoy", "comments_rise",
"comments_change", "comments_yoy", "bsr_orders_rise", "bsr_orders_change", "bsr_orders_yoy", "sales_rise", "sales_change", "sales_yoy",
"variation_rise", "variation_change", "variation_yoy", "size_type", "rating_type", "site_name_type",
"weight_type", "launch_time_type", "ao_val_type", "rank_type", "price_type", "bsr_type",
"bsr_best_orders_type", "quantity_variation_type", "package_quantity", "is_movie_label",
"is_brand_label", "is_alarm_brand", "asin_type", "asin_crawl_date", "category_first_id",
"category_id", "first_category_rank", "current_category_rank", "asin_weight_ratio",
"site_name", "asin_bought_month", "asin_lqs_rating", "asin_lqs_rating_detail",
"site_name", "asin_bought_month", "bought_month_mom", "bought_month_yoy", "asin_lqs_rating", "asin_lqs_rating_detail",
"asin_lob_info", "is_contains_lob_info", "is_package_quantity_abnormal", "category",
"zr_flow_proportion", "matrix_flow_proportion", "matrix_ao_val", "product_features", "img_info",
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json",
......@@ -709,13 +720,25 @@ class KafkaFlowAsinDetail(Templates):
select asin, asin_ao_val as previous_asin_ao_val, asin_price as previous_asin_price,
variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank,
bsr_orders as previous_asin_bsr_orders, sales as previous_sales
bsr_orders as previous_asin_bsr_orders, sales as previous_sales, asin_bought_month as previous_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '30day'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_previous_flow_asin.show(10, truncate=False)
print("1b. 读取同比去年的flow_asin")
sql = f"""
select asin, asin_ao_val as lastyear_asin_ao_val, asin_price as lastyear_asin_price,
variation_num as lastyear_asin_variation_num, asin_rating as lastyear_asin_rating,
asin_total_comments as lastyear_asin_total_comments, first_category_rank as lastyear_first_category_rank,
bsr_orders as lastyear_asin_bsr_orders, sales as lastyear_sales, asin_bought_month as lastyear_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{self.date_info_last_year}'
"""
print("sql=", sql)
self.df_previous_flow_asin_lastyear = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin_lastyear = self.df_previous_flow_asin_lastyear.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_previous_flow_asin_lastyear.show(10, truncate=False)
print("2. 获取卖家相关信息")
sql = f"""
select fd_unique as seller_id, fd_account_name as account_name, upper(fd_country_name) as seller_country_name, asin, updated_at
......
......@@ -84,7 +84,9 @@ class KafkaRankAsinDetail(Templates):
"it": "(\d+) in ",
}
# DataFrame初始化
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info[:7], -12)
self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_previous_flow_asin_lastyear = self.spark.sql("select 1+1;")
self.df_seller_info = self.spark.sql("select 1+1;")
self.df_seller_country = self.spark.sql("select 1+1;")
self.df_asin_seller = self.spark.sql("select 1+1;")
......@@ -507,6 +509,22 @@ class KafkaRankAsinDetail(Templates):
"title_len_rating", "title_brand_rating", "img_num_rating", "img_enlarge_rating")
return df
@staticmethod
def build_time_interval_type_expr(col_name, interval_dict):
    """Build a SQL CASE WHEN expression that buckets a date column into
    time-interval types 1..7 (most recent to oldest); falls through to 0
    when no bound matches (e.g. a NULL column value).

    :param col_name: name of the date column to bucket (e.g. ``launch_time``)
    :param interval_dict: dict of boundary dates keyed by 'one_month',
        'three_month', 'six_month', 'twelve_month', 'twenty_four_month',
        'thirty_six_month' — presumably in descending recency (TODO confirm
        against launch_time_interval_dict construction, not visible here)
    :return: the complete CASE WHEN expression as a single-line string
    """
    ordered_keys = ('one_month', 'three_month', 'six_month',
                    'twelve_month', 'twenty_four_month', 'thirty_six_month')
    bounds = [interval_dict[key] for key in ordered_keys]
    # Bucket 1: on/after the most recent boundary.
    clauses = [f"WHEN {col_name} >= '{bounds[0]}' THEN 1"]
    # Buckets 2..6: between two consecutive boundaries.
    for idx in range(1, len(bounds)):
        clauses.append(
            f"WHEN {col_name} >= '{bounds[idx]}' AND {col_name} < '{bounds[idx - 1]}' THEN {idx + 1}"
        )
    # Bucket 7: strictly older than the oldest boundary; ELSE 0 otherwise.
    clauses.append(f"WHEN {col_name} < '{bounds[-1]}' THEN 7 ELSE 0 END")
    return "CASE " + " ".join(clauses)
# 11. 通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、是否内部asin、是否隐藏分类、有效类型、必需ASIN、asin_type)
def handle_asin_detail_all_type(self, df):
# 1. 评分类型
......@@ -517,21 +535,10 @@ class KafkaRankAsinDetail(Templates):
df = df.join(self.df_asin_keep_date, on=['asin'], how='left')
df = df.withColumn("launch_time", F.when(F.col("launch_time").isNull(), F.col("new_launch_time")).otherwise(
F.col("launch_time")))
one_month = self.launch_time_interval_dict['one_month']
three_month = self.launch_time_interval_dict['three_month']
six_month = self.launch_time_interval_dict['six_month']
twelve_month = self.launch_time_interval_dict['twelve_month']
twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
expr_str = f"""
CASE WHEN launch_time >= '{one_month}' THEN 1
WHEN launch_time >= '{three_month}' AND launch_time < '{one_month}' THEN 2
WHEN launch_time >= '{six_month}' AND launch_time < '{three_month}' THEN 3
WHEN launch_time >= '{twelve_month}' AND launch_time < '{six_month}' THEN 4
WHEN launch_time >= '{twenty_four_month}' AND launch_time < '{twelve_month}' THEN 5
WHEN launch_time >= '{thirty_six_month}' AND launch_time < '{twenty_four_month}' THEN 6
WHEN launch_time < '{thirty_six_month}' THEN 7 ELSE 0 END"""
df = df.withColumn("launch_time_type", F.expr(expr_str))
df = df.withColumn(
"launch_time_type",
F.expr(self.build_time_interval_type_expr("launch_time", self.launch_time_interval_dict))
)
# 3. 电影标签
movie_label_list = ['prime video', 'dvd', 'blu-ray', 'kindle', 'app', 'paperback', 'audible audiobook',
'kindle edition', 'kindle & comixology', 'hardcover', 'comic', 'multi-format', '4k',
......@@ -570,40 +577,55 @@ class KafkaRankAsinDetail(Templates):
)).drop("is_self_asin", "is_need_asin", "is_hide_asin")
return df
# 12. 处理变化率相关字段
# 12. 处理变化率相关字段(环比_mom / 同比_yoy 统一处理)
def handle_asin_attribute_change(self, df):
# 处理ASIN维度的变化率信息
df = df.join(self.df_previous_flow_asin, on=['asin'], how='left')
df = df.join(self.df_previous_flow_asin_lastyear, on=['asin'], how='left')
# (current_col, prev_col, lastyear_col, suffix, rise_round, use_sentinel)
# rise_round: None=不计算_rise; 整数=round精度; 'int'=IntegerType
columns_to_change = [
("ao_val", "previous_asin_ao_val", "ao"),
("price", "previous_asin_price", "price"),
("asin_bs_cate_1_rank", "previous_first_category_rank", "rank"),
("bsr_orders", "previous_asin_bsr_orders", "bsr_orders"),
("rating", "previous_asin_rating", "rating"),
("total_comments", "previous_asin_total_comments", "comments"),
("variat_num", "previous_asin_variation_num", "variation"),
("bsr_orders_sale", "previous_sales", "sales")
("ao_val", "previous_asin_ao_val", "lastyear_asin_ao_val", "ao", 3, False),
("price", "previous_asin_price", "lastyear_asin_price", "price", 2, False),
("asin_bs_cate_1_rank", "previous_first_category_rank", "lastyear_first_category_rank", "rank", "int", False),
("bsr_orders", "previous_asin_bsr_orders", "lastyear_asin_bsr_orders", "bsr_orders", "int", False),
("rating", "previous_asin_rating", "lastyear_asin_rating", "rating", 1, False),
("total_comments", "previous_asin_total_comments", "lastyear_asin_total_comments", "comments", "int", False),
("variat_num", "previous_asin_variation_num", "lastyear_asin_variation_num", "variation", "int", False),
("bsr_orders_sale", "previous_sales", "lastyear_sales", "sales", 2, False),
("asin_bought_month", "previous_asin_bought_month", "lastyear_asin_bought_month", "bought_month", None, True),
]
def calculate_change(current_col, previous_col):
def calculate_change(current_col, previous_col, use_sentinel=False):
rise_col = F.col(current_col) - F.col(previous_col)
change_col = F.when((F.col(previous_col).isNotNull()) & (F.col(previous_col) != 0),
F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
).otherwise(None)
if use_sentinel:
change_col = F.when(
F.col(current_col).isNull() & F.col(previous_col).isNull(), F.lit(None)
).when(
F.col(current_col).isNull(), F.lit(-1000.0)
).when(
F.col(previous_col).isNull(), F.lit(1000.0)
).otherwise(
F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
)
else:
change_col = F.when(
(F.col(previous_col).isNotNull()) & (F.col(previous_col) != 0),
F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
).otherwise(None)
return rise_col, change_col
for current_col, previous_col, suffix in columns_to_change:
rise_col, change_col = calculate_change(current_col, previous_col)
if suffix == 'ao':
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 3))
elif suffix in ['price', 'sales']:
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 2))
elif suffix == 'rating':
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 1))
else:
df = df.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType()))
df = df.withColumn(f"{suffix}_change", F.round(change_col, 4))
df = df.drop(previous_col)
for current_col, prev_col, lastyear_col, suffix, rise_round, use_sentinel in columns_to_change:
rise_col, mom_col = calculate_change(current_col, prev_col, use_sentinel=use_sentinel)
_, yoy_col = calculate_change(current_col, lastyear_col, use_sentinel=use_sentinel)
if rise_round is not None:
if rise_round == "int":
df = df.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType()))
else:
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, rise_round))
mom_field = f"{suffix}_mom" if suffix == "bought_month" else f"{suffix}_change"
df = df.withColumn(mom_field, F.round(mom_col, 4)) \
.withColumn(f"{suffix}_yoy", F.round(yoy_col, 4))
df = df.drop('previous_asin_bought_month', 'lastyear_asin_bought_month')
return df
# 13. 处理不同来源asin
......@@ -622,25 +644,14 @@ class KafkaRankAsinDetail(Templates):
).join(
self.df_asin_profit_rate, on=['asin', 'price'], how='left'
)
one_month = self.launch_time_interval_dict['one_month']
three_month = self.launch_time_interval_dict['three_month']
six_month = self.launch_time_interval_dict['six_month']
twelve_month = self.launch_time_interval_dict['twelve_month']
twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
expr_str = f"""
CASE WHEN tracking_since >= '{one_month}' THEN 1
WHEN tracking_since >= '{three_month}' AND tracking_since < '{one_month}' THEN 2
WHEN tracking_since >= '{six_month}' AND tracking_since < '{three_month}' THEN 3
WHEN tracking_since >= '{twelve_month}' AND tracking_since < '{six_month}' THEN 4
WHEN tracking_since >= '{twenty_four_month}' AND tracking_since < '{twelve_month}' THEN 5
WHEN tracking_since >= '{thirty_six_month}' AND tracking_since < '{twenty_four_month}' THEN 6
WHEN tracking_since < '{thirty_six_month}' THEN 7 ELSE 0 END"""
df = df.withColumn(
"tracking_since",
F.when(F.col("tracking_since").isNull(), F.lit(None)).otherwise(F.date_format(F.from_unixtime((F.col("tracking_since") + F.lit(21564000)) * 60), "yyyy-MM-dd"))
"tracking_since",
F.when(F.col("tracking_since").isNull(), F.lit(None)).otherwise(
F.date_format(F.from_unixtime((F.col("tracking_since") + F.lit(21564000)) * 60), "yyyy-MM-dd")
)
).withColumn(
"tracking_since_type", F.expr(expr_str)
"tracking_since_type",
F.expr(self.build_time_interval_type_expr("tracking_since", self.launch_time_interval_dict))
).withColumn(
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).withColumn(
......@@ -672,15 +683,15 @@ class KafkaRankAsinDetail(Templates):
"size", "style", "material", "launch_time", "img_num", "parent_asin", "img_type", "img_url",
"activity_type", "one_two_val", "three_four_val", "five_six_val", "eight_val", "brand",
"variation_num", "one_star", "two_star", "three_star", "four_star", "five_star", "low_star",
"together_asin", "account_name", "account_id", "rank_rise", "rank_change", "ao_rise",
"ao_change", "price_rise", "price_change", "rating_rise", "rating_change", "comments_rise",
"comments_change", "bsr_orders_rise", "bsr_orders_change", "sales_rise", "sales_change",
"variation_rise", "variation_change", "size_type", "rating_type", "site_name_type",
"together_asin", "account_name", "account_id", "rank_rise", "rank_change", "rank_yoy", "ao_rise",
"ao_change", "ao_yoy", "price_rise", "price_change", "price_yoy", "rating_rise", "rating_change", "rating_yoy", "comments_rise",
"comments_change", "comments_yoy", "bsr_orders_rise", "bsr_orders_change", "bsr_orders_yoy", "sales_rise", "sales_change", "sales_yoy",
"variation_rise", "variation_change", "variation_yoy", "size_type", "rating_type", "site_name_type",
"weight_type", "launch_time_type", "ao_val_type", "rank_type", "price_type", "bsr_type",
"bsr_best_orders_type", "quantity_variation_type", "package_quantity", "is_movie_label",
"is_brand_label", "is_alarm_brand", "asin_type", "asin_crawl_date", "category_first_id",
"category_id", "first_category_rank", "current_category_rank", "asin_weight_ratio",
"site_name", "asin_bought_month", "asin_lqs_rating", "asin_lqs_rating_detail",
"site_name", "asin_bought_month", "bought_month_mom", "bought_month_yoy", "asin_lqs_rating", "asin_lqs_rating_detail",
"asin_lob_info", "is_contains_lob_info", "is_package_quantity_abnormal", "category",
"zr_flow_proportion", "matrix_flow_proportion", "matrix_ao_val", "product_features", "img_info",
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json",
......@@ -708,13 +719,25 @@ class KafkaRankAsinDetail(Templates):
select asin, asin_ao_val as previous_asin_ao_val, asin_price as previous_asin_price,
variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank,
bsr_orders as previous_asin_bsr_orders, sales as previous_sales
bsr_orders as previous_asin_bsr_orders, sales as previous_sales, asin_bought_month as previous_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '30day'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_previous_flow_asin.show(10, truncate=False)
print("1b. 读取同比去年的flow_asin")
sql = f"""
select asin, asin_ao_val as lastyear_asin_ao_val, asin_price as lastyear_asin_price,
variation_num as lastyear_asin_variation_num, asin_rating as lastyear_asin_rating,
asin_total_comments as lastyear_asin_total_comments, first_category_rank as lastyear_first_category_rank,
bsr_orders as lastyear_asin_bsr_orders, sales as lastyear_sales, asin_bought_month as lastyear_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{self.date_info_last_year}'
"""
print("sql=", sql)
self.df_previous_flow_asin_lastyear = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin_lastyear = self.df_previous_flow_asin_lastyear.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_previous_flow_asin_lastyear.show(10, truncate=False)
print("2. 获取卖家相关信息")
sql = f"""
select fd_unique as seller_id, fd_account_name as account_name, upper(fd_country_name) as seller_country_name, asin, updated_at
......
......@@ -502,6 +502,30 @@ class EsUtils(object):
"type": "float"
}
}
},
"rank_yoy": {
"type": "float"
},
"ao_yoy": {
"type": "float"
},
"price_yoy": {
"type": "float"
},
"rating_yoy": {
"type": "float"
},
"comments_yoy": {
"type": "float"
},
"bsr_orders_yoy": {
"type": "float"
},
"sales_yoy": {
"type": "float"
},
"variation_yoy": {
"type": "float"
}
}
}
......@@ -972,6 +996,36 @@ class EsUtils(object):
"type": "float"
}
}
},
"rank_yoy": {
"type": "float"
},
"ao_yoy": {
"type": "float"
},
"price_yoy": {
"type": "float"
},
"rating_yoy": {
"type": "float"
},
"comments_yoy": {
"type": "float"
},
"bsr_orders_yoy": {
"type": "float"
},
"sales_yoy": {
"type": "float"
},
"variation_yoy": {
"type": "float"
},
"bought_month_mom": {
"type": "float"
},
"bought_month_yoy": {
"type": "float"
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment