Commit 81ba7508 by chenyuanjie

fix

parent 0cd7f499
......@@ -114,10 +114,15 @@ class EsStDetail(TemplatesMysql):
print("sql:", sql)
self.df_profit_rate = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').withColumn(
"profit_rate_extra",
F.when(
F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
F.lit(None)
).otherwise(
F.struct(
F.col("ocean_profit").alias("ocean_profit"),
F.col("air_profit").alias("air_profit")
)
)
).drop("ocean_profit", "air_profit")
self.df_synchronize = self.df_synchronize.join(
......
......@@ -657,10 +657,15 @@ class KafkaFlowAsinDetail(Templates):
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).withColumn(
"profit_rate_extra",
F.when(
F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
F.lit(None)
).otherwise(
F.struct(
F.col("ocean_profit").alias("ocean_profit"),
F.col("air_profit").alias("air_profit")
)
)
).drop('ocean_profit', 'air_profit')
return df
......@@ -676,8 +681,8 @@ class KafkaFlowAsinDetail(Templates):
.withColumnRenamed("customer_reviews_json", "product_features")\
.withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\
.withColumn("bsr_best_orders_type", F.lit(-1))\
.withColumn("img_type", F.split(F.col("img_type"), ","))\
.withColumn("img_type", F.expr("transform(img_type, x -> cast(x as int))"))
.withColumn("img_type_arr", F.split(F.col("img_type"), ","))\
.withColumn("img_type_arr", F.expr("transform(img_type_arr, x -> cast(x as int))"))
df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts",
"tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price",
"rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color",
......@@ -698,7 +703,7 @@ class KafkaFlowAsinDetail(Templates):
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json",
F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price"),
"asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d",
"describe_len", "tracking_since", "tracking_since_type", "profit_key", "profit_rate_extra")
"describe_len", "tracking_since", "tracking_since_type", "profit_key", "profit_rate_extra", "img_type_arr")
df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
......
......@@ -656,10 +656,15 @@ class KafkaRankAsinDetail(Templates):
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).withColumn(
"profit_rate_extra",
F.when(
F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
F.lit(None)
).otherwise(
F.struct(
F.col("ocean_profit").alias("ocean_profit"),
F.col("air_profit").alias("air_profit")
)
)
).drop('ocean_profit', 'air_profit')
return df
......@@ -675,8 +680,8 @@ class KafkaRankAsinDetail(Templates):
.withColumnRenamed("customer_reviews_json", "product_features")\
.withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\
.withColumn("bsr_best_orders_type", F.lit(-1))\
.withColumn("img_type", F.split(F.col("img_type"), ","))\
.withColumn("img_type", F.expr("transform(img_type, x -> cast(x as int))"))
.withColumn("img_type_arr", F.split(F.col("img_type"), ","))\
.withColumn("img_type_arr", F.expr("transform(img_type_arr, x -> cast(x as int))"))
df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts",
"tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price",
"rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color",
......@@ -697,7 +702,7 @@ class KafkaRankAsinDetail(Templates):
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json",
F.col("describe").alias("asin_describe"), F.round("fbm_delivery_price", 2).alias("fbm_price"),
"asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d",
"describe_len", "tracking_since", "tracking_since_type", "profit_key", "profit_rate_extra")
"describe_len", "tracking_since", "tracking_since_type", "profit_key", "profit_rate_extra", "img_type_arr")
df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
......
......@@ -731,7 +731,7 @@ class EsUtils(object):
"type": "short"
},
"img_type": {
"type": "integer"
"type": "keyword"
},
"activity_type": {
"type": "keyword"
......@@ -1041,6 +1041,9 @@ class EsUtils(object):
},
"bought_month_yoy": {
"type": "float"
},
"img_type_arr": {
"type": "integer"
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment