Commit c965875b by chenyuanjie

关联流量-新增字段“引流时间”

parent e607324a
...@@ -41,6 +41,7 @@ class DimAsinRelatedTraffic(object): ...@@ -41,6 +41,7 @@ class DimAsinRelatedTraffic(object):
self.df_compare_similar_asin_json = self.spark.sql(f"select 1+1;") self.df_compare_similar_asin_json = self.spark.sql(f"select 1+1;")
self.df_bundles_this_asins_json = self.spark.sql(f"select 1+1;") self.df_bundles_this_asins_json = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;")
self.df_updated_at = self.spark.sql(f"select 1+1;")
self.u_categorize_flow = F.udf(self.categorize_flow, StringType()) self.u_categorize_flow = F.udf(self.categorize_flow, StringType())
self.u_merge_df = F.udf(self.merge_df, StringType()) self.u_merge_df = F.udf(self.merge_df, StringType())
...@@ -158,7 +159,7 @@ class DimAsinRelatedTraffic(object): ...@@ -158,7 +159,7 @@ class DimAsinRelatedTraffic(object):
result_list_json, result_list_json,
bundles_this_asins_json, bundles_this_asins_json,
updated_at updated_at
from ods_asin_related_traffic from ods_asin_detail
where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and asin is not null; where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and asin is not null;
""" """
self.df_asin_detail = self.spark.sql(sqlQuery=sql) self.df_asin_detail = self.spark.sql(sqlQuery=sql)
...@@ -185,7 +186,7 @@ class DimAsinRelatedTraffic(object): ...@@ -185,7 +186,7 @@ class DimAsinRelatedTraffic(object):
self.df_self_asin_detail, allowMissingColumns=False self.df_self_asin_detail, allowMissingColumns=False
).withColumn( ).withColumn(
"dt_rank", F.row_number().over(window=window) "dt_rank", F.row_number().over(window=window)
).filter("dt_rank=1").drop("updated_at", "dt_rank").cache() ).filter("dt_rank=1").drop("dt_rank").cache()
print("详情数据如下:") print("详情数据如下:")
self.df_asin_detail.show(10, True) self.df_asin_detail.show(10, True)
...@@ -311,7 +312,16 @@ class DimAsinRelatedTraffic(object): ...@@ -311,7 +312,16 @@ class DimAsinRelatedTraffic(object):
else: else:
main_df = main_df.join(df, "asin", "full") main_df = main_df.join(df, "asin", "full")
self.df_save = main_df.cache() self.df_save = main_df
# 关联asin抓取时间
self.df_updated_at = self.df_asin_detail.select(
'asin', 'updated_at'
).withColumn(
'updated_at', F.substring('updated_at', 1, 10)
)
self.df_save = self.df_save.join(
self.df_updated_at, 'asin', 'left'
).cache()
print("最终合并结果如下:") print("最终合并结果如下:")
self.df_save.show(10, True) self.df_save.show(10, True)
......
...@@ -61,7 +61,8 @@ class DwtAsinRelatedTraffic(object): ...@@ -61,7 +61,8 @@ class DwtAsinRelatedTraffic(object):
more_relevant, more_relevant,
bought_and_bought, bought_and_bought,
product_adv, product_adv,
brand_adv brand_adv,
updated_at as related_time
from dim_asin_related_traffic where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'; from dim_asin_related_traffic where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
""" """
self.df_dim_asin_related_traffic = self.spark.sql(sqlQuery=sql).cache() self.df_dim_asin_related_traffic = self.spark.sql(sqlQuery=sql).cache()
...@@ -70,7 +71,7 @@ class DwtAsinRelatedTraffic(object): ...@@ -70,7 +71,7 @@ class DwtAsinRelatedTraffic(object):
# 聚合计算 # 聚合计算
def handle_data(self): def handle_data(self):
cols = [col for col in self.df_dim_asin_related_traffic.columns if col != 'asin'] cols = [col for col in self.df_dim_asin_related_traffic.columns if col != 'asin' and col != 'related_time']
for col in cols: for col in cols:
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn( self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
...@@ -88,8 +89,11 @@ class DwtAsinRelatedTraffic(object): ...@@ -88,8 +89,11 @@ class DwtAsinRelatedTraffic(object):
for col in cols: for col in cols:
num = self.col_num_index[col] num = self.col_num_index[col]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn( self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
f"{col}_num", F.when(F.col(col).isNull(), F.lit(None)) f"{col}_num", F.when(
.otherwise(F.concat_ws(",", F.array_repeat(F.lit(num), F.size(F.split(F.col(col), ","))))) F.col(col).isNull(), F.lit(None)
).otherwise(
F.concat_ws(",", F.array_repeat(F.lit(num), F.size(F.split(F.col(col), ","))))
)
) )
# 将所有编号列进行拼接 # 将所有编号列进行拼接
...@@ -103,6 +107,7 @@ class DwtAsinRelatedTraffic(object): ...@@ -103,6 +107,7 @@ class DwtAsinRelatedTraffic(object):
'asin', 'asin',
'related_asin', 'related_asin',
'related_type', 'related_type',
'related_time',
F.lit(self.site_name).alias('site_name'), F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'), F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info') F.lit(self.date_info).alias('date_info')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment