Commit 332d6eb2 by chenyuanjie

流量选品30天-增加keepa追踪时间+asin利润率

parent 0510f163
......@@ -100,6 +100,8 @@ class KafkaFlowAsinDetail(Templates):
self.df_asin_category = self.spark.sql("select 1+1;")
self.df_max_bought_month_info_update = self.spark.sql("select 1+1;")
self.df_asin_source_flag = self.spark.sql("select 1+1;")
self.df_keepa_asin = self.spark.sql("select 1+1;")
self.df_asin_profit_rate = self.spark.sql("select 1+1;")
# udf函数注册
package_schema = StructType([
StructField("parse_package_quantity", IntegerType(), True),
......@@ -597,7 +599,44 @@ class KafkaFlowAsinDetail(Templates):
)
return df
# 14. 字段标准化
# 14. Handle Keepa tracking time and profit rate
def handle_asin_keepa_profit(self, df):
    """Enrich df with Keepa tracking info and asin profit-rate data.

    Left-joins self.df_keepa_asin (on asin) and self.df_asin_profit_rate
    (on asin + price), then derives:
      - tracking_since: Keepa minute-timestamp converted to a 'yyyy-MM-dd'
        date string (NULL stays NULL).
      - tracking_since_type: age bucket 1..7 (1 = tracked within the last
        month, 7 = older than 36 months); 0 when tracking_since is NULL.
      - profit_key: "<asin>_<price>" composite key.
      - profit_rate_extra: struct(ocean_profit, air_profit); the two source
        columns are dropped afterwards.

    :param df: asin detail DataFrame containing at least asin and price
    :return: DataFrame with the four derived columns added
    """
    df = df.join(
        self.df_keepa_asin, on=['asin'], how='left'
    ).join(
        self.df_asin_profit_rate, on=['asin', 'price'], how='left'
    )
    # Thresholds are 'yyyy-MM-dd' date strings (e.g. the date one month
    # ago), so lexicographic comparison in SQL matches date comparison.
    intervals = self.launch_time_interval_dict
    boundaries = [
        intervals['one_month'],
        intervals['three_month'],
        intervals['six_month'],
        intervals['twelve_month'],
        intervals['twenty_four_month'],
        intervals['thirty_six_month'],
    ]
    # CASE WHEN branches are evaluated in order, so each branch only needs
    # its lower bound; a NULL tracking_since fails every comparison and
    # falls through to ELSE 0.
    whens = "\n".join(
        f"WHEN tracking_since >= '{boundary}' THEN {bucket}"
        for bucket, boundary in enumerate(boundaries, start=1)
    )
    expr_str = f"""
        CASE {whens}
        WHEN tracking_since < '{boundaries[-1]}' THEN 7 ELSE 0 END"""
    df = df.withColumn(
        "tracking_since",
        # Keepa timestamps are minutes; adding 21564000 minutes re-bases
        # them onto the Unix epoch, *60 converts to seconds. from_unixtime
        # and date_format both propagate NULL, so no explicit null guard
        # is needed.
        F.date_format(
            F.from_unixtime((F.col("tracking_since") + F.lit(21564000)) * 60),
            "yyyy-MM-dd"
        )
    ).withColumn(
        "tracking_since_type", F.expr(expr_str)
    ).withColumn(
        'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
    ).withColumn(
        "profit_rate_extra",
        F.struct(
            F.col("ocean_profit").alias("ocean_profit"),
            F.col("air_profit").alias("air_profit")
        )
    ).drop('ocean_profit', 'air_profit')
    return df
# 15. 字段标准化
def handle_column_name(self, df):
df = df.withColumnRenamed("asin_bs_cate_1_id", "category_first_id")\
.withColumnRenamed("asin_bs_cate_current_id", "category_id") \
......@@ -631,7 +670,7 @@ class KafkaFlowAsinDetail(Templates):
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json",
F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price"),
"asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d",
"describe_len")
"describe_len", "tracking_since", "tracking_since_type", "profit_key", "profit_rate_extra")
df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
......@@ -796,6 +835,22 @@ class KafkaFlowAsinDetail(Templates):
self.df_asin_source_flag = self.df_asin_source_flag.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_source_flag.show(10, truncate=False)
print("16. 获取keepa数据")
sql = f"""
select asin, tracking_since from dim_keepa_asin_info where site_name = '{self.site_name}'
"""
self.df_keepa_asin = self.spark.sql(sqlQuery=sql)
self.df_keepa_asin = self.df_keepa_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_keepa_asin.show(10, truncate=False)
print("17. 获取asin利润率数据")
sql = f"""
select asin, price, ocean_profit, air_profit from dim_asin_profit_rate_info where site_name='{self.site_name}'
"""
self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql)
self.df_asin_profit_rate = self.df_asin_profit_rate.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_profit_rate.show(10, truncate=False)
# 字段处理逻辑综合
def handle_all_field(self, df):
# 1. 处理asin分类及排名以及排名类型字段
......@@ -824,7 +879,9 @@ class KafkaFlowAsinDetail(Templates):
df = self.handle_asin_attribute_change(df)
# 13. 处理不同来源asin
df = self.handle_asin_different_source(df)
# 14. 字段标准化
# 14. 处理keepa追踪时间和利润率
df = self.handle_asin_keepa_profit(df)
# 15. 字段标准化
df_save = self.handle_column_name(df)
return df_save
......
......@@ -99,6 +99,8 @@ class KafkaRankAsinDetail(Templates):
self.df_asin_category = self.spark.sql("select 1+1;")
self.df_max_bought_month_info_update = self.spark.sql("select 1+1;")
self.df_asin_source_flag = self.spark.sql("select 1+1;")
self.df_keepa_asin = self.spark.sql("select 1+1;")
self.df_asin_profit_rate = self.spark.sql("select 1+1;")
# udf函数注册
package_schema = StructType([
StructField("parse_package_quantity", IntegerType(), True),
......@@ -596,7 +598,44 @@ class KafkaRankAsinDetail(Templates):
)
return df
# 14. 字段标准化
# 14. Handle Keepa tracking time and profit rate
def handle_asin_keepa_profit(self, df):
    """Enrich df with Keepa tracking info and asin profit-rate data.

    Left-joins self.df_keepa_asin (on asin) and self.df_asin_profit_rate
    (on asin + price), then derives:
      - tracking_since: Keepa minute-timestamp converted to a 'yyyy-MM-dd'
        date string (NULL stays NULL).
      - tracking_since_type: age bucket 1..7 (1 = tracked within the last
        month, 7 = older than 36 months); 0 when tracking_since is NULL.
      - profit_key: "<asin>_<price>" composite key.
      - profit_rate_extra: struct(ocean_profit, air_profit); the two source
        columns are dropped afterwards.

    :param df: asin detail DataFrame containing at least asin and price
    :return: DataFrame with the four derived columns added
    """
    df = df.join(
        self.df_keepa_asin, on=['asin'], how='left'
    ).join(
        self.df_asin_profit_rate, on=['asin', 'price'], how='left'
    )
    # Thresholds are 'yyyy-MM-dd' date strings (e.g. the date one month
    # ago), so lexicographic comparison in SQL matches date comparison.
    intervals = self.launch_time_interval_dict
    boundaries = [
        intervals['one_month'],
        intervals['three_month'],
        intervals['six_month'],
        intervals['twelve_month'],
        intervals['twenty_four_month'],
        intervals['thirty_six_month'],
    ]
    # CASE WHEN branches are evaluated in order, so each branch only needs
    # its lower bound; a NULL tracking_since fails every comparison and
    # falls through to ELSE 0.
    whens = "\n".join(
        f"WHEN tracking_since >= '{boundary}' THEN {bucket}"
        for bucket, boundary in enumerate(boundaries, start=1)
    )
    expr_str = f"""
        CASE {whens}
        WHEN tracking_since < '{boundaries[-1]}' THEN 7 ELSE 0 END"""
    df = df.withColumn(
        "tracking_since",
        # Keepa timestamps are minutes; adding 21564000 minutes re-bases
        # them onto the Unix epoch, *60 converts to seconds. from_unixtime
        # and date_format both propagate NULL, so no explicit null guard
        # is needed.
        F.date_format(
            F.from_unixtime((F.col("tracking_since") + F.lit(21564000)) * 60),
            "yyyy-MM-dd"
        )
    ).withColumn(
        "tracking_since_type", F.expr(expr_str)
    ).withColumn(
        'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
    ).withColumn(
        "profit_rate_extra",
        F.struct(
            F.col("ocean_profit").alias("ocean_profit"),
            F.col("air_profit").alias("air_profit")
        )
    ).drop('ocean_profit', 'air_profit')
    return df
# 15. 字段标准化
def handle_column_name(self, df):
df = df.withColumnRenamed("asin_bs_cate_1_id", "category_first_id")\
.withColumnRenamed("asin_bs_cate_current_id", "category_id") \
......@@ -630,7 +669,7 @@ class KafkaRankAsinDetail(Templates):
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json",
F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price"),
"asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d",
"describe_len")
"describe_len", "tracking_since", "tracking_since_type", "profit_key", "profit_rate_extra")
df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
......@@ -795,6 +834,22 @@ class KafkaRankAsinDetail(Templates):
self.df_asin_source_flag = self.df_asin_source_flag.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_source_flag.show(10, truncate=False)
print("16. 获取keepa数据")
sql = f"""
select asin, tracking_since from dim_keepa_asin_info where site_name = '{self.site_name}'
"""
self.df_keepa_asin = self.spark.sql(sqlQuery=sql)
self.df_keepa_asin = self.df_keepa_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_keepa_asin.show(10, truncate=False)
print("17. 获取asin利润率数据")
sql = f"""
select asin, price, ocean_profit, air_profit from dim_asin_profit_rate_info where site_name='{self.site_name}'
"""
self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql)
self.df_asin_profit_rate = self.df_asin_profit_rate.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_profit_rate.show(10, truncate=False)
# 字段处理逻辑综合
def handle_all_field(self, df):
# 1. 处理asin分类及排名以及排名类型字段
......@@ -823,7 +878,9 @@ class KafkaRankAsinDetail(Templates):
df = self.handle_asin_attribute_change(df)
# 13. 处理不同来源asin
df = self.handle_asin_different_source(df)
# 14. 字段标准化
# 14. 处理keepa追踪时间和利润率
df = self.handle_asin_keepa_profit(df)
# 15. 字段标准化
df_save = self.handle_column_name(df)
return df_save
......
......@@ -952,6 +952,26 @@ class EsUtils(object):
},
"describe_len": {
"type": "integer"
},
"tracking_since": {
"type": "date"
},
"tracking_since_type": {
"type": "short"
},
"profit_key": {
"type": "keyword"
},
"profit_rate_extra": {
"type": "object",
"properties": {
"ocean_profit": {
"type": "float"
},
"air_profit": {
"type": "float"
}
}
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment