Commit dbb59488 by chenyuanjie

流量选品实时任务增加五点描述

parent fa8a7fbb
......@@ -477,7 +477,7 @@ class KafkaFlowAsinDetail(Templates):
F.col("highlight_rating"), F.col("title_len_rating"), F.col("title_brand_rating"),
F.col("img_num_rating"), F.col("img_enlarge_rating")))
)
df = df.drop("product_description", "describe", "image_view", "category_node_rating", "zr_rating", "sp_rating",
df = df.drop("product_description", "image_view", "category_node_rating", "zr_rating", "sp_rating",
"a_add_rating", "video_rating", "brand_rating", "product_describe_rating", "highlight_rating",
"title_len_rating", "title_brand_rating", "img_num_rating", "img_enlarge_rating")
return df
......@@ -610,7 +610,7 @@ class KafkaFlowAsinDetail(Templates):
"site_name", "asin_bought_month", "asin_lqs_rating", "asin_lqs_rating_detail",
"asin_lob_info", "is_contains_lob_info", "is_package_quantity_abnormal", "category",
"zr_flow_proportion", "matrix_flow_proportion", "matrix_ao_val", "product_features", "img_info",
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json")
"collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json", F.col("describe").alias("asin_describe"))
df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
......@@ -856,7 +856,8 @@ class KafkaFlowAsinDetail(Templates):
F.col("category_id").alias("category_current_id"),
F.col("first_category_rank").alias("category_first_rank"),
F.col("current_category_rank").alias("category_current_rank"), "asin_type",
"bsr_orders", "bsr_orders_sale", "page_inventory", "asin_bought_month", "seller_json", "buy_box_seller_type")
"bsr_orders", "bsr_orders_sale", "page_inventory", "asin_bought_month", "seller_json",
"buy_box_seller_type", "asin_describe")
df = df.drop("category", "seller_json")
df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save()
end_time = time.time()
......@@ -870,7 +871,7 @@ class KafkaFlowAsinDetail(Templates):
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale,
page_inventory, asin_bought_month, seller_json, buy_box_seller_type"""
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe"""
DorisHelper.spark_export_with_columns(df_save=df_asin_latest_detail, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns)
df_asin_latest_detail.unpersist()
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment