Commit 145fe90b by chenyuanjie

Code for the "related traffic" (关联流量) feature

parent 20ea475b
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from pyspark.sql.types import *
from utils.common_util import CommonUtil
from pyspark.sql import DataFrame
from utils.hdfs_utils import HdfsUtils
class DimAsinRelatedTraffic(object):
def __init__(self, site_name, date_type, date_info):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.hive_tb = f'dim_asin_related_traffic'
self.partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.df_asin_detail = self.spark.sql(f"select 1+1;")
self.df_result_list_json = self.spark.sql(f"select 1+1;")
self.df_together_asin = self.spark.sql(f"select 1+1;")
self.df_sp_initial_seen_asins_json = self.spark.sql(f"select 1+1;")
self.df_sp_4stars_initial_seen_asins_json = self.spark.sql(f"select 1+1;")
self.df_sp_delivery_initial_seen_asins_json = self.spark.sql(f"select 1+1;")
self.df_compare_similar_asin_json = self.spark.sql(f"select 1+1;")
self.df_bundles_this_asins_json = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
self.u_categorize_flow = F.udf(self.categorize_flow, StringType())
self.u_merge_df = F.udf(self.merge_df, StringType())
@staticmethod
def merge_df(col1, col2):
if col1 is None:
return col2
if col2 is None:
return col1
combined = set(col1.split(",") + col2.split(","))
return ",".join([x for x in combined if x])
@staticmethod
def categorize_flow(key):
key_lower = key.lower()
if key_lower == '4 stars and above':
return "four_star_above"
elif key_lower in ('brands you might like', 'more to consider from our brands',
'exclusive items from our brands', 'more from frequently bought brands'):
return "brand_recommendation"
elif key_lower in ('products related to this item', 'based on your recent views', 'customer also bought',
'deals on related products', 'similar items in new arrivals', 'top rated similar items',
'compare with similar items', 'discover similar items'):
return "similar_items"
elif key_lower in ('customers who viewed this item also viewed', 'customers frequently viewed'):
return "look_and_look"
elif key_lower.startswith('customers also'):
return "look_also_look"
elif key_lower.startswith('what other items do customers buy after viewing this item'):
return "look_but_bought"
elif key_lower in ('make it a bundle', 'bundles with this item'):
return "bundle_bought"
elif key_lower in ('buy it with', 'frequently bought together'):
return "combination_bought"
elif key_lower in ('more items to explore', 'based on your recent shopping trends',
'related products with free delivery on eligible orders') \
or key_lower.startswith('explore more'):
return "more_relevant"
elif key_lower == 'customers who bought this item also bought':
return "bought_and_bought"
elif key_lower == 'sponsored products related to this item':
return "product_adv"
elif key_lower in ('brands related to this category on amazon', 'brand in this category on amazon'):
return "brand_adv"
else:
return "other"
@staticmethod
def other_json_handle(
df: DataFrame,
json_column: str,
asin_key: str,
output_column: str
) -> DataFrame:
"""
从JSON数组字段中提取特定键的值并去重
参数:
df: 输入的DataFrame
json_column: 包含JSON数组的列名
asin_key: 要从JSON对象中提取的键名
output_column: 输出结果的列名
返回:
包含去重后值的DataFrame(只有一列)
"""
return df.withColumn(
'json_array', F.from_json(F.coalesce(F.col(json_column), F.lit("[]")), ArrayType(MapType(StringType(), StringType())))
).withColumn(
"exploded_item", F.explode("json_array")
).withColumn(
"flow_asin", F.col(f"exploded_item.{asin_key}")
).filter(
F.col('flow_asin').isNotNull() & (F.col('flow_asin') != "")
).groupBy("asin").agg(
F.concat_ws(",", F.collect_set("flow_asin")).alias(f"{output_column}")
)
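# Sketch of other_json_handle on a hypothetical row (asin_key='seen_asins'):
#   input : asin='B00000000X', json_column='[{"seen_asins": "B0000000A1"}, {"seen_asins": "B0000000A2"}, {"seen_asins": "B0000000A1"}]'
#   output: asin='B00000000X', output_column='B0000000A1,B0000000A2'   (deduplicated via collect_set, order not guaranteed)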
def read_data(self):
print("读取ods_asin_detail中流量相关数据")
sql = f"""
select
asin,
together_asin,
sp_initial_seen_asins_json,
sp_4stars_initial_seen_asins_json,
sp_delivery_initial_seen_asins_json,
compare_similar_asin_json,
result_list_json,
bundles_this_asins_json,
updated_at
from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
"""
self.df_asin_detail = self.spark.sql(sqlQuery=sql)
# Deduplicate: keep the most recently updated record per asin
window = Window.partitionBy(['asin']).orderBy(self.df_asin_detail.updated_at.desc_nulls_last())
self.df_asin_detail = self.df_asin_detail.withColumn(
"dt_rank", F.row_number().over(window=window)
).filter("dt_rank=1").drop("updated_at", "dt_rank").cache()
print("详情数据如下:")
self.df_asin_detail.show(10, True)
# Handle the result_list_json field
def handle_result_list_json(self):
self.df_result_list_json = self.df_asin_detail.select('asin', 'result_list_json')
# Unpack the nested JSON structure
self.df_result_list_json = self.df_result_list_json.withColumn(
'result_list_json',
F.from_json(F.coalesce(F.col('result_list_json'), F.lit('[]')), ArrayType(MapType(StringType(), ArrayType(StringType()))))
).withColumn(
"exploded_map", F.explode("result_list_json")
).select(
"*",
F.map_keys("exploded_map").alias("keys"), # 获取所有key
F.map_values("exploded_map").alias("value_arrays") # 获取所有value数组
).withColumn(
"key_value_pair", F.arrays_zip("keys", "value_arrays")
).withColumn(
"key_value_pair", F.explode("key_value_pair")
).withColumn(
"original_key", F.col("key_value_pair.keys")
).withColumn(
"value_array", F.col("key_value_pair.value_arrays")
).withColumn(
"distinct_values", F.array_distinct("value_array")
).withColumn(
"values", F.concat_ws(",", "distinct_values")
).select("asin", "original_key", "values")
# Categorize the traffic types
self.df_result_list_json = self.df_result_list_json.withColumn(
"key", self.u_categorize_flow("original_key")
).filter(
"key != 'other'"
).groupBy("asin", "key").agg(
F.concat_ws(",", F.array_distinct(
F.flatten(F.collect_list(F.split("values", ",")))
)
).alias("values")
).select("asin", "key", "values")
# Pivot rows to columns
self.df_result_list_json = self.df_result_list_json.groupBy("asin")\
.pivot("key")\
.agg(F.first("values"))\
.cache()
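# Sketch of the pivot on hypothetical rows: (asin, key, values) tuples such as
#   ('B00000000X', 'similar_items', 'B0000000A1,B0000000A2')
#   ('B00000000X', 'bundle_bought', 'B0000000A3')
# become a single row per asin with one column per traffic type:
#   asin='B00000000X', similar_items='B0000000A1,B0000000A2', bundle_bought='B0000000A3'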
print("处理result_list_json字段结果如下:")
self.df_result_list_json.show(10, True)
# Handle the other traffic fields
def handle_other_field(self):
# Handle the sp_initial_seen_asins_json field
self.df_sp_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_initial_seen_asins_json')
self.df_sp_initial_seen_asins_json = self.other_json_handle(
df=self.df_sp_initial_seen_asins_json,
json_column='sp_initial_seen_asins_json',
asin_key='seen_asins',
output_column='similar_items'
).cache()
print("处理sp_initial_seen_asins_json字段结果如下:")
self.df_sp_initial_seen_asins_json.show(10, True)
# Handle the sp_4stars_initial_seen_asins_json field
self.df_sp_4stars_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_4stars_initial_seen_asins_json')
self.df_sp_4stars_initial_seen_asins_json = self.other_json_handle(
df=self.df_sp_4stars_initial_seen_asins_json,
json_column='sp_4stars_initial_seen_asins_json',
asin_key='seen_asins',
output_column='four_star_above'
).cache()
print("处理sp_4stars_initial_seen_asins_json字段结果如下:")
self.df_sp_4stars_initial_seen_asins_json.show(10, True)
# Handle the sp_delivery_initial_seen_asins_json field
self.df_sp_delivery_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_delivery_initial_seen_asins_json')
self.df_sp_delivery_initial_seen_asins_json = self.other_json_handle(
df=self.df_sp_delivery_initial_seen_asins_json,
json_column='sp_delivery_initial_seen_asins_json',
asin_key='seen_asins',
output_column='more_relevant'
).cache()
print("处理sp_delivery_initial_seen_asins_json字段结果如下:")
self.df_sp_delivery_initial_seen_asins_json.show(10, True)
# Handle the compare_similar_asin_json field
self.df_compare_similar_asin_json = self.df_asin_detail.select('asin', 'compare_similar_asin_json')
self.df_compare_similar_asin_json = self.other_json_handle(
df=self.df_compare_similar_asin_json,
json_column='compare_similar_asin_json',
asin_key='compare_asin',
output_column='similar_items'
).cache()
print("处理compare_similar_asin_json字段结果如下:")
self.df_compare_similar_asin_json.show(10, True)
# Handle the bundles_this_asins_json field
self.df_bundles_this_asins_json = self.df_asin_detail.select('asin', 'bundles_this_asins_json')
self.df_bundles_this_asins_json = self.other_json_handle(
df=self.df_bundles_this_asins_json,
json_column='bundles_this_asins_json',
asin_key='bundles_Asins',
output_column='bundle_bought'
).cache()
print("处理bundles_this_asins_json字段结果如下:")
self.df_bundles_this_asins_json.show(10, True)
# Handle the together_asin field
self.df_together_asin = self.df_asin_detail.select('asin', 'together_asin').withColumnRenamed(
'together_asin', 'combination_bought'
).cache()
print("处理together_asin字段结果如下:")
self.df_together_asin.show(10, True)
# Merge all DataFrames
def handle_merge_df(self):
all_merge_df = [self.df_together_asin, self.df_sp_initial_seen_asins_json,
self.df_sp_4stars_initial_seen_asins_json, self.df_sp_delivery_initial_seen_asins_json,
self.df_compare_similar_asin_json, self.df_bundles_this_asins_json]
main_df = self.df_result_list_json
for df in all_merge_df:
for col in set(df.columns) - {"asin"}:
if col in main_df.columns:
df = df.withColumnRenamed(col, f'{col}_tmp')
main_df = main_df.join(df, "asin", "left") \
.withColumn(col, self.u_merge_df(F.col(col), F.col(f"{col}_tmp"))) \
.drop(f"{col}_tmp")
else:
main_df = main_df.join(df, "asin", "left")
self.df_save = main_df.cache()
print("最终合并结果如下:")
self.df_save.show(10, True)
self.df_asin_detail.unpersist()
self.df_result_list_json.unpersist()
self.df_together_asin.unpersist()
self.df_sp_initial_seen_asins_json.unpersist()
self.df_sp_4stars_initial_seen_asins_json.unpersist()
self.df_sp_delivery_initial_seen_asins_json.unpersist()
self.df_compare_similar_asin_json.unpersist()
self.df_bundles_this_asins_json.unpersist()
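# Note on the merge above: a column such as similar_items can come both from
# result_list_json (via categorize_flow) and from sp_initial_seen_asins_json /
# compare_similar_asin_json. In that case the duplicate column is renamed to
# <col>_tmp, joined on asin, and the two comma-separated lists are unioned by
# u_merge_df; columns that exist on only one side are simply left-joined in.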
# Persist the data
def save_data(self):
# Make sure the DataFrame columns match the Hive table schema
hive_tb_cols = [f.name for f in self.spark.table(f"{self.hive_tb}").schema]
for col in hive_tb_cols:
if col not in self.df_save.columns:
self.df_save = self.df_save.withColumn(col, F.lit(None))
# Handle the partition columns
self.df_save = self.df_save.withColumn(
'site_name', F.lit(self.site_name)
).withColumn(
'date_type', F.lit(self.date_type)
).withColumn(
'date_info', F.lit(self.date_info)
).select(*hive_tb_cols).replace('', None)
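# replace('', None) converts empty strings to NULL before the write, so empty
# traffic lists are stored as NULL in the Hive table.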
print(f"清除hdfs目录中:{self.hdfs_path}")
HdfsUtils.delete_file_in_folder(self.hdfs_path)
print(f"当前存储的表名为:{self.hive_tb},分区为:{self.partitions_by}")
self.df_save.repartition(40).write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=self.partitions_by)
print("success")
def run(self):
# Read data
self.read_data()
# Handle the result_list_json field
self.handle_result_list_json()
# Handle the other traffic fields
self.handle_other_field()
# Merge all DataFrames
self.handle_merge_df()
# Persist the data
self.save_data()
if __name__ == '__main__':
site_name = sys.argv[1]
date_type = sys.argv[2]
date_info = sys.argv[3]
handle_obj = DimAsinRelatedTraffic(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from pyspark.sql.types import *
from utils.common_util import CommonUtil
from pyspark.sql import DataFrame
from utils.hdfs_utils import HdfsUtils
class DwtAsinRelatedTraffic(object):
def __init__(self, site_name, date_type, date_info):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.hive_tb = f'dwt_asin_related_traffic'
self.partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.df_dim_asin_related_traffic = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
self.col_num_index = {
"four_star_above": 1,
"brand_recommendation": 2,
"similar_items": 3,
"look_and_look": 4,
"look_also_look": 5,
"look_but_bought": 6,
"bundle_bought": 7,
"combination_bought": 8,
"more_relevant": 9,
"bought_and_bought": 10,
"product_adv": 11,
"brand_adv": 12
}
def read_data(self):
print("读取dim_asin_related_traffic流量数据")
sql = f"""
select
asin,
four_star_above,
brand_recommendation,
similar_items,
look_and_look,
look_also_look,
look_but_bought,
bundle_bought,
combination_bought,
more_relevant,
bought_and_bought,
product_adv,
brand_adv
from dim_asin_related_traffic where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
"""
self.df_dim_asin_related_traffic = self.spark.sql(sqlQuery=sql).cache()
print("dim_asin_related_traffic数据如下:")
self.df_dim_asin_related_traffic.show(10, True)
# Metric calculation
def handle_data(self):
cols = [col for col in self.df_dim_asin_related_traffic.columns if col != 'asin']
# Number of related-traffic ASINs per traffic type
for col in cols:
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
f"{col}_cnt", F.size(F.split(F.col(col), ","))
).withColumn(
f"{col}_cnt", F.when(F.col(f"{col}_cnt") == -1, 0).otherwise(F.col(f"{col}_cnt"))
)
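# Note: on the Spark configuration this job assumes, F.size(F.split(col, ","))
# evaluates to -1 when col is NULL (the legacy sizeOfNull behaviour), which is
# why -1 is mapped back to 0 above.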
# Total number of related-traffic ASINs (a total of 0 is replaced with -1)
all_cnt_cols = [F.col(f"{col}_cnt") for col in cols]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"total_cnt", sum(all_cnt_cols)
).withColumn(
"total_cnt", F.when(F.col('total_cnt') == 0, -1).otherwise(F.col('total_cnt'))
)
# Share of related traffic per traffic type
for col in cols:
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
f"{col}_rat", F.round(F.col(f"{col}_cnt") / F.col("total_cnt"), 4)
)
# Paid / free traffic share
pay_cols = ["four_star_above", "product_adv", "brand_adv"]
free_cols = [col for col in cols if col not in pay_cols]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"pay_cnt", sum([F.col(f"{col}_cnt") for col in pay_cols])
).withColumn(
"pay_rat", F.round(F.col("pay_cnt") / F.col("total_cnt"), 4)
).withColumn(
"free_cnt", sum([F.col(f"{col}_cnt") for col in free_cols])
).withColumn(
"free_rat", F.round(F.col("free_cnt") / F.col("total_cnt"), 4)
).fillna({"pay_cnt": 0,
"pay_rat": 0,
"free_cnt": 0,
"free_rat": 0})
# Field consolidation
# Concatenate the related-traffic ASINs of all types
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"related_asin", F.concat_ws(",", *[F.col(c) for c in cols])
)
# Using the index map, generate a label column with as many labels as there are related ASINs
for col in cols:
num = self.col_num_index[col]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
f"{col}_num",
F.when(
F.col(col).isNull(), F.lit(None)
).otherwise(
F.concat_ws(",", F.array_repeat(F.lit(num), F.size(F.split(F.col(col), ",")))))
)
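# Example with hypothetical data: if similar_items = 'B0000000A1,B0000000A2,B0000000A3'
# and its index in col_num_index is 3, then similar_items_num = '3,3,3',
# i.e. one type label per related ASIN.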
# Concatenate all label columns
num_cols = [f"{col}_num" for col in cols]
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"related_type", F.concat_ws(",", *[F.col(c) for c in num_cols])
)
# Consolidate the counts and shares of the 12 traffic types
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"related_type_num", F.concat_ws(",", *[F.col(f"{c}_cnt") for c in cols])
).withColumn(
"related_type_rate", F.concat_ws(",", *[F.col(f"{c}_rat") for c in cols])
)
# Persist the data
def save_data(self):
self.df_save = self.df_dim_asin_related_traffic.select(
'asin',
'related_asin',
'related_type',
'related_type_num',
'related_type_rate',
'free_cnt',
'free_rat',
'pay_cnt',
'pay_rat',
'total_cnt',
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')
)
print(f"清除hdfs目录中:{self.hdfs_path}")
HdfsUtils.delete_file_in_folder(self.hdfs_path)
print(f"当前存储的表名为:{self.hive_tb},分区为:{self.partitions_by}")
self.df_save.repartition(40).write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=self.partitions_by)
print("success")
def run(self):
# Read data
self.read_data()
# Metric calculation
self.handle_data()
# Persist the data
self.save_data()
if __name__ == '__main__':
site_name = sys.argv[1]
date_type = sys.argv[2]
date_info = sys.argv[3]
handle_obj = DwtAsinRelatedTraffic(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
print(f"执行参数为{sys.argv}")
# CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
# principal='chenyuanjie',
# priority=2, export_tools_type=1, belonging_to_process=f'新ABA流程_{date_type}')
db_type = 'postgresql_cluster'
engine = DBUtil.get_db_engine(db_type, site_name)
dt = str(date_info).replace("-", "_")
export_tb = f"{site_name}_asin_related_{dt}"
export_cols = [
'asin',
'related_asin',
'related_type',
'related_type_num',
'related_type_rate',
'free_cnt',
'free_rat',
'pay_cnt',
'pay_rat',
'total_cnt'
]
sql = f"""
ALTER TABLE {export_tb} ALTER COLUMN related_asin TYPE text;
ALTER TABLE {export_tb} ALTER COLUMN related_type TYPE text;
ALTER TABLE {export_tb} ALTER COLUMN related_type_num TYPE text;
ALTER TABLE {export_tb} ALTER COLUMN related_type_rate TYPE text;
"""
DBUtil.engine_exec_sql(engine, sql)
partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="dwt_asin_related_traffic",
export_tb=export_tb,
col=export_cols,
partition_dict=partition_dict
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
sql = f"""
ALTER TABLE {export_tb}
ALTER COLUMN related_asin TYPE VARCHAR(20)[]
USING string_to_array(related_asin, ',');
ALTER TABLE {export_tb}
ALTER COLUMN related_type TYPE INTEGER[]
USING string_to_array(related_type, ',')::int[];
ALTER TABLE {export_tb}
ALTER COLUMN related_type_num TYPE INTEGER[]
USING string_to_array(related_type_num, ',')::int[];
ALTER TABLE {export_tb}
ALTER COLUMN related_type_rate TYPE numeric(10,4)[]
USING string_to_array(related_type_rate, ',')::numeric(10,4)[];
"""
DBUtil.engine_exec_sql(engine, sql)
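# After this conversion, a comma-separated export value such as related_type = '3,3,7'
# is stored as the PostgreSQL int[] value {3,3,7}; related_asin becomes a varchar(20)[]
# built the same way via string_to_array.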