Commit d059daa2 by chenyuanjie

实时任务-打包数量解析规则迭代

parent 7c8cdf59
...@@ -191,7 +191,8 @@ class KafkaFlowAsinDetail(Templates): ...@@ -191,7 +191,8 @@ class KafkaFlowAsinDetail(Templates):
StructField("customer_reviews_json", StringType(), True), StructField("customer_reviews_json", StringType(), True),
StructField("img_list", StringType(), True), StructField("img_list", StringType(), True),
StructField("follow_sellers", IntegerType(), True), StructField("follow_sellers", IntegerType(), True),
StructField("fbm_delivery_price", FloatType(), True) StructField("fbm_delivery_price", FloatType(), True),
StructField("product_json", StringType(), True)
]) ])
return schema return schema
...@@ -449,15 +450,26 @@ class KafkaFlowAsinDetail(Templates): ...@@ -449,15 +450,26 @@ class KafkaFlowAsinDetail(Templates):
withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \ withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \
withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \ withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \
drop("title_parse", "variat_parse", "variat_attribute") drop("title_parse", "variat_parse", "variat_attribute")
# Number of Items:从 product_json 提取,cast 失败(脏数据)自动为 null,提取后立即 drop
df = df.withColumn(
"number_of_items",
F.get_json_object(F.col("product_json"), "$.Number of Items").cast("int")
).drop("product_json")
# 优先级:Number of Items > 属性字段 > 标题解析 > 默认1
df = df.withColumn( df = df.withColumn(
"package_quantity", F.expr(""" "package_quantity", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN number_of_items
WHEN title_package_quantity is not null THEN title_package_quantity ELSE 1 END""") WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity
WHEN title_package_quantity IS NOT NULL THEN title_package_quantity
ELSE 1 END""")
).withColumn( ).withColumn(
"is_package_quantity_abnormal", F.expr(""" "is_package_quantity_abnormal", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity_is_abnormal CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN 0
WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal ELSE 2 END""") WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity_is_abnormal
).drop("title_package_quantity", "variat_package_quantity", "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal") WHEN title_package_quantity IS NOT NULL THEN title_package_quantity_is_abnormal
ELSE 2 END""")
).drop("number_of_items", "title_package_quantity", "variat_package_quantity",
"title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
df = df.withColumn("title", F.lower(F.col("title"))) df = df.withColumn("title", F.lower(F.col("title")))
df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left') df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left')
df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \ df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \
......
...@@ -190,7 +190,8 @@ class KafkaRankAsinDetail(Templates): ...@@ -190,7 +190,8 @@ class KafkaRankAsinDetail(Templates):
StructField("customer_reviews_json", StringType(), True), StructField("customer_reviews_json", StringType(), True),
StructField("img_list", StringType(), True), StructField("img_list", StringType(), True),
StructField("follow_sellers", IntegerType(), True), StructField("follow_sellers", IntegerType(), True),
StructField("fbm_delivery_price", FloatType(), True) StructField("fbm_delivery_price", FloatType(), True),
StructField("product_json", StringType(), True)
]) ])
return schema return schema
...@@ -448,15 +449,26 @@ class KafkaRankAsinDetail(Templates): ...@@ -448,15 +449,26 @@ class KafkaRankAsinDetail(Templates):
withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \ withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \
withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \ withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \
drop("title_parse", "variat_parse", "variat_attribute") drop("title_parse", "variat_parse", "variat_attribute")
# Number of Items:从 product_json 提取,cast 失败(脏数据)自动为 null,提取后立即 drop
df = df.withColumn(
"number_of_items",
F.get_json_object(F.col("product_json"), "$.Number of Items").cast("int")
).drop("product_json")
# 优先级:Number of Items > 属性字段 > 标题解析 > 默认1
df = df.withColumn( df = df.withColumn(
"package_quantity", F.expr(""" "package_quantity", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN number_of_items
WHEN title_package_quantity is not null THEN title_package_quantity ELSE 1 END""") WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity
WHEN title_package_quantity IS NOT NULL THEN title_package_quantity
ELSE 1 END""")
).withColumn( ).withColumn(
"is_package_quantity_abnormal", F.expr(""" "is_package_quantity_abnormal", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity_is_abnormal CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN 0
WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal ELSE 2 END""") WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity_is_abnormal
).drop("title_package_quantity", "variat_package_quantity", "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal") WHEN title_package_quantity IS NOT NULL THEN title_package_quantity_is_abnormal
ELSE 2 END""")
).drop("number_of_items", "title_package_quantity", "variat_package_quantity",
"title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
df = df.withColumn("title", F.lower(F.col("title"))) df = df.withColumn("title", F.lower(F.col("title")))
df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left') df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left')
df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \ df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment