Commit 4af80620 by chenyuanjie

流量选品30天-旧流程临时修改读取数据源

parent 61cd5e9d
......@@ -86,7 +86,8 @@ class KafkaFlowAsinDetail(Templates):
"it": "(\d+) in ",
}
# DataFrame初始化
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info, -12)
self.date_info_last_month = CommonUtil.get_month_offset(self.date_info, -1)
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info, -12)
self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_previous_flow_asin_lastyear = self.spark.sql("select 1+1;")
self.df_seller_info = self.spark.sql("select 1+1;")
......@@ -853,29 +854,50 @@ class KafkaFlowAsinDetail(Templates):
return df_save
def read_data(self):
print("1. 读取上个维度的flow_asin")
sql = f"""
select asin, asin_ao_val as previous_asin_ao_val, asin_price as previous_asin_price,
variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank,
bsr_orders as previous_asin_bsr_orders, sales as previous_sales, asin_bought_month as previous_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '30day'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
# previous/lastyear 共用:单表读取 + PySpark DataFrame join,不再从 dwt_flow_asin 取数据
# date_type 固定 month;date_info 必传(previous=上个月,lastyear=去年同月)
def _load_baseline(date_info_target, alias_prefix):
    """Build the monthly baseline DataFrame for one comparison period.

    Reads the ``date_type = 'month'`` rows of ``dwd_asin_measure``,
    ``dim_asin_detail`` and ``dim_asin_bs_info`` for this site and the given
    ``date_info``, left-joins the detail and best-seller columns onto the
    measure rows by ``asin``, and renames every metric to
    ``<alias_prefix>_<metric>``.

    Args:
        date_info_target: Value matched against the ``date_info`` column.
        alias_prefix: Prefix for the output metric columns; the callers in
            this method pass ``'previous'`` and ``'lastyear'``.

    Returns:
        The projected DataFrame, persisted with ``StorageLevel.DISK_ONLY``.
    """
    partition_filter = (
        f"site_name = '{self.site_name}' and date_type = 'month' "
        f"and date_info = '{date_info_target}'"
    )

    def _select_from(columns, table):
        # The SQL text is kept flush-left so the logged statement is unchanged.
        return f"""
select {columns}
from {table}
where {partition_filter}
"""

    def _prefixed(metric):
        return f'{alias_prefix}_{metric}'

    # (log label, statement) in the order measure -> detail -> best-seller.
    labelled_statements = (
        ("sql_measure", _select_from(
            "asin, asin_ao_val, asin_bsr_orders", "dwd_asin_measure")),
        ("sql_detail ", _select_from(
            "asin, asin_price, variation_num, asin_rating, asin_total_comments, asin_bought_month",
            "dim_asin_detail")),
        ("sql_bs ", _select_from(
            "asin, asin_bs_cate_1_rank", "dim_asin_bs_info")),
    )

    # Log all three statements before any of them is submitted to Spark.
    for label, statement in labelled_statements:
        print(f"{label}({alias_prefix})=", statement)

    # Each input is repartitioned on the join key before joining.
    measure_df, detail_df, bs_df = [
        self.spark.sql(sqlQuery=statement).repartition(self.repartition_num, 'asin')
        for _, statement in labelled_statements
    ]

    # The measure table drives the result; detail and best-seller columns
    # are attached with left joins on asin.
    joined = measure_df.join(detail_df, on='asin', how='left')
    joined = joined.join(bs_df, on='asin', how='left')

    # Sales is computed here as bsr orders * price, rounded to 2 decimals.
    sales = F.round(F.col('asin_bsr_orders') * F.col('asin_price'), 2)

    baseline = joined.select(
        F.col('asin'),
        F.round(F.col('asin_ao_val'), 3).alias(_prefixed('asin_ao_val')),
        F.col('asin_price').alias(_prefixed('asin_price')),
        F.col('variation_num').alias(_prefixed('asin_variation_num')),
        F.col('asin_rating').alias(_prefixed('asin_rating')),
        F.col('asin_total_comments').alias(_prefixed('asin_total_comments')),
        F.col('asin_bs_cate_1_rank').alias(_prefixed('first_category_rank')),
        F.col('asin_bsr_orders').alias(_prefixed('asin_bsr_orders')),
        sales.alias(_prefixed('sales')),
        F.col('asin_bought_month').alias(_prefixed('asin_bought_month')),
    )
    return baseline.persist(StorageLevel.DISK_ONLY)
print(f"1. 读取上个月维度的flow_asin(date_type=month, date_info={self.date_info_last_month})")
self.df_previous_flow_asin = _load_baseline(self.date_info_last_month, 'previous')
self.df_previous_flow_asin.show(10, truncate=False)
print("1b. 读取同比去年的flow_asin")
sql = f"""
select asin, asin_ao_val as lastyear_asin_ao_val, asin_price as lastyear_asin_price,
variation_num as lastyear_asin_variation_num, asin_rating as lastyear_asin_rating,
asin_total_comments as lastyear_asin_total_comments, first_category_rank as lastyear_first_category_rank,
bsr_orders as lastyear_asin_bsr_orders, sales as lastyear_sales, asin_bought_month as lastyear_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{self.date_info_last_year}'
"""
print("sql=", sql)
self.df_previous_flow_asin_lastyear = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin_lastyear = self.df_previous_flow_asin_lastyear.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
print(f"1b. 读取同比去年的flow_asin(date_type=month, date_info={self.date_info_last_year})")
self.df_previous_flow_asin_lastyear = _load_baseline(self.date_info_last_year, 'lastyear')
self.df_previous_flow_asin_lastyear.show(10, truncate=False)
print("2. 获取卖家相关信息")
sql = f"""
......
......@@ -85,7 +85,8 @@ class KafkaRankAsinDetail(Templates):
"it": "(\d+) in ",
}
# DataFrame初始化
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info[:7], -12)
self.date_info_last_month = CommonUtil.get_month_offset(self.date_info[:7], -1)
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info[:7], -12)
self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_previous_flow_asin_lastyear = self.spark.sql("select 1+1;")
self.df_seller_info = self.spark.sql("select 1+1;")
......@@ -852,29 +853,50 @@ class KafkaRankAsinDetail(Templates):
return df_save
def read_data(self):
print("1. 读取上个维度的flow_asin")
sql = f"""
select asin, asin_ao_val as previous_asin_ao_val, asin_price as previous_asin_price,
variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank,
bsr_orders as previous_asin_bsr_orders, sales as previous_sales, asin_bought_month as previous_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '30day'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
# previous/lastyear 共用:单表读取 + PySpark DataFrame join,不再从 dwt_flow_asin 取数据
# date_type 固定 month;date_info 必传(previous=上个月,lastyear=去年同月)
def _load_baseline(date_info_target, alias_prefix):
    """Build the monthly baseline DataFrame for one comparison period.

    Reads the ``date_type = 'month'`` rows of ``dwd_asin_measure``,
    ``dim_asin_detail`` and ``dim_asin_bs_info`` for this site and the given
    ``date_info``, left-joins the detail and best-seller columns onto the
    measure rows by ``asin``, and renames every metric to
    ``<alias_prefix>_<metric>``.

    Args:
        date_info_target: Value matched against the ``date_info`` column.
        alias_prefix: Prefix for the output metric columns; the callers in
            this method pass ``'previous'`` and ``'lastyear'``.

    Returns:
        The projected DataFrame, persisted with ``StorageLevel.DISK_ONLY``.
    """
    partition_filter = (
        f"site_name = '{self.site_name}' and date_type = 'month' "
        f"and date_info = '{date_info_target}'"
    )

    def _select_from(columns, table):
        # The SQL text is kept flush-left so the logged statement is unchanged.
        return f"""
