Commit 636c6c70 by chenyuanjie

Related traffic - outlier handling

parent 145fe90b
@@ -5,11 +5,11 @@ sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from pyspark.sql.types import *
from utils.common_util import CommonUtil
from pyspark.sql import DataFrame
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
class DimAsinRelatedTraffic(object):
@@ -111,7 +111,7 @@ class DimAsinRelatedTraffic(object):
).withColumn(
"flow_asin", F.col(f"exploded_item.{asin_key}")
).filter(
F.col('flow_asin').isNotNull() & (F.col('flow_asin') != "")
F.col('flow_asin').isNotNull() & (F.col('flow_asin') != "") & (F.length(F.col('flow_asin')) == 10)
).groupBy("asin").agg(
F.concat_ws(",", F.collect_set("flow_asin")).alias(f"{output_column}")
)
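The added length check is the outlier handling named in the commit message: a valid asin is exactly 10 characters, so shorter or longer junk codes are dropped before aggregation. A minimal standalone sketch of the same pattern, with illustrative data and column names (the real job explodes a struct array rather than a split string):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
raw = spark.createDataFrame(
    [("B00MAIN001", "B00FLOW001,BAD,,B00FLOW002")],
    ["asin", "flow_asins"],
)
cleaned = (
    raw.withColumn("flow_asin", F.explode(F.split("flow_asins", ",")))
       .filter(
           F.col("flow_asin").isNotNull()
           & (F.col("flow_asin") != "")
           & (F.length(F.col("flow_asin")) == 10)  # valid asins are exactly 10 chars
       )
       .groupBy("asin")
       .agg(F.concat_ws(",", F.collect_set("flow_asin")).alias("related_traffic"))
)
cleaned.show(truncate=False)  # -> "B00FLOW001,B00FLOW002" (order may vary)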
@@ -149,33 +149,30 @@ class DimAsinRelatedTraffic(object):
F.from_json(F.coalesce(F.col('result_list_json'), F.lit('[]')), ArrayType(MapType(StringType(), ArrayType(StringType()))))
).withColumn(
"exploded_map", F.explode("result_list_json")
).select(
"*",
F.map_keys("exploded_map").alias("keys"), # get all keys
F.map_values("exploded_map").alias("value_arrays") # get all value arrays
).withColumn(
"key_value_pair", F.arrays_zip("keys", "value_arrays")
"key_value_pair", F.explode(F.map_entries("exploded_map"))
).withColumn(
"key_value_pair", F.explode("key_value_pair")
"original_key", F.col("key_value_pair.key")
).withColumn(
"original_key", F.col("key_value_pair.keys")
).withColumn(
"value_array", F.col("key_value_pair.value_arrays")
"value_array", F.col("key_value_pair.value")
).withColumn(
"distinct_values", F.array_distinct("value_array")
).withColumn(
"values", F.concat_ws(",", "distinct_values")
# Filter out dirty asin values in the data
"filtered_values",
F.expr("filter(distinct_values, x -> x != '' AND length(x) = 10)")
).withColumn(
"values", F.concat_ws(",", "filtered_values")
).filter(
F.col("values").isNotNull() & (F.trim(F.col('values')) != "")
).select("asin", "original_key", "values")
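The rewrite above collapses the old map_keys/map_values + arrays_zip + explode round trip into a single explode over F.map_entries, which yields key/value structs directly, and pushes the dirty-asin filter into the array before concatenation. A hedged sketch of that shape, with illustrative data:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [("B00MAIN001", {"Customers who viewed": ["B00FLOW001", "B00FLOW001", "BAD"]})],
    "asin string, exploded_map map<string, array<string>>",
)
pairs = (
    df.withColumn("kv", F.explode(F.map_entries("exploded_map")))  # one row per map entry
      .select(
          "asin",
          F.col("kv.key").alias("original_key"),
          # dedupe, then keep only well-formed 10-char asins
          F.expr("filter(array_distinct(kv.value), x -> x != '' and length(x) = 10)")
           .alias("filtered_values"),
      )
)
pairs.show(truncate=False)  # -> Customers who viewed | [B00FLOW001]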
# Categorize the traffic types
self.df_result_list_json = self.df_result_list_json.withColumn(
"key", self.u_categorize_flow("original_key")
).filter(
"key != 'other'"
F.col('key') != 'other'
).groupBy("asin", "key").agg(
F.concat_ws(",", F.array_distinct(
F.flatten(F.collect_list(F.split("values", ",")))
)
).alias("values")
F.concat_ws(",", F.array_distinct(F.flatten(F.collect_list(F.split("values", ","))))).alias("values")
).select("asin", "key", "values")
# Pivot rows to columns
self.df_result_list_json = self.df_result_list_json.groupBy("asin")\
@@ -243,7 +240,9 @@ class DimAsinRelatedTraffic(object):
self.df_bundles_this_asins_json.show(10, True)
# Process the together_asin field
self.df_together_asin = self.df_asin_detail.select('asin', 'together_asin').withColumnRenamed(
self.df_together_asin = self.df_asin_detail.select('asin', 'together_asin').filter(
F.col('together_asin').isNotNull()
).withColumnRenamed(
'together_asin', 'combination_bought'
).cache()
print("处理together_asin字段结果如下:")
@@ -259,11 +258,11 @@ class DimAsinRelatedTraffic(object):
for col in set(df.columns) - {"asin"}:
if col in main_df.columns:
df = df.withColumnRenamed(col, f'{col}_tmp')
main_df = main_df.join(df, "asin", "left") \
main_df = main_df.join(df, "asin", "full") \
.withColumn(col, self.u_merge_df(F.col(col), F.col(f"{col}_tmp"))) \
.drop(f"{col}_tmp")
else:
main_df = main_df.join(df, "asin", "left")
main_df = main_df.join(df, "asin", "full")
self.df_save = main_df.cache()
print("最终合并结果如下:")
@@ -3,12 +3,9 @@ import sys
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from pyspark.sql.types import *
from utils.common_util import CommonUtil
from pyspark.sql import DataFrame
from utils.hdfs_utils import HdfsUtils
@@ -71,75 +68,26 @@ class DwtAsinRelatedTraffic(object):
print("dim_asin_related_traffic数据如下:")
self.df_dim_asin_related_traffic.show(10, True)
# Metric calculation
# Aggregation calculation
def handle_data(self):
cols = [col for col in self.df_dim_asin_related_traffic.columns if col != 'asin']
# Related-traffic asin count for each type
for col in cols:
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
f"{col}_cnt", F.size(F.split(F.col(col), ","))
).withColumn(
f"{col}_cnt", F.when(F.col(f"{col}_cnt") == -1, 0).otherwise(F.col(f"{col}_cnt"))
)
# Total count of related-traffic asins
all_cnt_cols = [F.col(f"{col}_cnt") for col in cols]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"total_cnt", sum(all_cnt_cols)
).withColumn(
"total_cnt", F.when(F.col('total_cnt') == 0, -1).otherwise(F.col('total_cnt'))
)
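F.size returns -1 for a NULL array under Spark's default sizeOfNull setting, so the -1 is remapped to 0 before the counts are summed; mapping a zero total back to -1 presumably keeps the later ratio divisions from yielding nulls. A small sketch of the counting step, with illustrative names:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [("A1", "B00FLOW001,B00FLOW002"), ("A2", None)],
    ["asin", "related"],
)
counted = (
    df.withColumn("related_cnt", F.size(F.split("related", ",")))       # NULL -> -1
      .withColumn("related_cnt", F.when(F.col("related_cnt") == -1, 0)
                                  .otherwise(F.col("related_cnt")))     # -1 -> 0
)
counted.show()  # A1 -> 2, A2 -> 0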
# Related-traffic share for each type
for col in cols:
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
f"{col}_rat", F.round(F.col(f"{col}_cnt") / F.col("total_cnt"), 4)
)
# Paid/free traffic share
pay_cols = ["four_star_above", "product_adv", "brand_adv"]
free_cols = [col for col in cols if col not in pay_cols]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"pay_cnt", sum([F.col(f"{col}_cnt") for col in pay_cols])
).withColumn(
"pay_rat", F.round(F.col("pay_cnt") / F.col("total_cnt"), 4)
).withColumn(
"free_cnt", sum([F.col(f"{col}_cnt") for col in free_cols])
).withColumn(
"free_rat", F.round(F.col("free_cnt") / F.col("total_cnt"), 4)
).fillna({"pay_cnt": 0,
"pay_rat": 0,
"free_cnt": 0,
"free_rat": 0})
# Field consolidation
# Concatenate the related-traffic asins of all types
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"related_asin", F.concat_ws(",", *[F.col(c) for c in cols])
"related_asin", F.concat_ws(",", *[F.col(col) for col in cols])
)
# Use the map to generate a numbering column equal in length to the traffic asin list
for col in cols:
num = self.col_num_index[col]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
f"{col}_num",
F.when(
F.col(col).isNull(), F.lit(None)
).otherwise(
F.concat_ws(",", F.array_repeat(F.lit(num), F.size(F.split(F.col(col), ",")))))
f"{col}_num", F.when(F.col(col).isNull(), F.lit(None))
.otherwise(F.concat_ws(",", F.array_repeat(F.lit(num), F.size(F.split(F.col(col), ",")))))
)
# Concatenate all numbering columns
num_cols = [f"{col}_num" for col in cols]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"related_type", F.concat_ws(",", *[F.col(c) for c in num_cols])
)
# Consolidate the counts and ratios of the 12 traffic types
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"related_type_num", F.concat_ws(",", *[F.col(f"{c}_cnt") for c in cols])
).withColumn(
"related_type_rate", F.concat_ws(",", *[F.col(f"{c}_rat") for c in cols])
"related_type", F.concat_ws(",", *[F.col(f"{col}_num") for col in cols])
)
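Each type column gets a parallel numbering column: the column's numeric code from col_num_index is repeated once per asin, so related_asin and related_type line up position by position after concatenation. A sketch assuming a code of 3 for four_star_above (the real code comes from self.col_num_index):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("A1", "B00FLOW001,B00FLOW002")], ["asin", "four_star_above"])

num = 3  # assumed code for four_star_above; the real mapping lives in col_num_index
encoded = df.withColumn(
    "four_star_above_num",
    F.when(F.col("four_star_above").isNull(), F.lit(None)).otherwise(
        F.concat_ws(",", F.array_repeat(F.lit(num),
                                        F.size(F.split("four_star_above", ","))))
    ),
)
encoded.show(truncate=False)  # -> "3,3"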
# Persist data
@@ -148,13 +96,6 @@ class DwtAsinRelatedTraffic(object):
'asin',
'related_asin',
'related_type',
'related_type_num',
'related_type_rate',
'free_cnt',
'free_rat',
'pay_cnt',
'pay_rat',
'total_cnt',
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')
@@ -168,7 +109,7 @@ class DwtAsinRelatedTraffic(object):
def run(self):
# Read data
self.read_data()
# Metric calculation
# Aggregation calculation
self.handle_data()
# Persist data
self.save_data()