import json
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql.types import StringType, ArrayType

from utils.common_util import CommonUtil
from utils.redis_utils import RedisUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window

"""
标题历史数据
"""


def partition(lst, size):
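    """Yield successive chunks of at most `size` items from `lst`.

    e.g. list(partition([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]
    """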
    for i in range(0, len(lst), size):
        yield lst[i:i + size]


class DwtAsinTitleHistory(object):

    def __init__(self, site_name, run_type):
        self.site_name = site_name
        self.run_type = run_type
        app_name = ":".join([self.__class__.__name__, self.site_name, self.run_type])
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_asin_title_history"

    def merge_add(self, add_sql):
        print("===================当前新增的标题数据来源于:===================")
        print(add_sql)
        add_df = self.spark.sql(add_sql).cache()

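        # load the already-merged title history for this site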
        exist_df = self.spark.sql(
            f"""
            select asin,
                   title_time_list,
                   title_list
            from dwt_asin_title_history
            where site_name = '{self.site_name}'
            """
        )

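        # explode the stored JSON arrays back into one row per (asin, time, title)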
        exist_df = exist_df.withColumn("map", F.explode(F.arrays_zip(
            F.from_json(F.col('title_list'), ArrayType(StringType())).alias("title"),
            F.from_json(F.col('title_time_list'), ArrayType(StringType())).alias("time")
        ))).select(
            F.col("asin"),
            F.col("map.time").alias("time"),
            F.col("map.title").alias("title"),
        )

        exist_df = exist_df.unionByName(add_df, allowMissingColumns=True)
        # compare each title with the previous one per asin (ordered by time) and
        # keep only the rows where the title actually changed, plus the first row
        exist_df = exist_df.withColumn("lastTitle", F.lag(F.col("title"), 1, None).over(
            Window.partitionBy("asin").orderBy(F.col("time").asc())
        ))
        exist_df = exist_df.filter('title != lastTitle or lastTitle is null')

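        # keep the earliest time each distinct (asin, title) pair was observed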
        exist_df = exist_df.groupby("asin", "title").agg(
            F.min("time").alias("time")
        )

        # collapse back to one row per asin: sorted (title, time) structs become parallel JSON arrays
        exist_df = exist_df.groupby(F.col("asin")).agg(
            F.sort_array(F.collect_list(F.struct("title", "time"))).alias("sort_array")
        ).select(
            F.col('asin'),
            F.to_json(F.col("sort_array.time")).alias("title_time_list"),
            F.to_json(F.col("sort_array.title")).alias("title_list"),
            F.lit(self.site_name).alias("site_name")
        )

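        # persist the merged result back to the hive table for this site's partition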
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict={
                "site_name": self.site_name
            },
            df_save=exist_df
        )

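    # redis set tracking which (site_name, date_type, date_info) partitions have been
    # merged already; the 'us' key keeps an un-prefixed name (presumably for legacy reasons)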
    def get_redis_set_key(self):
        if self.site_name == 'us':
            return "DwtAsinTitleHistory:set"
        return f"DwtAsinTitleHistory:{self.site_name}:set"

    def run_all(self):
        df_asin_detail_part = CommonUtil.select_partitions_df(self.spark, "ods_asin_detail")
        # number of date_info partitions merged per batch, by date_type
        batch_map = {
            "month": 10,
            "week": 50,
            "day": 60,
            "month_week": 5,
        }

        redis_set_key = self.get_redis_set_key()
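        # the optional third CLI argument selects which date_type to backfill (defaults to 'week')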
        date_type = CommonUtil.get_sys_arg(3, "week")

        redis_client = RedisUtils.get_redis_client(decode_responses=True)
        exist_list = redis_client.smembers(redis_set_key)
        part_list = df_asin_detail_part.filter(f" site_name = '{self.site_name}' and date_type = '{date_type}' ") \
            .sort(F.col("date_info").asc())

        rows = part_list.toPandas().to_dict(orient='records')
        # skip partitions that have already been processed (tracked in redis)
        rows = list(filter(lambda it: json.dumps(it) not in exist_list, rows))

        # note: only the first unprocessed batch is merged per invocation (see the break below)
        for part in partition(rows, batch_map.get(date_type)):
            site_name = part[0]['site_name']
            date_type = part[0]['date_type']
            date_infos = [it['date_info'] for it in part]
            print("=================== current date_info batch ===================")
            print(date_infos)
            if len(date_infos) > 0:
                sql = f"""
                    select asin, title, date_format(updated_at, 'yyyy-MM-dd') as time
                    from ods_asin_detail
                    where site_name = '{site_name}'
                      and date_type = '{date_type}'
                      and date_info in ({CommonUtil.list_to_insql(date_infos)})
                      and title is not null
                      and updated_at is not null
                    """
                # merge this batch into the history table
                self.merge_add(sql)
                print("success")
                # mark the batch as processed in redis
                redis_client = RedisUtils.get_redis_client(decode_responses=True)
                redis_client.sadd(redis_set_key, *[json.dumps(it) for it in part])
                break

    def run(self):
        df_asin_detail_part = CommonUtil.select_partitions_df(self.spark, "ods_asin_detail")
        # incremental run: only consider the most recent 5 partitions
        redis_set_key = self.get_redis_set_key()
        redis_client = RedisUtils.get_redis_client(decode_responses=True)
        exist_list = redis_client.smembers(redis_set_key)

        if self.site_name == 'us':
            date_type = "month"
        else:
            date_type = "week"

        part_list = df_asin_detail_part.filter(f" site_name = '{self.site_name}' and date_type = '{date_type}' ") \
            .sort(F.col("date_info").desc()).limit(5)

        # skip partitions that have already been processed (tracked in redis)
        rows = list(filter(lambda it: json.dumps(it) not in exist_list, part_list.toPandas().to_dict(orient='records')))
        if len(rows) > 0:
            date_infos = [it['date_info'] for it in rows]
            sql = f"""
                    select asin, title, date_format(updated_at, 'yyyy-MM-dd') as time
                    from ods_asin_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{date_type}'
                      and date_info in ({CommonUtil.list_to_insql(date_infos)})
                      and title is not null
                      and updated_at is not null
                    """
            self.merge_add(sql)
            print("success")
            # mark the processed partitions in redis
            redis_client = RedisUtils.get_redis_client(decode_responses=True)
            redis_client.sadd(redis_set_key, *[json.dumps(it) for it in rows])


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    run_type = CommonUtil.get_sys_arg(2, None)
    obj = DwtAsinTitleHistory(site_name=site_name, run_type=run_type)
    if run_type == 'all':
        obj.run_all()
    elif run_type == 'current':
        obj.run()
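
# Example invocation (assuming the usual spark-submit entry point for this repo):
#   spark-submit dwt_asin_title_history.py us current      # incremental: latest 5 partitions
#   spark-submit dwt_asin_title_history.py us all month    # backfill one batch of 'month' partitions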