# update_syn_pg14.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录

from utils.templates import Templates
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil


class UpdateSynPG14(Templates):
    """Export related-traffic ASINs that have not been crawled for the current
    period into the PG14 crawler sync table, so the spider can back-fill them.

    Driven by ``Templates.run()``, which is expected to call
    ``read_data`` -> ``handle_data`` -> ``save_data`` in order.
    """

    def __init__(self, site_name, date_type, date_info):
        """
        :param site_name: Amazon site identifier (e.g. ``us``).
        :param date_type: period granularity: week / 4_week / month / quarter.
        :param date_info: period value, e.g. ``2022-1`` or ``2023-05``.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

        # Placeholder DataFrames (trivial query, no f-string needed); the real
        # content is assigned in read_data()/handle_data().
        placeholder_sql = "select 1+1;"
        self.df_existing_asin = self.spark.sql(placeholder_sql)   # ASINs already crawled this period
        self.df_related_asin = self.spark.sql(placeholder_sql)    # related-traffic ASIN lists
        self.df_asin_variation = self.spark.sql(placeholder_sql)  # ASIN -> is-variation flag
        self.df_asin_stable = self.spark.sql(placeholder_sql)     # ASIN -> volume / weight
        self.df_save = self.spark.sql(placeholder_sql)            # final rows to export

    def read_data(self):
        """Load the four source tables into cached DataFrames.

        NOTE(review): site_name/date_type/date_info are interpolated directly
        into the SQL text. They come from this job's CLI arguments, so this is
        trusted input, but it would not be safe for external callers.
        """
        print("读取ods_asin_detail表,获取所有已抓asin")
        sql = f"""
        select asin from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        self.df_existing_asin = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("本月已抓asin如下:")
        self.df_existing_asin.show(10, True)

        print("从dwt_asin_related_traffic表中读取所有关联asin")
        sql = f"""
        select related_asin from dwt_asin_related_traffic where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        self.df_related_asin = self.spark.sql(sqlQuery=sql).cache()
        print("关联asin数据如下:")
        self.df_related_asin.show(10, True)

        print("读取dim_asin_variation_info表")
        sql = f"""
        select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name='{self.site_name}';
        """
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("asin_variation数据如下:")
        self.df_asin_variation.show(10, True)

        print("读取dim_asin_stable_info表")
        sql = f"""
        select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name='{self.site_name}';
        """
        self.df_asin_stable = self.spark.sql(sqlQuery=sql).drop_duplicates(['asin']).cache()
        print("asin重量体积数据如下:")
        self.df_asin_stable.show(10, True)

    def handle_data(self):
        """Compute the set of ASINs to back-fill and shape the export rows."""
        # related_asin is a comma-separated list; explode into one ASIN per row.
        self.df_related_asin = self.df_related_asin.withColumn(
            'asin', F.explode(F.split(F.col('related_asin'), ','))
        ).select('asin').drop_duplicates(['asin'])

        # Related ASINs not yet crawled ('anti' == left_anti join), enriched
        # with the variation flag and volume/weight details where known.
        self.df_save = self.df_related_asin.join(
            self.df_existing_asin, on='asin', how='anti'
        ).join(
            self.df_asin_variation, on='asin', how='left'
        ).join(
            self.df_asin_stable, on='asin', how='left'
        ).fillna({'asin_is_variation': 0})

        # Pre-export shaping: keep only well-formed 10-character ASINs and add
        # the constant columns expected by the crawler's syn table.
        self.df_save = self.df_save.filter(
            F.length(F.col('asin')) == 10
        ).withColumn(
            'state', F.lit(1)       # 1 = pending crawl
        ).withColumn(
            'data_type', F.lit(2)   # 2 = related-traffic back-fill batch
        ).withColumn(
            'date_info', F.lit(self.date_info)
        ).select(
            'asin', 'state', 'asin_is_variation', 'date_info', 'data_type', 'volume', 'weight_str'
        ).cache()

    def save_data(self):
        """Append the result to the crawler's PG14 monthly syn table and
        notify the owners via WeChat."""
        # Crawler database connection info.
        con_info = DBUtil.get_connection_info('postgresql_14', self.site_name)
        year_month = str(self.date_info).replace("-", "_")
        table_name = f'{self.site_name}_all_syn_st_month_{year_month}'

        self.df_save.write.format("jdbc") \
            .option("url", con_info["url"]) \
            .option("dbtable", table_name) \
            .option("user", con_info["username"]) \
            .option("password", con_info["pwd"]) \
            .mode("append") \
            .save()

        # df_save is cached in handle_data(), so this count() does not redo the joins.
        users = ["chenyuanjie", "pengyanbing"]
        title = f"关联流量:{self.site_name},{self.date_info}"
        content = f"关联流量需补抓的asin已导出到syn表,补抓量:{self.df_save.count()}"
        CommonUtil.send_wx_msg(users=users, title=title, content=content)


if __name__ == "__main__":
    # CLI: <site> <date_type: week/4_week/month/quarter> <date_info: e.g. 2022-1>
    arg_site = sys.argv[1]
    arg_date_type = sys.argv[2]
    arg_date_info = sys.argv[3]
    job = UpdateSynPG14(site_name=arg_site, date_type=arg_date_type, date_info=arg_date_info)
    job.run()