Commit 690cb9ca by chenyuanjie

ai_asin_detail_month_date_info新增导出字段

parent 27622540
......@@ -4,7 +4,7 @@ import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql import functions as F, Window
from utils.db_util import DBUtil
......@@ -19,6 +19,7 @@ class ExportAsinDetailBase(object):
self.df_flow_asin = self.spark.sql(f"select 1+1;")
self.df_fb_info = self.spark.sql(f"select 1+1;")
self.df_asin_detail = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
def run(self):
......@@ -945,9 +946,32 @@ class ExportAsinDetailBase(object):
print("df_fb_info数据如下:")
self.df_fb_info.show(10, truncate=True)
# 读取review_json_list数据
sql3 = f"""
select
asin,
review_json_list,
updated_at
from ods_asin_detail
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
self.df_asin_detail = self.spark.sql(sqlQuery=sql3)
window = Window.partitionBy(['asin']).orderBy(
self.df_asin_detail.updated_at.desc_nulls_last()
)
self.df_asin_detail = self.df_asin_detail.withColumn(
'rank', F.row_number().over(window=window)
).filter('rank = 1').drop('rank', 'updated_at').repartition(40).cache()
print("df_asin_detail数据如下:")
self.df_asin_detail.show(10, truncate=True)
def handle_data(self):
self.df_save = self.df_flow_asin.join(
self.df_fb_info, on='account_name', how='left'
).join(
self.df_asin_detail, on='asin', how='left'
).select(
"site_name", "asin", F.col("asin_weight").alias("weight"), F.col("asin_bought_month").alias("bought_month"),
F.col("asin_category_desc").alias("category"), F.col("asin_img_url").alias("img"), F.col("asin_title").alias("title"),
......@@ -956,7 +980,7 @@ class ExportAsinDetailBase(object):
F.col("asin_img_num").alias("img_num"), "variation_flag", "variation_num", F.col("asin_ao_val").alias("ao_val"),
F.col("category_first_id").alias("category_id"), F.col("category_id").alias("category_current_id"), "parent_asin",
F.col("first_category_rank").alias("bsr_rank"), F.col("asin_price").alias("price"), F.col("asin_rating").alias("rating"),
F.col("asin_total_comments").alias("total_comments"), "seller_id", "fb_country_name"
F.col("asin_total_comments").alias("total_comments"), "seller_id", "fb_country_name", "review_json_list"
).repartition(40, 'asin')
def save_data(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment