dwd_day_asin.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql.types import BooleanType
from utils.db_util import DBUtil
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_udf import category_craw_flag

"""
获取当前榜单系统中没有asin详情的数据
"""


class DwdDayAsin(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
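        # udf wrapping category_craw_flag: returns True when an asin's first-level category should be crawled (used as a filter in run())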
        self.udf_category_craw_flag = F.udf(category_craw_flag, BooleanType())
        self.hive_tb = "dwd_day_asin"

    def run(self):
        # check whether the previous day's asin details were imported; send an alert if not
        df_self_asin_part = CommonUtil.select_partitions_df(self.spark, "ods_self_asin_detail")
        last_day = CommonUtil.get_day_offset(self.date_info, -1)
        last_7day = CommonUtil.get_day_offset(self.date_info, -7)

        flag = df_self_asin_part.filter(
            f"date_type = '{DateTypes.day.name}' and site_name = '{self.site_name}' and date_info = '{last_day}'").count()

        if flag == 0:
            CommonUtil.send_wx_msg(['wujicang', 'hezhe'], "Exception alert", f"No asin data for {last_day}, please check")

        month = CommonUtil.reformat_date(self.date_info, "%Y-%m-%d", "%Y-%m")
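        # The query below picks asins whose title is still missing in this month's BSR and NSR
        # detail tables, then removes asins already present in dim_cal_asin_history_detail and
        # asins that failed (craw_state = 2) in the day queue within the last 7 days.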
        sql = f"""
with asin_list1 as (
	select asin, category_first_id
	from dwt_bsr_asin_detail
	where title is null
	  and site_name = '{self.site_name}'
	  and date_info = '{month}'
),
	 asin_list2 as (
		 select asin, category_first_id
		 from dwt_nsr_asin_detail
		 where title is null
		   and site_name = '{self.site_name}'
		   and date_info = '{month}'
	 ),
	 asin_exist as (
		 select asin
		 from dim_cal_asin_history_detail
		 where site_name = '{self.site_name}'
	 ),
--  invalid asins (crawl failed within the last 7 days)
	 asin_lost as (
        select asin
        from dwd_day_asin
        where date_info >= '{last_7day}'
          and date_info < '{self.date_info}'
          and site_name = '{self.site_name}'
          and queue_name = 'day_queue'
          and craw_state = 2
	 ),
	 asin_all as (
		 select asin, category_first_id
		 from asin_list1
		 union all
		 select asin, category_first_id
		 from asin_list2
	 )

select asin_all.asin, category_first_id
from asin_all
		 left anti join asin_exist on asin_all.asin = asin_exist.asin
		 left anti join asin_lost  on asin_all.asin = asin_lost.asin
"""
        #  left anti join behaves like NOT EXISTS: keep only rows without a match on the right side
        print("====================== query sql ======================")
        print(sql)
        df_save = self.spark.sql(sql)
        #  filter out asins whose category should not be crawled
        df_save = df_save.filter(self.udf_category_craw_flag(F.col("category_first_id")))
        # de-duplicate and limit the number of asins queued per run
        df_save = df_save.dropDuplicates(['asin'])
        count_all = df_save.count()
        count = 40 * 10000
        df_save = df_save.limit(count)
        df_save = df_save.select(
            "asin",
            F.lit("day_queue").alias("queue_name"),
            F.lit(0).alias("craw_state"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info"),
        ).orderBy(F.col("asin").desc())

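        # recount after the limit so the notification below can report queued vs. total rows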
        count = df_save.count()

        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info,
        }
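        # drop existing files under the target partition before appending, so re-runs do not write duplicates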
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save = df_save.repartition(1)
        partition_by = list(partition_dict.keys())
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        CommonUtil.send_wx_msg(['wujicang'], "Notice", f"asin calculation for {self.date_info} succeeded, rows: {count}/{count_all}")

    def run_update(self):
        # fetch the set of asins that have not been crawled
        craw_sql = f"""
        select asin
        from {self.site_name}_self_all_syn
        where data_type = 4
          and date_info = '{self.date_info}'
          and state in (4, 12, 13)
        """
        conn_info = DBUtil.get_connection_info("mysql", self.site_name)
        craw_asin_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=craw_sql
        ).cache()
        craw_asin_df = craw_asin_df.dropDuplicates(['asin'])
        # mark crawl state
        sql = f"""
select asin, queue_name, craw_state,date_info,site_name
from dwd_day_asin
where date_info = '{self.date_info}'
  and site_name = '{self.site_name}'
  and queue_name = 'day_queue'
"""
        df_update = self.spark.sql(sql)
        if df_update.count() == 0:
            return
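        # craw_state becomes '2' if the asin still appears in the un-crawled set, otherwise '1'
        # (assumption: 1 = crawled successfully, 2 = crawl failed / still pending)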
        df_update = df_update.join(craw_asin_df, on=['asin'], how="left").select(
            df_update['asin'].alias("asin"),
            F.col("queue_name"),
            F.when(craw_asin_df['asin'].isNotNull(), F.lit("2")).otherwise(F.lit("1")).alias("craw_state"),
            F.col("date_info"),
            F.col("site_name"),
        )
        # update the partition
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info,
        }
        #  update the hive table
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict=partition_dict,
            df_save=df_update
        )
        pass


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    update_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None) == 'update'
    obj = DwdDayAsin(site_name, date_info)
    if update_flag:
        #  update data
        obj.run_update()
    else:
        # build data
        obj.run()
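
# Example invocation (the site name and date below are illustrative, not taken from the source):
#   python dwd_day_asin.py us 2023-11-01          -> build the day_queue partition for that date
#   python dwd_day_asin.py us 2023-11-01 update   -> mark crawl results back into dwd_day_asin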