import json
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql import functions as F, Window
from pyspark.sql.types import StringType, ArrayType

from utils.common_util import CommonUtil
from utils.redis_utils import RedisUtils
from utils.spark_util import SparkUtil

"""
ASIN title history data.
"""


def partition(lst, size):
    """Yield successive chunks of `lst`, each at most `size` elements long."""
    for i in range(0, len(lst), size):
        yield lst[i:i + size]


class DwtAsinTitleHistory(object):

    def __init__(self, site_name, run_type):
        self.site_name = site_name
        self.run_type = run_type
        app_name = ":".join([self.__class__.__name__, self.site_name, self.run_type])
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_asin_title_history"

    def merge_add(self, add_sql):
        print("=================== SQL for the newly added title data: ===================")
        print(add_sql)
        add_df = self.spark.sql(add_sql).cache()
        exist_df = self.spark.sql(
            f"""
            select asin, title_time_list, title_list
            from dwt_asin_title_history
            where site_name = '{self.site_name}'
            """
        )
        # Explode the stored JSON arrays back into one (asin, time, title) row per history entry.
        exist_df = exist_df.withColumn("map", F.explode(F.arrays_zip(
            F.from_json(F.col('title_list'), ArrayType(StringType())).alias("title"),
            F.from_json(F.col('title_time_list'), ArrayType(StringType())).alias("time")
        ))).select(
            F.col("asin"),
            F.col("map.time").alias("time"),
            F.col("map.title").alias("title"),
        )
        exist_df = exist_df.unionByName(add_df, allowMissingColumns=True)

        # Order each asin's rows by time and keep only the rows where the title changed.
        exist_df = exist_df.withColumn("lastTitle", F.lag(F.col("title"), 1, None).over(
            Window.partitionBy(["asin"]).orderBy(F.col("time").asc())
        ))
        exist_df = exist_df.filter('title != lastTitle or lastTitle is null')
        exist_df = exist_df.groupby("asin", "title").agg(
            F.min("time").alias("time")
        )

        # Merge back into one row per asin; sort_array orders the (title, time) structs
        # by the struct's first field.
        exist_df = exist_df.groupby(F.col("asin")).agg(
            F.sort_array(F.collect_list(F.struct("title", "time"))).alias("sort_array")
        ).select(
            F.col('asin'),
            F.to_json(F.col("sort_array.time")).alias("title_time_list"),
            F.to_json(F.col("sort_array.title")).alias("title_list"),
            F.lit(self.site_name).alias("site_name")
        )

        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict={
                "site_name": self.site_name
            },
            df_save=exist_df
        )

    def get_redis_set_key(self):
        if self.site_name == 'us':
            return "DwtAsinTitleHistory:set"
        return f"DwtAsinTitleHistory:{self.site_name}:set"

    def run_all(self):
        df_asin_detail_part = CommonUtil.select_partitions_df(self.spark, "ods_asin_detail")
        # for date_type in ['month', 'week', 'day', 'month_week']:
        # for date_type in ['month']:
        batchMap = {
            "month": 10,
            "week": 50,
            "day": 60,
            "month_week": 5,
        }
        redis_set_key = self.get_redis_set_key()
        date_type = CommonUtil.get_sys_arg(3, "week")
        for date_type in [date_type]:
            redis_client = RedisUtils.get_redis_client(decode_responses=True)
            exist_list = redis_client.smembers(redis_set_key)
            part_list = df_asin_detail_part.filter(
                f" site_name = '{self.site_name}' and date_type = '{date_type}' "
            ).sort(F.col("date_info").asc())
            rows = part_list.toPandas().to_dict(orient='records')
            # Filter out partitions that have already been processed (tracked in Redis).
            rows = list(filter(lambda it: json.dumps(it) not in exist_list, rows))

            for part in partition(rows, batchMap.get(date_type)):
                site_name = part[0]['site_name']
                date_type = part[0]['date_type']
                date_infos = [it['date_info'] for it in part]
                print("===================== Current date_info batch =====================")
                print(date_infos)
                if len(date_infos) > 0:
                    sql = f"""
                        select asin, title, date_format(updated_at, 'yyyy-MM-dd') as time
                        from ods_asin_detail
                        where site_name = '{site_name}'
                          and date_type = '{date_type}'
                          and date_info in ({CommonUtil.list_to_insql(date_infos)})
                          and title is not null
                          and updated_at is not null
                    """
                    # Merge the batch into the history table.
                    self.merge_add(sql)
                    print("success")
                    redis_client = RedisUtils.get_redis_client(decode_responses=True)
                    redis_client.sadd(redis_set_key, *[json.dumps(it) for it in part])
                    # Only one batch is processed per invocation.
                    break

    def run(self):
        df_asin_detail_part = CommonUtil.select_partitions_df(self.spark, "ods_asin_detail")
        # Incremental run: merge only the 5 most recent partitions.
        redis_set_key = self.get_redis_set_key()
        redis_client = RedisUtils.get_redis_client(decode_responses=True)
        exist_list = redis_client.smembers(redis_set_key)
        if self.site_name == 'us':
            date_type = "month"
        else:
            date_type = "week"
        part_list = df_asin_detail_part.filter(
            f" site_name = '{self.site_name}' and date_type = '{date_type}' "
        ).sort(F.col("date_info").desc()).limit(5)

        # Filter out partitions that have already been processed (tracked in Redis).
        rows = list(filter(lambda it: json.dumps(it) not in exist_list,
                           part_list.toPandas().to_dict(orient='records')))
        if len(rows) > 0:
            date_infos = [it['date_info'] for it in rows]
            sql = f"""
                select asin, title, date_format(updated_at, 'yyyy-MM-dd') as time
                from ods_asin_detail
                where site_name = '{self.site_name}'
                  and date_type = '{date_type}'
                  and date_info in ({CommonUtil.list_to_insql(date_infos)})
                  and title is not null
                  and updated_at is not null
            """
            self.merge_add(sql)
            print("success")
            redis_client = RedisUtils.get_redis_client(decode_responses=True)
            redis_client.sadd(redis_set_key, *[json.dumps(it) for it in rows])


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    run_type = CommonUtil.get_sys_arg(2, None)
    obj = DwtAsinTitleHistory(site_name=site_name, run_type=run_type)
    if run_type == 'all':
        obj.run_all()
    elif run_type == 'current':
        obj.run()
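
# Example invocation (a sketch only; the script filename below is assumed, and the
# spark-submit / scheduler wiring is handled outside this file). Arguments map to
# CommonUtil.get_sys_arg(1..3): site_name, run_type, and an optional date_type for run_all.
#
#   python dwt_asin_title_history.py us all week    # backfill: processes one batch of 'week' partitions per run
#   python dwt_asin_title_history.py us current     # incremental: merges the 5 most recent partitions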