Commit 1a47b299 by chenyuanjie

增加四个字段

parent de54f924
......@@ -201,7 +201,7 @@ class DwtFlowAsin(Templates):
print("2.获取dim_asin_detail,得到asin详情")
sql = f"""
select asin, asin_img_url, lower(asin_title) as asin_title, asin_title_len, asin_price, asin_rating, asin_total_comments,
asin_buy_box_seller_type, seller_json, asin_page_inventory, asin_category_desc, asin_volume, asin_weight, asin_length, asin_width, asin_height,
asin_buy_box_seller_type, seller_json, asin_page_inventory, asin_category_desc, asin_volume, asin_weight, asin_weight_str, asin_length, asin_width, asin_height,
asin_color, asin_size, asin_style, asin_is_sale, asin_launch_time, asin_is_new, asin_img_num, asin_img_type, asin_material,
lower(asin_brand_name) as asin_brand_name, asin_activity_type, act_one_two_val, act_three_four_val, act_five_six_val, act_eight_val,
one_star,two_star, three_star, four_star, five_star, low_star, together_asin, ac_name, variation_num, account_name, account_id, parent_asin,
......@@ -224,8 +224,9 @@ class DwtFlowAsin(Templates):
self.df_bsr_end.show(10, truncate=False)
print("4.获取dim_asin_bs_category,获取分类名称")
sql = f"""
select asin, asin_bs_cate_1_rank as first_category_rank, asin_bs_cate_current_rank as current_category_rank,
asin_bs_cate_1_id as category_first_id, asin_bs_cate_current_id as category_id
select asin, asin_bs_cate_1_rank as first_category_rank, asin_bs_cate_current_rank as current_category_rank,
asin_bs_cate_1_id as category_first_id, asin_bs_cate_current_id as category_id,
last_herf as best_sellers_herf, asin_rank_in_cate as best_sellers_rank
from dim_asin_bs_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}'"""
print("sql:" + sql)
self.df_asin_bs_category = self.spark.sql(sqlQuery=sql)
......@@ -550,7 +551,7 @@ class DwtFlowAsin(Templates):
AND desc_category_first_id in {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END"""))
self.df_asin_detail = self.df_asin_detail.drop("desc_category_first_name", "desc_category_first_id")
self.df_asin_detail = self.df_asin_detail.drop("desc_category_first_name")
self.df_asin_detail = self.df_asin_detail.withColumn("asin_type", F.expr("""
CASE WHEN asin_is_self=1 THEN 1 WHEN asin_is_need=1 THEN 2 WHEN asin_is_hide=1 THEN 3 ELSE 0 END"""
)).drop("asin_is_self", "asin_is_need", "asin_is_hide")
......@@ -759,7 +760,8 @@ class DwtFlowAsin(Templates):
F.lit(None).alias("asin_operate_fee"), F.lit(None).alias("asin_air_freight_gross_margin"),
F.lit(None).alias("asin_ocean_freight_gross_margin"), "asin_crawl_date",
F.lit(None).alias("asin_package_quantity"), F.lit(None).alias("asin_pattern_name"),
"category_first_id", "category_id", "first_category_rank", "current_category_rank",
"category_first_id", "category_id", "desc_category_first_id",
"first_category_rank", "current_category_rank",
"asin_weight_ratio", "asin_bought_month", F.lit(None).alias("buy_data_bought_week"),
F.lit(None).alias("buy_data_viewed_month"), F.lit(None).alias("buy_data_viewed_week"),
F.lit(None).alias("theme_en"), F.lit(None).alias("theme_label_en"), "asin_lqs_rating",
......@@ -768,6 +770,7 @@ class DwtFlowAsin(Templates):
"asin_bought_mom", "asin_bought_yoy", "describe_len", "tracking_since", "tracking_since_type",
"asin_source_flag", "bsr_last_seen_at", "bsr_seen_count_30d", "nsr_last_seen_at", "nsr_seen_count_30d",
"multi_color_flag", "multi_color_str", "amazon_label",
"asin_weight_str", "best_sellers_herf", "best_sellers_rank",
F.lit(self.site_name).alias("site_name"), F.lit(self.date_type).alias("date_type"),
F.lit(self.date_info).alias("date_info"))
self.df_save = self.df_save.na.fill(
......@@ -808,25 +811,43 @@ class DwtFlowAsin(Templates):
print("往doris存储最新asin详情信息:")
df_doris = self.df_save.\
select("asin", "asin_ao_val", "asin_title", "asin_title_len", "asin_category_desc", "asin_volume",
"asin_weight", "asin_launch_time", "asin_brand_name", "one_star", "two_star", "three_star",
"asin_weight", "asin_weight_str",
# Doris DATETIME 严格类型检查,string → timestamp
F.to_timestamp(F.col("asin_launch_time")).alias("asin_launch_time"),
"asin_brand_name", "one_star", "two_star", "three_star",
"four_star", "five_star", "low_star", "account_name", "account_id", "seller_country_name",
"category_first_id", "parent_asin", "variation_num", "img_info", "asin_crawl_date", "asin_price",
"asin_rating", "asin_total_comments", "matrix_ao_val", "zr_flow_proportion",
"matrix_flow_proportion", "date_info", F.col("asin_img_url").alias("img_url"),
"category_first_id", "parent_asin", "variation_num", "img_info",
F.col("asin_img_url").alias("img_url"),
F.col("category_id").alias("category_current_id"),
F.col("first_category_rank").alias("category_first_rank"),
F.col("current_category_rank").alias("category_current_rank"), "asin_type",
F.col("current_category_rank").alias("category_current_rank"),
"best_sellers_rank", "best_sellers_herf",
F.to_timestamp(F.col("asin_crawl_date")).alias("asin_crawl_date"),
"asin_price", "asin_rating", "asin_total_comments",
"matrix_ao_val", "zr_flow_proportion", "matrix_flow_proportion",
"date_info",
"bsr_orders", F.col("sales").alias("bsr_orders_sale"),
F.col("asin_page_inventory").alias("page_inventory"), "asin_bought_month", "seller_json",
F.col("asin_buy_box_seller_type").alias("buy_box_seller_type"), "asin_describe", "asin_fbm_price",
F.col("describe_len").alias("asin_describe_len")
F.col("asin_page_inventory").cast('tinyint').alias("page_inventory"),
"asin_bought_month", "seller_json",
F.col("asin_buy_box_seller_type").cast('tinyint').alias("buy_box_seller_type"),
"asin_describe", "asin_fbm_price",
F.col("describe_len").alias("asin_describe_len"),
"asin_type"
)
table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume,
asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star,
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale,
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe, asin_fbm_price, asin_describe_len"""
table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume,
asin_weight, asin_weight_str, asin_launch_time, asin_brand_name,
one_star, two_star, three_star, four_star, five_star, low_star,
account_name, account_id, seller_country_name,
category_first_id, parent_asin, variation_num, img_info, img_url,
category_current_id, category_first_rank, category_current_rank,
best_sellers_rank, best_sellers_herf,
asin_crawl_date, asin_price, asin_rating, asin_total_comments,
matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info,
bsr_orders, bsr_orders_sale, page_inventory, asin_bought_month,
seller_json, buy_box_seller_type,
asin_describe, asin_fbm_price, asin_describe_len,
asin_type"""
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns)
print("save asin_latest_detail success")
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment