Commit b318040a by chenyuanjie

amazon_label脏数据处理

parent a8a463a2
...@@ -162,7 +162,7 @@ class DimAsinDetail(object): ...@@ -162,7 +162,7 @@ class DimAsinDetail(object):
get_json_object(product_json, '$.Color') as product_json_color, get_json_object(product_json, '$.Color') as product_json_color,
get_json_object(product_json, '$.Number of Items') as product_json_number_of_items, get_json_object(product_json, '$.Number of Items') as product_json_number_of_items,
current_asin, current_asin,
get_json_object(amazon_label, '$.badge_type') as amazon_label nullif(get_json_object(amazon_label, '$.badge_type'), 'unknown') as amazon_label
from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}""" from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
print(sql) print(sql)
self.df_asin_detail = self.spark.sql(sqlQuery=sql) self.df_asin_detail = self.spark.sql(sqlQuery=sql)
......
...@@ -809,7 +809,10 @@ class KafkaFlowAsinDetail(Templates): ...@@ -809,7 +809,10 @@ class KafkaFlowAsinDetail(Templates):
.withColumn("bsr_best_orders_type", F.lit(-1))\ .withColumn("bsr_best_orders_type", F.lit(-1))\
.withColumn("img_type_arr", F.split(F.col("img_type"), ","))\ .withColumn("img_type_arr", F.split(F.col("img_type"), ","))\
.withColumn("img_type_arr", F.expr("transform(img_type_arr, x -> cast(x as int))"))\ .withColumn("img_type_arr", F.expr("transform(img_type_arr, x -> cast(x as int))"))\
.withColumn("amazon_label", F.get_json_object(F.col("amazon_label"), "$.badge_type")) .withColumn("amazon_label", F.when(
F.get_json_object(F.col("amazon_label"), "$.badge_type") != "unknown",
F.get_json_object(F.col("amazon_label"), "$.badge_type")
))
df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts", df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts",
"tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price", "tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price",
"rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color", "rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color",
......
...@@ -193,7 +193,8 @@ class KafkaRankAsinDetail(Templates): ...@@ -193,7 +193,8 @@ class KafkaRankAsinDetail(Templates):
StructField("img_list", StringType(), True), StructField("img_list", StringType(), True),
StructField("follow_sellers", IntegerType(), True), StructField("follow_sellers", IntegerType(), True),
StructField("fbm_delivery_price", FloatType(), True), StructField("fbm_delivery_price", FloatType(), True),
StructField("product_json", StringType(), True) StructField("product_json", StringType(), True),
StructField("amazon_label", StringType(), True)
]) ])
return schema return schema
...@@ -806,7 +807,11 @@ class KafkaRankAsinDetail(Templates): ...@@ -806,7 +807,11 @@ class KafkaRankAsinDetail(Templates):
.withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\ .withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\
.withColumn("bsr_best_orders_type", F.lit(-1))\ .withColumn("bsr_best_orders_type", F.lit(-1))\
.withColumn("img_type_arr", F.split(F.col("img_type"), ","))\ .withColumn("img_type_arr", F.split(F.col("img_type"), ","))\
.withColumn("img_type_arr", F.expr("transform(img_type_arr, x -> cast(x as int))")) .withColumn("img_type_arr", F.expr("transform(img_type_arr, x -> cast(x as int))"))\
.withColumn("amazon_label", F.when(
F.get_json_object(F.col("amazon_label"), "$.badge_type") != "unknown",
F.get_json_object(F.col("amazon_label"), "$.badge_type")
))
df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts", df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts",
"tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price", "tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price",
"rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color", "rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color",
...@@ -828,7 +833,7 @@ class KafkaRankAsinDetail(Templates): ...@@ -828,7 +833,7 @@ class KafkaRankAsinDetail(Templates):
F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price"), F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price"),
"asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d", "asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d",
"describe_len", "tracking_since", "tracking_since_type", "profit_key", "profit_rate_extra", "img_type_arr", "describe_len", "tracking_since", "tracking_since_type", "profit_key", "profit_rate_extra", "img_type_arr",
"multi_color_flag", "multi_color_str") "multi_color_flag", "multi_color_str", "amazon_label")
df_save = df_save.na.fill( df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0, {"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0, "tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment