diff --git a/Pyspark_job/dim/dim_asin_related_traffic.py b/Pyspark_job/dim/dim_asin_related_traffic.py
index f17e5aa..f19d52b 100644
--- a/Pyspark_job/dim/dim_asin_related_traffic.py
+++ b/Pyspark_job/dim/dim_asin_related_traffic.py
@@ -1,5 +1,6 @@
 import os
 import sys
+import re
 
 sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
@@ -43,15 +44,42 @@ class DimAsinRelatedTraffic(object):
         self.u_categorize_flow = F.udf(self.categorize_flow, StringType())
         self.u_merge_df = F.udf(self.merge_df, StringType())
+        self.u_repair_json = F.udf(self.repair_json, StringType())
+
+    @staticmethod
+    def repair_json(json_str):
+        """Repair the array format of the target field."""
+        if not json_str:
+            return json_str
+
+        # Match three cases: 1) an already well-formed array 2) a quoted string 3) an unquoted value
+        pattern = re.compile(
+            r'("Brand in this category on Amazon"\s*:\s*)(\[[^\]]+\]|"([^"]+)"|([^,{}"]+))'
+        )
+
+        def replace_func(m):
+            # Already an array (group(2) starts with '['): keep it
+            if m.group(2).startswith('['):
+                return m.group(0)  # return the whole match unchanged
+
+            # Quoted-string or unquoted value: split into items and rebuild a JSON string array
+            raw_value = m.group(3) or m.group(4)
+            items = [v.strip() for v in raw_value.split(",") if v.strip()]
+            joined = '","'.join(items)
+            return f'{m.group(1)}["{joined}"]'
+
+        return pattern.sub(replace_func, json_str)
 
     @staticmethod
     def merge_df(col1, col2):
-        if col1 is None:
-            return col2
-        if col2 is None:
-            return col1
-        combined = set(col1.split(",") + col2.split(","))
-        return ",".join([x for x in combined if x])
+        if not col1 or col1.strip() == "":
+            return col2 if (col2 and col2.strip()) else None
+        if not col2 or col2.strip() == "":
+            return col1 if (col1 and col1.strip()) else None
+
+        list1 = list(set(x.strip() for x in col1.split(",") if x.strip()))
+        list2 = list(set(x.strip() for x in col2.split(",") if x.strip()))
+        combined = list(set(list1 + list2))
+        return ",".join(combined) if combined else None
 
     @staticmethod
     def categorize_flow(key):
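
# A quick sanity sketch of repair_json on the three value shapes the pattern
# matches (the payloads are hypothetical; only the field name comes from the
# regex above):
broken_quoted = '[{"Brand in this category on Amazon": "Anker, Belkin"}]'
broken_bare = '[{"Brand in this category on Amazon": Anker}]'
well_formed = '[{"Brand in this category on Amazon": ["Anker"]}]'

assert DimAsinRelatedTraffic.repair_json(broken_quoted) == '[{"Brand in this category on Amazon": ["Anker","Belkin"]}]'
assert DimAsinRelatedTraffic.repair_json(broken_bare) == '[{"Brand in this category on Amazon": ["Anker"]}]'
assert DimAsinRelatedTraffic.repair_json(well_formed) == well_formed
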
@@ -106,7 +134,7 @@ class DimAsinRelatedTraffic(object):
             A DataFrame containing the deduplicated values (a single column)
         """
         return df.withColumn(
-            'json_array', F.from_json(F.coalesce(F.col(json_column), F.lit("[]")), ArrayType(MapType(StringType(), StringType())))
+            'json_array', F.from_json(F.col(json_column), ArrayType(MapType(StringType(), StringType())))
         ).withColumn(
             "exploded_item", F.explode("json_array")
         ).withColumn(
@@ -130,7 +158,8 @@ class DimAsinRelatedTraffic(object):
             result_list_json,
             bundles_this_asins_json,
             updated_at
-            from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
+            from ods_asin_related_traffic
+            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and asin is not null;
         """
         self.df_asin_detail = self.spark.sql(sqlQuery=sql)
@@ -146,7 +175,7 @@
             result_list_json,
             null as bundles_this_asins_json,
             updated_at
-            from ods_self_asin_related_traffic where site_name='{self.site_name}';
+            from ods_self_asin_related_traffic where site_name='{self.site_name}' and asin is not null;
         """
         self.df_self_asin_detail = self.spark.sql(sqlQuery=sql)
@@ -162,42 +191,30 @@ class DimAsinRelatedTraffic(object):
     # Process the result_list_json field
     def handle_result_list_json(self):
-        self.df_result_list_json = self.df_asin_detail.select('asin', 'result_list_json')
-        # Unnest the complex JSON
-        self.df_result_list_json = self.df_result_list_json.withColumn(
-            'result_list_json',
-            F.from_json(F.coalesce(F.col('result_list_json'), F.lit('[]')), ArrayType(MapType(StringType(), ArrayType(StringType()))))
-        ).withColumn(
-            "exploded_map", F.explode("result_list_json")
-        ).withColumn(
-            "key_value_pair",
-            F.explode(F.map_entries("exploded_map"))
-        ).withColumn(
-            "original_key", F.col("key_value_pair.key")
+        json_schema = ArrayType(MapType(StringType(), ArrayType(StringType())))
+        self.df_result_list_json = self.df_asin_detail.filter(
+            F.col('result_list_json').isNotNull()
+        ).select(
+            'asin', F.from_json(self.u_repair_json(F.col('result_list_json')), json_schema).alias('parsed_json')
         ).withColumn(
-            "value_array", F.col("key_value_pair.value")
+            "kv", F.explode("parsed_json")
+        ).select(
+            "asin", F.explode("kv").alias("key", "value")
         ).withColumn(
-            "distinct_values", F.array_distinct("value_array")
-        ).withColumn(
-            # Filter out dirty data where the ASIN length is not 10
-            "filtered_values",
-            F.filter("distinct_values", lambda x: (x.isNotNull() & (x != "") & (F.length(F.trim(x)) == 10)))
-        ).withColumn(
-            "values", F.concat_ws(",", "filtered_values")
+            "category", self.u_categorize_flow(F.col("key"))
         ).filter(
-            F.col("values").isNotNull() & (F.trim(F.col('values')) != "")
-        ).select("asin", "original_key", "values")
-        # Categorize the traffic types
-        self.df_result_list_json = self.df_result_list_json.withColumn(
-            "key", self.u_categorize_flow("original_key")
+            F.col("category") != "other"
+        ).withColumn(
+            "distinct_values", F.array_distinct("value")
         ).filter(
-            F.col('key') != 'other'
-        ).groupBy("asin", "key").agg(
-            F.concat_ws(",", F.array_distinct(F.flatten(F.collect_list(F.split("values", ","))))).alias("values")
-        ).select("asin", "key", "values")
-        # Pivot rows to columns
-        self.df_result_list_json = self.df_result_list_json.groupBy("asin")\
-            .pivot("key")\
-            .agg(F.first("values"))\
+            F.expr("size(distinct_values) > 0")
+        ).select(
+            'asin', 'category', 'distinct_values'
+        ).groupBy(["asin", "category"]).agg(
+            F.concat_ws(",", F.array_distinct(F.flatten(F.collect_list("distinct_values")))).alias("values")
+        ).groupBy("asin") \
+            .pivot("category") \
+            .agg(F.first("values")) \
             .cache()
         print("result_list_json processing result:")
         self.df_result_list_json.show(10, True)
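
# A self-contained sketch of the new explode-then-pivot shape in
# handle_result_list_json, on toy data (all values are made up; the real job
# also maps each key through categorize_flow before pivoting):
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

spark = SparkSession.builder.master("local[1]").getOrCreate()
schema = ArrayType(MapType(StringType(), ArrayType(StringType())))
raw = spark.createDataFrame(
    [("B000000001", '[{"Products related to this item": ["B000000002", "B000000003"]}]')],
    ["asin", "result_list_json"],
)
pivoted = raw.select(
    "asin", F.from_json("result_list_json", schema).alias("parsed_json")
).withColumn(
    "kv", F.explode("parsed_json")                 # one row per map in the array
).select(
    "asin", F.explode("kv").alias("key", "value")  # one row per (key, value) pair
).groupBy("asin").pivot("key").agg(
    F.first(F.concat_ws(",", "value"))             # rows -> one column per key
)
# pivoted: | B000000001 | B000000002,B000000003 |
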
@@ -205,7 +222,9 @@ class DimAsinRelatedTraffic(object):
     # Process the other traffic fields
     def handle_other_field(self):
         # Process the sp_initial_seen_asins_json field
-        self.df_sp_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_initial_seen_asins_json')
+        self.df_sp_initial_seen_asins_json = self.df_asin_detail\
+            .select('asin', 'sp_initial_seen_asins_json')\
+            .filter(F.col('sp_initial_seen_asins_json').isNotNull())
         self.df_sp_initial_seen_asins_json = self.other_json_handle(
             df=self.df_sp_initial_seen_asins_json,
             json_column='sp_initial_seen_asins_json',
@@ -216,7 +235,9 @@
         self.df_sp_initial_seen_asins_json.show(10, True)
 
         # Process the sp_4stars_initial_seen_asins_json field
-        self.df_sp_4stars_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_4stars_initial_seen_asins_json')
+        self.df_sp_4stars_initial_seen_asins_json = self.df_asin_detail\
+            .select('asin', 'sp_4stars_initial_seen_asins_json')\
+            .filter(F.col('sp_4stars_initial_seen_asins_json').isNotNull())
         self.df_sp_4stars_initial_seen_asins_json = self.other_json_handle(
             df=self.df_sp_4stars_initial_seen_asins_json,
             json_column='sp_4stars_initial_seen_asins_json',
@@ -227,7 +248,9 @@
         self.df_sp_4stars_initial_seen_asins_json.show(10, True)
 
         # Process the sp_delivery_initial_seen_asins_json field
-        self.df_sp_delivery_initial_seen_asins_json = self.df_asin_detail.select('asin', 'sp_delivery_initial_seen_asins_json')
+        self.df_sp_delivery_initial_seen_asins_json = self.df_asin_detail\
+            .select('asin', 'sp_delivery_initial_seen_asins_json')\
+            .filter(F.col('sp_delivery_initial_seen_asins_json').isNotNull())
         self.df_sp_delivery_initial_seen_asins_json = self.other_json_handle(
             df=self.df_sp_delivery_initial_seen_asins_json,
             json_column='sp_delivery_initial_seen_asins_json',
@@ -238,7 +261,9 @@
         self.df_sp_delivery_initial_seen_asins_json.show(10, True)
 
         # Process the compare_similar_asin_json field
-        self.df_compare_similar_asin_json = self.df_asin_detail.select('asin', 'compare_similar_asin_json')
+        self.df_compare_similar_asin_json = self.df_asin_detail\
+            .select('asin', 'compare_similar_asin_json')\
+            .filter(F.col('compare_similar_asin_json').isNotNull())
         self.df_compare_similar_asin_json = self.other_json_handle(
             df=self.df_compare_similar_asin_json,
             json_column='compare_similar_asin_json',
@@ -249,7 +274,9 @@
         self.df_compare_similar_asin_json.show(10, True)
 
         # Process the bundles_this_asins_json field
-        self.df_bundles_this_asins_json = self.df_asin_detail.select('asin', 'bundles_this_asins_json')
+        self.df_bundles_this_asins_json = self.df_asin_detail\
+            .select('asin', 'bundles_this_asins_json')\
+            .filter(F.col('bundles_this_asins_json').isNotNull())
         self.df_bundles_this_asins_json = self.other_json_handle(
             df=self.df_bundles_this_asins_json,
             json_column='bundles_this_asins_json',
diff --git a/Pyspark_job/dwt/dwt_asin_related_traffic.py b/Pyspark_job/dwt/dwt_asin_related_traffic.py
index c65305a..356b217 100644
--- a/Pyspark_job/dwt/dwt_asin_related_traffic.py
+++ b/Pyspark_job/dwt/dwt_asin_related_traffic.py
@@ -72,6 +72,13 @@ class DwtAsinRelatedTraffic(object):
     def handle_data(self):
         cols = [col for col in self.df_dim_asin_related_traffic.columns if col != 'asin']
 
+        for col in cols:
+            self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
+                col, F.concat_ws(",", F.filter(F.split(F.col(col), ","), lambda x: (F.length(F.trim(x)) == 10)))
+            ).withColumn(
+                col, F.when(F.col(col) == "", None).otherwise(F.col(col))
+            )
+
         # Concatenate the related-traffic ASINs across all types
         self.df_dim_asin_related_traffic = self.df_dim_asin_related_traffic.withColumn(
             "related_asin", F.concat_ws(",", *[F.col(col) for col in cols])
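
# A sketch of the cleanup loop's effect on a single column (toy rows): tokens
# whose trimmed length is not 10 (the ASIN length) are dropped, and a column
# that ends up empty is nulled so the concat_ws below skips it:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("B000000001, bad ,B000000002",), ("bad,",)], ["c"])
cleaned = df.withColumn(
    "c", F.concat_ws(",", F.filter(F.split(F.col("c"), ","), lambda x: F.length(F.trim(x)) == 10))
).withColumn(
    "c", F.when(F.col("c") == "", None).otherwise(F.col("c"))
)
# Rows become "B000000001,B000000002" and NULL.
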
diff --git a/Pyspark_job/sqoop_export/export_dwt_asin_related_traffic.py b/Pyspark_job/sqoop_export/export_dwt_asin_related_traffic.py
index e0bcf94..b6b7fc1 100644
--- a/Pyspark_job/sqoop_export/export_dwt_asin_related_traffic.py
+++ b/Pyspark_job/sqoop_export/export_dwt_asin_related_traffic.py
@@ -6,6 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
 from utils.ssh_util import SSHUtil
 from utils.common_util import CommonUtil
 from utils.db_util import DBUtil
+from datetime import date
 
 if __name__ == '__main__':
     site_name = CommonUtil.get_sys_arg(1, None)
@@ -13,32 +14,39 @@ if __name__ == '__main__':
     date_info = CommonUtil.get_sys_arg(3, None)
     print(f"Execution args: {sys.argv}")
 
-    # CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
-    #                                principal='chenyuanjie',
-    #                                priority=2, export_tools_type=1, belonging_to_process=f'新ABA流程_{date_type}')
+    CommonUtil.judge_is_work_hours(
+        site_name=site_name, date_type=date_type, date_info=date_info,
+        principal='chenyuanjie', priority=3, export_tools_type=1, belonging_to_process='关联流量'
+    )
+
     db_type = 'postgresql_cluster'
     engine = DBUtil.get_db_engine(db_type, site_name)
-    dt = str(date_info).replace("-", "_")
-    export_tb = f"{site_name}_asin_related_{dt}"
     export_cols = [
         'asin',
         'related_asin',
-        'related_type',
-        'related_type_num',
-        'related_type_rate',
-        'free_cnt',
-        'free_rat',
-        'pay_cnt',
-        'pay_rat',
-        'total_cnt'
+        'related_type'
     ]
 
+    if date_type == 'month':
+        dt = str(date_info).replace("-", "_")
+        export_tb = f"{site_name}_asin_related_{dt}"
+        report_type = date_type
+        report_date = date_info
+    elif date_type == 'month_week':
+        export_tb = f"{site_name}_asin_related_30_day"
+        report_type = '30_day'
+        report_date = date.today()
+    else:
+        raise Exception('invalid date_type!')
+
     sql = f"""
+        DROP TABLE IF EXISTS {export_tb};
+        CREATE TABLE {export_tb} (LIKE us_asin_related_template INCLUDING ALL);
+
         ALTER TABLE {export_tb} ALTER COLUMN related_asin TYPE text;
         ALTER TABLE {export_tb} ALTER COLUMN related_type TYPE text;
-        ALTER TABLE {export_tb} ALTER COLUMN related_type_num TYPE text;
-        ALTER TABLE {export_tb} ALTER COLUMN related_type_rate TYPE text;
     """
+
     DBUtil.engine_exec_sql(engine, sql)
 
     partition_dict = {
@@ -57,22 +65,37 @@
     client = SSHUtil.get_ssh_client()
     SSHUtil.exec_command_async(client, sh, ignore_err=False)
     client.close()
+    print("Export finished; altering column types!")
 
     sql = f"""
         ALTER TABLE {export_tb}
-        ALTER COLUMN related_asin TYPE VARCHAR(20)[]
+        ALTER COLUMN related_asin TYPE VARCHAR(10)[]
         USING string_to_array(related_asin, ',');
 
         ALTER TABLE {export_tb}
         ALTER COLUMN related_type TYPE INTEGER[]
         USING string_to_array(related_type, ',')::int[];
+    """
+    DBUtil.engine_exec_sql(engine, sql)
+    print("Column types converted; creating indexes!")
 
-        ALTER TABLE {export_tb}
-        ALTER COLUMN related_type_num TYPE INTEGER[]
-        USING string_to_array(related_type_num, ',')::int[];
+    sql = f"""
+        CREATE INDEX {export_tb}_asin_idx ON {export_tb} USING btree (
+            "asin" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
+        );
 
-        ALTER TABLE {export_tb}
-        ALTER COLUMN related_type_rate TYPE numeric(10,4)[]
-        USING string_to_array(related_type_rate, ',')::numeric(10,4)[];
+        CREATE INDEX {export_tb}_related_asin_idx ON {export_tb} USING gin (
+            "related_asin" COLLATE "pg_catalog"."default" "pg_catalog"."array_ops"
+        );
     """
     DBUtil.engine_exec_sql(engine, sql)
+    print("Indexes created; writing the workflow record!")
+
+    sql = f"""
+        REPLACE INTO selection.workflow_everyday
+        (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
+        VALUES
+        ('{site_name}', '{report_date}', '导出PG集群完成', 14, '{export_tb}', '{report_type}', '关联流量', '是', '关联流量模块数据', 'postgresql_cluster');
+    """
+    DBUtil.engine_exec_sql(DBUtil.get_db_engine('mysql', 'us'), sql)
+    print("success!")
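
# With related_asin stored as VARCHAR(10)[] and GIN-indexed with array_ops,
# reverse lookups ("which ASINs list X as related traffic?") can use array
# containment. A hypothetical spot check (the ASIN literal is made up;
# engine/export_tb come from the script above):
check_sql = f"""
    SELECT asin, related_type
    FROM {export_tb}
    WHERE related_asin @> ARRAY['B000000001']::varchar(10)[];
"""
DBUtil.engine_exec_sql(engine, check_sql)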