import json
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql.types import StringType, ArrayType

from utils.common_util import CommonUtil
from utils.redis_utils import RedisUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window

"""
标题历史数据
"""


def partition(lst, size):
    """Yield successive chunks of `lst`, each containing at most `size` elements."""
    for i in range(0, len(lst), size):
        yield lst[i:i + size]
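
# Example: list(partition([1, 2, 3], 2)) == [[1, 2], [3]]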


class DwtAsinTitleHistory(object):

    def __init__(self, site_name, run_type):
        self.site_name = site_name
        self.run_type = run_type
        app_name = ":".join([self.__class__.__name__, self.site_name, self.run_type])
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_asin_title_history"
        pass

    def merge_add(self, add_sql):
        print("===================当前新增的标题数据来源于:===================")
        print(add_sql)
        add_df = self.spark.sql(add_sql).cache()

        exist_df = self.spark.sql(
            f"""
            select asin,
                   title_time_list,
                   title_list
            from dwt_asin_title_history
            where site_name = '{self.site_name}'
            """
        )

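        # Unpack the stored parallel JSON arrays back into one (asin, time, title) row per history entry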
        exist_df = exist_df.withColumn("map", F.explode(F.arrays_zip(
            F.from_json(F.col('title_list'), ArrayType(StringType())).alias("title"),
            F.from_json(F.col('title_time_list'), ArrayType(StringType())).alias("time")
        ))).select(
            F.col("asin"),
            F.col("map.time").alias("time"),
            F.col("map.title").alias("title"),
        )

        exist_df = exist_df.unionByName(add_df, allowMissingColumns=True)
        # Compare each title with the previous one in time order to detect changes
        exist_df = exist_df.withColumn("lastTitle", F.lag(F.col("title"), 1, None).over(
            Window.partitionBy("asin").orderBy(F.col("time").asc())
        ))
        # Keep only rows where the title changed (or the asin's first observation)
        exist_df = exist_df.filter('title != lastTitle or lastTitle is null')

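        # For each (asin, title) keep the earliest time that title was observed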
        exist_df = exist_df.groupby("asin", "title").agg(
            F.min("time").alias("time")
        )

        # Collapse back to one row per asin with parallel JSON arrays
        exist_df = exist_df.groupby(F.col("asin")).agg(
            F.sort_array(F.collect_list(F.struct("title", "time"))).alias("sort_array")
        ).select(
            F.col('asin'),
            F.to_json(F.col("sort_array.time")).alias("title_time_list"),
            F.to_json(F.col("sort_array.title")).alias("title_list"),
            F.lit(self.site_name).alias("site_name")
        )

        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict={
                "site_name": self.site_name
            },
            df_save=exist_df
        )
        pass

    def get_redis_set_key(self):
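        # Key of the Redis set recording which ods_asin_detail partitions have already been merged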
        if self.site_name == 'us':
            return "DwtAsinTitleHistory:set"
        return f"DwtAsinTitleHistory:{self.site_name}:set"

    def run_all(self):
        df_asin_detail_part = CommonUtil.select_partitions_df(self.spark, "ods_asin_detail")
        # for date_type in ['month', 'week', 'day', 'month_week']:
        # for date_type in ['month']:
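        # Number of date_info partitions merged per batch, keyed by date_type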
        batchMap = {
            "month": 10,
            "week": 50,
            "day": 60,
            "month_week": 5,
        }

        redis_set_key = self.get_redis_set_key()
        date_type = CommonUtil.get_sys_arg(3, "week")

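        # Single-element loop kept to mirror the commented-out multi-date_type variants above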
        for date_type in [date_type]:
            redis_client = RedisUtils.get_redis_client(decode_responses=True)
            exist_list = redis_client.smembers(redis_set_key)
            part_list = df_asin_detail_part.filter(f" site_name = '{self.site_name}' and date_type = '{date_type}' ").sort(
                F.col("date_info").asc())

            rows = part_list.toPandas().to_dict(orient='records')
            # Skip partitions that have already been processed (stored as JSON strings in Redis)
            rows = list(filter(lambda it: json.dumps(it) not in exist_list, rows))

            for part in partition(rows, batchMap.get(date_type)):
                site_name = part[0]['site_name']
                date_type = part[0]['date_type']
                date_infos = [it['date_info'] for it in part]
                print("=====================当前时间纬度为===============================")
                print(date_infos)
                if len(date_infos) > 0:
                    sql = f"""
                    select asin, title,  date_format(updated_at,'yyyy-MM-dd') as time
                    from ods_asin_detail
                    where site_name = '{site_name}'
                      and date_type = '{date_type}'
                      and date_info in ({CommonUtil.list_to_insql(date_infos)})
                      and title is not null 
                      and updated_at is not null 
                    """
                    # Merge this batch into the history table
                    self.merge_add(sql)
                    print("success")
                    redis_client = RedisUtils.get_redis_client(decode_responses=True)
                    redis_client.sadd(redis_set_key, *[json.dumps(it) for it in part])
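                    # Only one batch is merged per invocation; remaining partitions are picked up on later runs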
                    break
        pass

    def run(self):
        df_asin_detail_part = CommonUtil.select_partitions_df(self.spark, "ods_asin_detail")
        # Incremental run: merge the latest 5 date_info partitions (month for us, week otherwise)
        redis_set_key = self.get_redis_set_key()
        redis_client = RedisUtils.get_redis_client(decode_responses=True)
        exist_list = redis_client.smembers(redis_set_key)

        if self.site_name == 'us':
            date_type = "month"
        else:
            date_type = "week"

        part_list = df_asin_detail_part.filter(f" site_name = '{self.site_name}' and date_type = '{date_type}' ") \
            .sort(F.col("date_info").desc()).limit(5)

        # Skip partitions that have already been processed
        rows = list(filter(lambda it: json.dumps(it) not in exist_list, part_list.toPandas().to_dict(orient='records')))
        if len(rows) > 0:
            date_infos = [it['date_info'] for it in rows]
            sql = f"""
                    select asin, title,  date_format(updated_at,'yyyy-MM-dd') as time
                    from ods_asin_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{date_type}'
                      and date_info in ({CommonUtil.list_to_insql(date_infos)})
                      and title is not null 
                      and updated_at is not null 
                    """
            self.merge_add(sql)
            print("success")
            redis_client = RedisUtils.get_redis_client(decode_responses=True)
            redis_client.sadd(redis_set_key, *[json.dumps(it) for it in rows])
            pass
        pass


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    run_type = CommonUtil.get_sys_arg(2, None)
    obj = DwtAsinTitleHistory(site_name=site_name, run_type=run_type)
    if run_type == 'all':
        obj.run_all()
    elif run_type == 'current':
        obj.run()