import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory

from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import DataFrame
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil


class DimAsinRelatedTraffic(object):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = 'dim_asin_related_traffic'
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_by = ['site_name', 'date_type', 'date_info']

        # Placeholder DataFrames; each is replaced with a real query result in run()
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_self_asin_detail = self.spark.sql("select 1+1;")
        self.df_result_list_json = self.spark.sql("select 1+1;")
        self.df_together_asin = self.spark.sql("select 1+1;")
        self.df_sp_initial_seen_asins_json = self.spark.sql("select 1+1;")
        self.df_sp_4stars_initial_seen_asins_json = self.spark.sql("select 1+1;")
        self.df_sp_delivery_initial_seen_asins_json = self.spark.sql("select 1+1;")
        self.df_compare_similar_asin_json = self.spark.sql("select 1+1;")
        self.df_bundles_this_asins_json = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_updated_at = self.spark.sql("select 1+1;")

        self.u_categorize_flow = F.udf(self.categorize_flow, StringType())
        self.u_merge_df = F.udf(self.merge_df, StringType())
        self.u_repair_json = F.udf(self.repair_json, StringType())

    @staticmethod
    def repair_json(json_str):
        """修复指定字段的数组格式"""
        if not json_str:
            return json_str

        # Match three cases: 1) an already-formatted array 2) a quoted string 3) an unquoted value
        pattern = re.compile(
            r'("Brand in this category on Amazon"\s*:\s*)(\[[^\]]+\]|"([^"]+)"|([^,{}"]+))'
        )

        def replace_func(m):
            # Already an array (group(2) starts with '['): return the whole match unchanged
            if m.group(2).startswith('['):
                return m.group(0)

            # Quoted string or unquoted value: split on commas and rebuild as a JSON array
            raw_value = m.group(3) or m.group(4)
            items = [v.strip() for v in raw_value.split(",") if v.strip()]
            joined = '","'.join(items)
            return f'{m.group(1)}["{joined}"]'

        return pattern.sub(replace_func, json_str)
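
    # Illustrative repair (values are made up, not from the source): a quoted
    # multi-value string such as
    #   {"Brand in this category on Amazon": "BrandA, BrandB"}
    # is rewritten by repair_json into a proper JSON array:
    #   {"Brand in this category on Amazon": ["BrandA","BrandB"]}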

    @staticmethod
    def merge_df(col1, col2):
        if not col1 or col1.strip() == "":
            return col2 if (col2 and col2.strip()) else None
        if not col2 or col2.strip() == "":
            return col1 if (col1 and col1.strip()) else None

        list1 = list(set(x.strip() for x in col1.split(",") if x.strip()))
        list2 = list(set(x.strip() for x in col2.split(",") if x.strip()))
        combined = list(set(list1 + list2))
        return ",".join(combined) if combined else None

    @staticmethod
    def categorize_flow(key):
        key_lower = key.lower()
        if key_lower == '4 stars and above':
            return "four_star_above"
        elif key_lower in ('brands you might like', 'more to consider from our brands', 'similar brands on amazon',
                           'exclusive items from our brands', 'more from frequently bought brands'):
            return "brand_recommendation"
        elif key_lower in ('products related to this item', 'based on your recent views', 'customer also bought',
                           'deals on related products', 'similar items in new arrivals', 'top rated similar items',
                           'compare with similar items', 'discover similar items'):
            return "similar_items"
        elif key_lower in ('customers who viewed this item also viewed', 'customers frequently viewed'):
            return "look_and_look"
        elif key_lower.startswith('customers also'):
            return "look_also_look"
        elif key_lower.startswith('what other items do customers buy after viewing this item'):
            return "look_but_bought"
        elif key_lower in ('make it a bundle', 'bundles with this item'):
            return "bundle_bought"
        elif key_lower in ('buy it with', 'frequently bought together'):
            return "combination_bought"
        elif key_lower in ('more items to explore', 'based on your recent shopping trends',
                           'related products with free delivery on eligible orders') \
                or key_lower.startswith('explore more'):
            return "more_relevant"
        elif key_lower == 'customers who bought this item also bought':
            return "bought_and_bought"
        elif key_lower == 'sponsored products related to this item':
            return "product_adv"
        elif key_lower in ('brands related to this category on amazon', 'brand in this category on amazon'):
            return "brand_adv"
        else:
            return "other"

    @staticmethod
    def other_json_handle(
            df: DataFrame,
            json_column: str,
            asin_key: str,
            output_column: str
    ) -> DataFrame:
        """
        从JSON数组字段中提取特定键的值并去重
        参数:
            df: 输入的DataFrame
            json_column: 包含JSON数组的列名
            asin_key: 要从JSON对象中提取的键名
            output_column: 输出结果的列名
        返回:
            包含去重后值的DataFrame(只有一列)
        """
        return df.withColumn(
            'json_array', F.from_json(F.col(json_column), ArrayType(MapType(StringType(), StringType())))
        ).withColumn(
            "exploded_item", F.explode("json_array")
        ).withColumn(
            "flow_asin", F.col(f"exploded_item.{asin_key}")
        ).filter(
            # keep only well-formed 10-character ASINs
            F.col('flow_asin').isNotNull() & (F.col('flow_asin') != "") & (F.length(F.col('flow_asin')) == 10)
        ).groupBy("asin").agg(
            F.concat_ws(",", F.collect_set("flow_asin")).alias(f"{output_column}")
        )
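
    # Illustrative input/output for other_json_handle (key names follow the calls
    # below; the data values are made up):
    #   json_column value: '[{"seen_asins": "B00AAAAAAA"}, {"seen_asins": "B00BBBBBBB"}]'
    #   result row:        asin | output_column = "B00AAAAAAA,B00BBBBBBB"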

    def read_data(self):
        print("读取ods_asin_detail中流量相关数据")
        sql = f"""
        select 
            asin, 
            together_asin, 
            sp_initial_seen_asins_json, 
            sp_4stars_initial_seen_asins_json, 
            sp_delivery_initial_seen_asins_json, 
            compare_similar_asin_json, 
            result_list_json, 
            bundles_this_asins_json, 
            updated_at 
        from ods_asin_detail 
        where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and asin is not null;
        """
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)

        print("读取ods_self_asin_related_traffic数据")
        sql = f"""
        select 
            asin, 
            together_asin, 
            sp_initial_seen_asins_json, 
            sp_4stars_initial_seen_asins_json, 
            sp_delivery_initial_seen_asins_json, 
            compare_similar_asin_json, 
            result_list_json, 
            null as bundles_this_asins_json, 
            updated_at 
        from ods_self_asin_related_traffic where site_name='{self.site_name}' and asin is not null;
        """
        self.df_self_asin_detail = self.spark.sql(sqlQuery=sql)

        # Union the two sources and keep only the most recently updated record per asin
        window = Window.partitionBy(['asin']).orderBy(F.col('updated_at').desc_nulls_last())
        self.df_asin_detail = self.df_asin_detail.unionByName(
            self.df_self_asin_detail, allowMissingColumns=False
        ).withColumn(
            "dt_rank", F.row_number().over(window=window)
        ).filter("dt_rank=1").drop("dt_rank").cache()
        print("Detail data:")
        self.df_asin_detail.show(10, True)

    # Parse the result_list_json field into pivoted traffic-category columns
    def handle_result_list_json(self):
        json_schema = ArrayType(MapType(StringType(), ArrayType(StringType())))
        self.df_result_list_json = self.df_asin_detail.filter(
            F.col('result_list_json').isNotNull()
        ).select(
            'asin', F.from_json(self.u_repair_json(F.col('result_list_json')), json_schema).alias('parsed_json')
        ).withColumn(
            "kv", F.explode("parsed_json")
        ).select(
            "asin", F.explode("kv").alias("key", "value")
        ).withColumn(
            "category", self.u_categorize_flow(F.col("key"))
        ).filter(
            F.col("category") != "other"
        ).withColumn(
            "distinct_values", F.array_distinct("value")
        ).filter(
            F.expr("size(distinct_values) > 0")
        ).select(
            'asin', 'category', 'distinct_values'
        ).groupBy(["asin", "category"]).agg(
            F.concat_ws(",", F.array_distinct(F.flatten(F.collect_list("distinct_values")))).alias("values")
        ).groupBy("asin") \
            .pivot("category") \
            .agg(F.first("values")) \
            .cache()
        print("result_list_json processing result:")
        self.df_result_list_json.show(10, True)
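
        # Illustrative shape of the pivoted result (actual columns depend on the
        # categories present in this partition; values below are made up):
        #   asin       | brand_recommendation    | similar_items | ...
        #   B0XXXXXXXX | "B0AAAAAAAA,B0BBBBBBBB" | "B0CCCCCCCC"  | ...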

    # Process the remaining traffic fields
    def handle_other_field(self):
        # Process the sp_initial_seen_asins_json field
        self.df_sp_initial_seen_asins_json = self.df_asin_detail\
            .select('asin', 'sp_initial_seen_asins_json')\
            .filter(F.col('sp_initial_seen_asins_json').isNotNull())
        self.df_sp_initial_seen_asins_json = self.other_json_handle(
            df=self.df_sp_initial_seen_asins_json,
            json_column='sp_initial_seen_asins_json',
            asin_key='seen_asins',
            output_column='similar_items'
        ).cache()
        print("处理sp_initial_seen_asins_json字段结果如下:")
        self.df_sp_initial_seen_asins_json.show(10, True)

        # 处理sp_4stars_initial_seen_asins_json字段
239 240 241
        self.df_sp_4stars_initial_seen_asins_json = self.df_asin_detail\
            .select('asin', 'sp_4stars_initial_seen_asins_json')\
            .filter(F.col('sp_4stars_initial_seen_asins_json').isNotNull())
242 243 244 245 246 247 248 249 250 251
        self.df_sp_4stars_initial_seen_asins_json = self.other_json_handle(
            df=self.df_sp_4stars_initial_seen_asins_json,
            json_column='sp_4stars_initial_seen_asins_json',
            asin_key='seen_asins',
            output_column='four_star_above'
        ).cache()
        print("处理sp_4stars_initial_seen_asins_json字段结果如下:")
        self.df_sp_4stars_initial_seen_asins_json.show(10, True)

        # 处理sp_delivery_initial_seen_asins_json字段
252 253 254
        self.df_sp_delivery_initial_seen_asins_json = self.df_asin_detail\
            .select('asin', 'sp_delivery_initial_seen_asins_json')\
            .filter(F.col('sp_delivery_initial_seen_asins_json').isNotNull())
255 256 257 258 259 260 261 262 263 264
        self.df_sp_delivery_initial_seen_asins_json = self.other_json_handle(
            df=self.df_sp_delivery_initial_seen_asins_json,
            json_column='sp_delivery_initial_seen_asins_json',
            asin_key='seen_asins',
            output_column='more_relevant'
        ).cache()
        print("处理sp_delivery_initial_seen_asins_json字段结果如下:")
        self.df_sp_delivery_initial_seen_asins_json.show(10, True)

        # 处理compare_similar_asin_json字段
265 266 267
        self.df_compare_similar_asin_json = self.df_asin_detail\
            .select('asin', 'compare_similar_asin_json')\
            .filter(F.col('compare_similar_asin_json').isNotNull())
268 269 270 271 272 273 274 275 276 277
        self.df_compare_similar_asin_json = self.other_json_handle(
            df=self.df_compare_similar_asin_json,
            json_column='compare_similar_asin_json',
            asin_key='compare_asin',
            output_column='similar_items'
        ).cache()
        print("处理compare_similar_asin_json字段结果如下:")
        self.df_compare_similar_asin_json.show(10, True)

        # 处理bundles_this_asins_json字段
278 279 280
        self.df_bundles_this_asins_json = self.df_asin_detail\
            .select('asin', 'bundles_this_asins_json')\
            .filter(F.col('bundles_this_asins_json').isNotNull())
281 282 283 284 285 286 287 288 289 290
        self.df_bundles_this_asins_json = self.other_json_handle(
            df=self.df_bundles_this_asins_json,
            json_column='bundles_this_asins_json',
            asin_key='bundles_Asins',
            output_column='bundle_bought'
        ).cache()
        print("处理bundles_this_asins_json字段结果如下:")
        self.df_bundles_this_asins_json.show(10, True)

        # 处理together_asin字段
291 292 293
        self.df_together_asin = self.df_asin_detail.select('asin', 'together_asin').filter(
            F.col('together_asin').isNotNull()
        ).withColumnRenamed(
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
            'together_asin', 'combination_bought'
        ).cache()
        print("处理together_asin字段结果如下:")
        self.df_together_asin.show(10, True)

    # Merge all the per-field DataFrames
    def handle_merge_df(self):
        all_merge_df = [self.df_together_asin, self.df_sp_initial_seen_asins_json,
                        self.df_sp_4stars_initial_seen_asins_json, self.df_sp_delivery_initial_seen_asins_json,
                        self.df_compare_similar_asin_json, self.df_bundles_this_asins_json]
        main_df = self.df_result_list_json
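        # Each auxiliary df holds exactly one value column besides asin. If that column
        # already exists on main_df (e.g. similar_items comes from both result_list_json
        # and compare_similar_asin_json), rename it, full-outer-join on asin, and merge
        # the two comma-separated lists with u_merge_df; otherwise a plain full outer
        # join is enough.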
        for df in all_merge_df:
            for col in set(df.columns) - {"asin"}:
                if col in main_df.columns:
                    df = df.withColumnRenamed(col, f'{col}_tmp')
                    main_df = main_df.join(df, "asin", "full") \
                        .withColumn(col, self.u_merge_df(F.col(col), F.col(f"{col}_tmp"))) \
                        .drop(f"{col}_tmp")
                else:
                    main_df = main_df.join(df, "asin", "full")

        self.df_save = main_df
        # Attach the asin crawl timestamp
        self.df_updated_at = self.df_asin_detail.select(
            'asin', 'updated_at'
        ).withColumn(
            'updated_at', F.substring('updated_at', 1, 10)
        )
        self.df_save = self.df_save.join(
            self.df_updated_at, 'asin', 'left'
        ).cache()
        print("Final merged result:")
        self.df_save.show(10, True)

        self.df_asin_detail.unpersist()
        self.df_result_list_json.unpersist()
        self.df_together_asin.unpersist()
        self.df_sp_initial_seen_asins_json.unpersist()
        self.df_sp_4stars_initial_seen_asins_json.unpersist()
        self.df_sp_delivery_initial_seen_asins_json.unpersist()
        self.df_compare_similar_asin_json.unpersist()
        self.df_bundles_this_asins_json.unpersist()

    # Persist the data to Hive
    def save_data(self):
        # Ensure the DataFrame columns match the Hive table schema
        hive_tb_cols = [field.name for field in self.spark.table(self.hive_tb).schema]
        for col in hive_tb_cols:
            if col not in self.df_save.columns:
                self.df_save = self.df_save.withColumn(col, F.lit(None))

        # Populate the partition columns; replace('', None) normalizes empty strings to NULL
        self.df_save = self.df_save.withColumn(
            'site_name', F.lit(self.site_name)
        ).withColumn(
            'date_type', F.lit(self.date_type)
        ).withColumn(
            'date_info', F.lit(self.date_info)
        ).select(*hive_tb_cols).replace('', None)

        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        print(f"当前存储的表名为:{self.hive_tb},分区为:{self.partitions_by}")
        self.df_save.repartition(40).write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=self.partitions_by)
        print("success")

    def run(self):
        # Read the source data
        self.read_data()
        # Parse the result_list_json field
        self.handle_result_list_json()
        # Process the remaining traffic fields
        self.handle_other_field()
        # Merge all the per-field DataFrames
        self.handle_merge_df()
        # Persist the data
        self.save_data()


if __name__ == '__main__':
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    handle_obj = DimAsinRelatedTraffic(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
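
# Illustrative invocation (argument values are made up; the actual deployment
# command may differ):
#   spark-submit dim_asin_related_traffic.py us month 2024-01
# i.e. site_name='us', date_type='month', date_info='2024-01'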