Commit ac41b65d by chenyuanjie

Related traffic - code improvements + bug fixes

parent 92434c42
import os
import sys
import re
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
......@@ -43,15 +44,42 @@ class DimAsinRelatedTraffic(object):
self.u_categorize_flow = F.udf(self.categorize_flow, StringType())
self.u_merge_df = F.udf(self.merge_df, StringType())
self.u_repair_json = F.udf(self.repair_json, StringType())
    @staticmethod
    def repair_json(json_str):
        """Repair the array format of the target field."""
        if not json_str:
            return json_str
        # Match three cases: 1) an already-formatted array, 2) a quoted string, 3) an unquoted value
        pattern = re.compile(
            r'("Brand in this category on Amazon"\s*:\s*)(\[[^\]]+\]|"([^"]+)"|([^,{}"]+))'
        )

        def replace_func(m):
            # Already an array (group(2) starts with '['): return the whole match unchanged
            if m.group(2).startswith('['):
                return m.group(0)
            # Quoted string or bare value: split on commas and rebuild it as a JSON array
            raw_value = m.group(3) or m.group(4)
            items = [v.strip() for v in raw_value.split(",") if v.strip()]
            joined = '","'.join(items)
            return f'{m.group(1)}["{joined}"]'

        return pattern.sub(replace_func, json_str)
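    # Illustrative sketch (not part of the commit): repair_json rewrites a quoted,
    # comma-separated value as a JSON string array.
    #   >>> DimAsinRelatedTraffic.repair_json('[{"Brand in this category on Amazon": "Acme, Globex"}]')
    #   '[{"Brand in this category on Amazon": ["Acme","Globex"]}]'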
@staticmethod
def merge_df(col1, col2):
if not col1 or col1.strip() == "":
return col2 if (col2 and col2.strip()) else None
if not col2 or col2.strip() == "":
return col1 if (col1 and col1.strip()) else None
list1 = list(set(x.strip() for x in col1.split(",") if x.strip()))
list2 = list(set(x.strip() for x in col2.split(",") if x.strip()))
combined = list(set(list1 + list2))
return ",".join(combined) if combined else None
@staticmethod
def categorize_flow(key):
......@@ -106,7 +134,7 @@ class DimAsinRelatedTraffic(object):
A DataFrame containing the deduplicated values (only one column)
"""
return df.withColumn(
'json_array', F.from_json(F.col(json_column), ArrayType(MapType(StringType(), StringType())))
).withColumn(
"exploded_item", F.explode("json_array")
).withColumn(
......@@ -130,7 +158,8 @@ class DimAsinRelatedTraffic(object):
result_list_json,
bundles_this_asins_json,
updated_at
from ods_asin_related_traffic
where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and asin is not null;
"""
self.df_asin_detail = self.spark.sql(sqlQuery=sql)
......@@ -146,7 +175,7 @@ class DimAsinRelatedTraffic(object):
result_list_json,
null as bundles_this_asins_json,
updated_at
from ods_self_asin_related_traffic where site_name='{self.site_name}' and asin is not null;
"""
self.df_self_asin_detail = self.spark.sql(sqlQuery=sql)
......@@ -162,42 +191,30 @@ class DimAsinRelatedTraffic(object):
    # Process the result_list_json field
def handle_result_list_json(self):
        json_schema = ArrayType(MapType(StringType(), ArrayType(StringType())))
        self.df_result_list_json = self.df_asin_detail.filter(
            F.col('result_list_json').isNotNull()
        ).select(
            # Repair malformed arrays before parsing, then unpack the complex JSON
            'asin', F.from_json(self.u_repair_json(F.col('result_list_json')), json_schema).alias('parsed_json')
        ).withColumn(
            "kv", F.explode("parsed_json")
        ).select(
            "asin", F.explode("kv").alias("key", "value")
        ).withColumn(
            # Bucket the raw traffic keys into flow categories
            "category", self.u_categorize_flow(F.col("key"))
        ).filter(
            F.col("category") != "other"
        ).withColumn(
            "distinct_values", F.array_distinct("value")
        ).filter(
            F.expr("size(distinct_values) > 0")
        ).select(
            'asin', 'category', 'distinct_values'
        ).groupBy(["asin", "category"]).agg(
            F.concat_ws(",", F.array_distinct(F.flatten(F.collect_list("distinct_values")))).alias("values")
        ).groupBy("asin") \
            .pivot("category") \
            .agg(F.first("values")) \
            .cache()
print("处理result_list_json字段结果如下:")
self.df_result_list_json.show(10, True)
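        # For intuition, a minimal sketch of the row-to-column pivot above
        # (hypothetical data and category names; assumes a local SparkSession named spark):
        #   df = spark.createDataFrame(
        #       [("B0TESTASIN", "similar_items", "B0AAAAAAA1,B0BBBBBBB2"),
        #        ("B0TESTASIN", "bought_together", "B0CCCCCCC3")],
        #       ["asin", "category", "values"])
        #   df.groupBy("asin").pivot("category").agg(F.first("values")).show()
        #   # -> one row per asin, one column per flow category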
......@@ -205,7 +222,9 @@ class DimAsinRelatedTraffic(object):
    # Process the other traffic fields
def handle_other_field(self):
        # Process the sp_initial_seen_asins_json field
self.df_sp_initial_seen_asins_json = self.df_asin_detail\
.select('asin', 'sp_initial_seen_asins_json')\
.filter(F.col('sp_initial_seen_asins_json').isNotNull())
self.df_sp_initial_seen_asins_json = self.other_json_handle(
df=self.df_sp_initial_seen_asins_json,
json_column='sp_initial_seen_asins_json',
......@@ -216,7 +235,9 @@ class DimAsinRelatedTraffic(object):
self.df_sp_initial_seen_asins_json.show(10, True)
        # Process the sp_4stars_initial_seen_asins_json field
self.df_sp_4stars_initial_seen_asins_json = self.df_asin_detail\
.select('asin', 'sp_4stars_initial_seen_asins_json')\
.filter(F.col('sp_4stars_initial_seen_asins_json').isNotNull())
self.df_sp_4stars_initial_seen_asins_json = self.other_json_handle(
df=self.df_sp_4stars_initial_seen_asins_json,
json_column='sp_4stars_initial_seen_asins_json',
......@@ -227,7 +248,9 @@ class DimAsinRelatedTraffic(object):
self.df_sp_4stars_initial_seen_asins_json.show(10, True)
        # Process the sp_delivery_initial_seen_asins_json field
self.df_sp_delivery_initial_seen_asins_json = self.df_asin_detail\
.select('asin', 'sp_delivery_initial_seen_asins_json')\
.filter(F.col('sp_delivery_initial_seen_asins_json').isNotNull())
self.df_sp_delivery_initial_seen_asins_json = self.other_json_handle(
df=self.df_sp_delivery_initial_seen_asins_json,
json_column='sp_delivery_initial_seen_asins_json',
......@@ -238,7 +261,9 @@ class DimAsinRelatedTraffic(object):
self.df_sp_delivery_initial_seen_asins_json.show(10, True)
        # Process the compare_similar_asin_json field
self.df_compare_similar_asin_json = self.df_asin_detail\
.select('asin', 'compare_similar_asin_json')\
.filter(F.col('compare_similar_asin_json').isNotNull())
self.df_compare_similar_asin_json = self.other_json_handle(
df=self.df_compare_similar_asin_json,
json_column='compare_similar_asin_json',
......@@ -249,7 +274,9 @@ class DimAsinRelatedTraffic(object):
self.df_compare_similar_asin_json.show(10, True)
        # Process the bundles_this_asins_json field
self.df_bundles_this_asins_json = self.df_asin_detail\
.select('asin', 'bundles_this_asins_json')\
.filter(F.col('bundles_this_asins_json').isNotNull())
self.df_bundles_this_asins_json = self.other_json_handle(
df=self.df_bundles_this_asins_json,
json_column='bundles_this_asins_json',
......
......@@ -72,6 +72,13 @@ class DwtAsinRelatedTraffic(object):
def handle_data(self):
cols = [col for col in self.df_dim_asin_related_traffic.columns if col != 'asin']
        for col in cols:
            # Keep only comma-separated tokens of exactly 10 characters (valid ASINs),
            # then normalize empty results to NULL
            self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
                col, F.concat_ws(",", F.filter(F.split(F.col(col), ","), lambda x: (F.length(F.trim(x)) == 10)))
            ).withColumn(
                col, F.when(F.col(col) == "", None).otherwise(F.col(col))
            )
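        # Illustrative (not part of the commit): for a cell "B0AAAAAAA1,bad,B0BBBBBBB2",
        # the filter keeps only the 10-character tokens, concat_ws rebuilds
        # "B0AAAAAAA1,B0BBBBBBB2", and a cell with no valid tokens becomes NULL.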
        # Concatenate the related-traffic ASINs across all types
self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
"related_asin", F.concat_ws(",", *[F.col(col) for col in cols])
......
......@@ -6,6 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
from datetime import date
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
......@@ -13,32 +14,39 @@ if __name__ == '__main__':
date_info = CommonUtil.get_sys_arg(3, None)
print(f"执行参数为{sys.argv}")
CommonUtil.judge_is_work_hours(
site_name=site_name, date_type=date_type, date_info=date_info,
principal='chenyuanjie', priority=3, export_tools_type=1, belonging_to_process='关联流量'
)
db_type = 'postgresql_cluster'
engine = DBUtil.get_db_engine(db_type, site_name)
export_cols = [
'asin',
'related_asin',
'related_type',
'related_type_num',
'related_type_rate',
'free_cnt',
'free_rat',
'pay_cnt',
'pay_rat',
'total_cnt'
]
if date_type == 'month':
dt = str(date_info).replace("-", "_")
export_tb = f"{site_name}_asin_related_{dt}"
report_type = date_type
report_date = date_info
elif date_type == 'month_week':
export_tb = f"{site_name}_asin_related_30_day"
report_type = '30_day'
report_date = date.today()
else:
        raise ValueError("date_type is invalid!")
sql = f"""
DROP TABLE IF EXISTS {export_tb};
CREATE TABLE {export_tb} (LIKE us_asin_related_template INCLUDING ALL);
ALTER TABLE {export_tb} ALTER COLUMN related_asin TYPE text;
ALTER TABLE {export_tb} ALTER COLUMN related_type TYPE text;
ALTER TABLE {export_tb} ALTER COLUMN related_type_num TYPE text;
ALTER TABLE {export_tb} ALTER COLUMN related_type_rate TYPE text;
"""
DBUtil.engine_exec_sql(engine, sql)
partition_dict = {
......@@ -57,22 +65,37 @@ if __name__ == '__main__':
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
print("数据导出完成,准备修改数据类型!")
sql = f"""
ALTER TABLE {export_tb}
ALTER COLUMN related_asin TYPE VARCHAR(20)[]
ALTER COLUMN related_asin TYPE VARCHAR(10)[]
USING string_to_array(related_asin, ',');
ALTER TABLE {export_tb}
ALTER COLUMN related_type TYPE INTEGER[]
USING string_to_array(related_type, ',')::int[];
"""
DBUtil.engine_exec_sql(engine, sql)
print("数据类型修改完成,准备创建索引!")
ALTER TABLE {export_tb}
ALTER COLUMN related_type_num TYPE INTEGER[]
USING string_to_array(related_type_num, ',')::int[];
sql = f"""
CREATE INDEX {export_tb}_asin_idx ON {export_tb} USING btree (
"asin" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);
ALTER TABLE {export_tb}
ALTER COLUMN related_type_rate TYPE numeric(10,4)[]
USING string_to_array(related_type_rate, ',')::numeric(10,4)[];
CREATE INDEX {export_tb}_related_asin_idx ON {export_tb} USING gin (
"related_asin" COLLATE "pg_catalog"."default" "pg_catalog"."array_ops"
);
"""
DBUtil.engine_exec_sql(engine, sql)
print("索引创建完成,准备插入流程记录表!")
sql = f"""
REPLACE INTO selection.workflow_everyday
(site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
VALUES
('{site_name}', '{report_date}', '导出PG集群完成', 14, '{export_tb}', '{report_type}', '关联流量', '是', '关联流量模块数据', 'postgresql_cluster');
"""
DBUtil.engine_exec_sql(DBUtil.get_db_engine('mysql', 'us'), sql)
print("success!")