select {columns}
from {table}
where {partition_filter}
"""

    def _prefixed(metric):
        return f'{alias_prefix}_{metric}'

    # (log label, statement) in the order measure -> detail -> best-seller.
    labelled_statements = (
        ("sql_measure", _select_from(
            "asin, asin_ao_val, asin_bsr_orders", "dwd_asin_measure")),
        ("sql_detail ", _select_from(
            "asin, asin_price, variation_num, asin_rating, asin_total_comments, asin_bought_month",
            "dim_asin_detail")),
        ("sql_bs ", _select_from(
            "asin, asin_bs_cate_1_rank", "dim_asin_bs_info")),
    )

    # Log all three statements before any of them is submitted to Spark.
    for label, statement in labelled_statements:
        print(f"{label}({alias_prefix})=", statement)

    # Each input is repartitioned on the join key before joining.
    measure_df, detail_df, bs_df = [
        self.spark.sql(sqlQuery=statement).repartition(self.repartition_num, 'asin')
        for _, statement in labelled_statements
    ]

    # The measure table drives the result; detail and best-seller columns
    # are attached with left joins on asin.
    joined = measure_df.join(detail_df, on='asin', how='left')
    joined = joined.join(bs_df, on='asin', how='left')

    # Sales is computed here as bsr orders * price, rounded to 2 decimals.
    sales = F.round(F.col('asin_bsr_orders') * F.col('asin_price'), 2)

    baseline = joined.select(
        F.col('asin'),
        F.round(F.col('asin_ao_val'), 3).alias(_prefixed('asin_ao_val')),
        F.col('asin_price').alias(_prefixed('asin_price')),
        F.col('variation_num').alias(_prefixed('asin_variation_num')),
        F.col('asin_rating').alias(_prefixed('asin_rating')),
        F.col('asin_total_comments').alias(_prefixed('asin_total_comments')),
        F.col('asin_bs_cate_1_rank').alias(_prefixed('first_category_rank')),
        F.col('asin_bsr_orders').alias(_prefixed('asin_bsr_orders')),
        sales.alias(_prefixed('sales')),
        F.col('asin_bought_month').alias(_prefixed('asin_bought_month')),
    )
    return baseline.persist(StorageLevel.DISK_ONLY)
print(f"1. 读取上个月维度的flow_asin(date_type=month, date_info={self.date_info_last_month})")
self.df_previous_flow_asin = _load_baseline(self.date_info_last_month, 'previous')
self.df_previous_flow_asin.show(10, truncate=False)
print("1b. 读取同比去年的flow_asin")
sql = f"""
select asin, asin_ao_val as lastyear_asin_ao_val, asin_price as lastyear_asin_price,
variation_num as lastyear_asin_variation_num, asin_rating as lastyear_asin_rating,
asin_total_comments as lastyear_asin_total_comments, first_category_rank as lastyear_first_category_rank,
bsr_orders as lastyear_asin_bsr_orders, sales as lastyear_sales, asin_bought_month as lastyear_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{self.date_info_last_year}'
"""
print("sql=", sql)
self.df_previous_flow_asin_lastyear = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin_lastyear = self.df_previous_flow_asin_lastyear.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
print(f"1b. 读取同比去年的flow_asin(date_type=month, date_info={self.date_info_last_year})")
self.df_previous_flow_asin_lastyear = _load_baseline(self.date_info_last_year, 'lastyear')
self.df_previous_flow_asin_lastyear.show(10, truncate=False)
print("2. 获取卖家相关信息")
sql = f"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment