Commit 5c7a1380 by chenyuanjie

关联流量-新增字段 关联数

parent a0670622
......@@ -99,8 +99,25 @@ class DwtAsinRelatedTraffic(object):
# 将所有编号列进行拼接
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"related_type", F.concat_ws(",", *[F.col(f"{col}_num") for col in cols])
).cache()
# 统计关联数
df_related = self.df_dim_asin_related_traffic.select(
'asin', F.explode(F.split(F.col('related_asin'), ',')).alias('related_asin')
).drop_duplicates(['asin', 'related_asin']).groupBy(
'related_asin'
).agg(
F.count('asin').alias('related_count')
).withColumnRenamed(
'related_asin', 'asin'
)
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.join(
df_related, on='asin', how='left'
).fillna({
'related_count': 0
})
# 数据落盘
def save_data(self):
self.df_save = self.df_dim_asin_related_traffic.select(
......@@ -108,6 +125,7 @@ class DwtAsinRelatedTraffic(object):
'related_asin',
'related_type',
'related_time',
'related_count',
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')
......
......@@ -25,7 +25,8 @@ if __name__ == '__main__':
'asin',
'related_asin',
'related_type',
'related_time'
'related_time',
'related_count'
]
if date_type == 'month':
......@@ -48,7 +49,8 @@ if __name__ == '__main__':
asin varchar(10) NOT NULL,
related_asin varchar(10)[] NOT NULL,
related_type int2[] NOT NULL,
related_time varchar(10) NOT NULL
related_time varchar(10) NOT NULL,
related_count int4 NOT NULL
);
ALTER TABLE {export_tb} ALTER COLUMN related_asin TYPE text;
ALTER TABLE {export_tb} ALTER COLUMN related_type TYPE text;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment