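"""
dwd_asin_to_pg.py

Spark job that collects search-term ASINs from the dim_st_asin_info dimension
table, flags variation ASINs (dim_asin_variation_info) and attaches volume/weight
attributes (dim_asin_stable_info), then writes the result to dwd_asin_to_pg,
partitioned by (site_name, date_type, date_info), for export to PostgreSQL as
crawler input.

Usage: dwd_asin_to_pg.py <site_name> <date_type> <date_info>
"""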
import os
import random
import sys
import time
import traceback

import pandas as pd
from pyspark.storagelevel import StorageLevel
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# window functions for grouped sorting
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.db_util import DbTypes, DBUtil
from utils.common_util import CommonUtil


class DwdAsinToPg(Templates):
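    """
    Consolidates search-term ASINs with variation and stable attributes and stages
    them in the dwd_asin_to_pg table for export to PostgreSQL (crawler input).
    Builds on the shared Templates base class, which appears to provide the Spark
    session helpers, partition handling, and the run() entry point.
    """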

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = f"dwd_asin_to_pg"
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
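        # placeholder DataFrames; overwritten in read_data()/handle_data()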
        self.df_save = self.spark.sql(f"select 1+1;")
        self.df_st_asin_today = self.spark.sql(f"select 1+1;")
        self.df_st_asin_last_5_day = self.spark.sql(f"select 1+1;")
        self.df_asin_variation = self.spark.sql(f"select 1+1;")
        self.df_asin_stable = self.spark.sql(f"select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.date_info_tuple = "('2022-11-02')"
        self.date_info_tuple_last_5_day = "('2022-11-02')"
        self.reset_partitions(partitions_num=1)
        # self.date_info_tuple = ('2022-11-01', '2022-11-02', '2022-11-03', '2022-11-04', '2022-11-05', '2022-11-06')
        self.date_today = ''
        self.date_last_5_day_tuple = tuple()
        self.get_date_info_tuple()
        self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name)
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=self.site_name)

    def truncate_or_update_table_syn(self):
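        """
        Prepare the PG syn table {site_name}_all_syn_st_month_{YYYY_MM}.
        If MySQL brand analytics already holds >= 10,000 search terms for the
        month, anti-join the ASINs already present in the syn table out of
        df_save; otherwise truncate the syn table, retrying with a fresh engine
        on failure.
        """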
        table = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}"
        year, month = self.date_info.split("-")
        sql = f"select count(*) as st_count from {self.site_name}_brand_analytics_month_{year} where year={year} and month={int(month)} ;"
        df = pd.read_sql(sql, con=self.engine_mysql)
        print("sql:", sql, df.shape)
        if list(df.st_count)[0] >= 1_0000:  # at least 10,000 search terms already imported
            # sql = f"select asin from {self.site_name}_all_syn_st_month_{year} where date_info='{self.date_info}'"
            sql = f"select asin from {table} where date_info='{self.date_info}';"
            print("sql:", sql, df.shape)
            pdf_asin = pd.read_sql(sql, con=self.engine_pg14)
            schema = StructType([
                StructField('asin', StringType(), True),
            ])
            df_asin = self.spark.createDataFrame(pdf_asin, schema=schema)
            # self.df_save = self.df_save.join(self.df_save, df_asin.asin == self.df_save.asin, "left_anti")
            df_save_alias = self.df_save.alias("df_save")
            df_asin_alias = df_asin.alias("df_asin")
            self.df_save = df_save_alias.join(df_asin_alias, df_asin_alias.asin == df_save_alias.asin, "left_anti")
            self.df_save.show(10, truncate=False)
            print(f"df_asin: {df_asin.count()}, self.df_save: {self.df_save.count()}")
        else:
            while True:
                try:
                    with self.engine_pg14.begin() as conn:
                        sql_truncate = f"truncate {table};"
                        print("月搜索词没有导入进来, 需要先清空表, sql_truncate:", sql_truncate)
                        conn.execute(sql_truncate)
                    break
                except Exception as e:
                    print(e, traceback.format_exc())
                    time.sleep(random.randint(3, 10))
                    self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name,
                                                            site_name=self.site_name)
                    continue

    def truncate_or_update_table_syn_old(self):
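        """
        Legacy variant that also handled non-'us' sites (weekly syn tables).
        Superseded by truncate_or_update_table_syn, which is what handle_data calls.
        """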
        table = f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}" if self.site_name == 'us' else f"{self.site_name}_all_syn_st_{self.date_info.replace('-', '_')}"
        if self.site_name == 'us':
            year, month = self.date_info.split("-")
            sql = f"select count(*) as st_count from {self.site_name}_brand_analytics_month_{year} where year={year} and month={int(month)} ;"
        # else:
        #     year, week = self.date_info.split("-")
        #     sql = f"select count(*) from {self.site_name}_brand_analytics_{year} where week={week};"
            df = pd.read_sql(sql, con=self.engine_mysql)
            print("sql:", sql, df.shape)
            if list(df.st_count)[0] >= 100_0000:  # at least 1,000,000 search terms already imported
                sql = f"select asin from us_all_syn_st_month_{year} where date_info='{self.date_info}'"
                pdf_asin = pd.read_sql(sql, con=self.engine_pg14)
                schema = StructType([
                    StructField('asin', StringType(), True),
                ])
                df_asin = self.spark.createDataFrame(pdf_asin, schema=schema)
                # self.df_save = self.df_save.join(self.df_save, df_asin.asin == self.df_save.asin, "left_anti")
                df_save_alias = self.df_save.alias("df_save")
                df_asin_alias = df_asin.alias("df_asin")
                self.df_save = df_save_alias.join(df_asin_alias, df_asin_alias.asin == df_save_alias.asin, "left_anti")

                self.df_save.show(10, truncate=False)
                print(f"df_asin: {df_asin.count()}, self.df_save: {self.df_save.count()}")
            else:
                while True:
                    try:
                        with self.engine_pg14.begin() as conn:
                            sql_truncate = f"truncate {table};"
                            print("sql_truncate:", sql_truncate)
                            conn.execute(sql_truncate)
                        break
                    except Exception as e:
                        print(e, traceback.format_exc())
                        time.sleep(random.randint(3, 10))
                        self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name,
                                                                site_name=self.site_name)
                        continue
        else:
            while True:
                try:
                    with self.engine_pg14.begin() as conn:
                        sql_truncate = f"truncate {table};"
                        print("sql_truncate:", sql_truncate)
                        conn.execute(sql_truncate)
                    break
                except Exception as e:
                    print(e, traceback.format_exc())
                    time.sleep(random.randint(3, 10))
                    self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name)
                    continue

    def get_date_info_tuple(self):
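        """
        For date_type == 'day', look up date_info in dim_date_20_to_30 and fill
        date_last_5_day_tuple with the preceding calendar dates (ids id_today-4
        through id_today-1, i.e. four dates despite the "5_day" naming).
        """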
        self.df_date = self.spark.sql(f"select * from dim_date_20_to_30;")
        df = self.df_date.toPandas()
        if self.date_type == 'day':
            df_today = df.loc[df.date == f'{self.date_info}']
            id_today = list(df_today.id)[0]
            id_last_5_day = id_today - 4
            print("id_today, id_last_5_day:", id_today, id_last_5_day)
            df_last_5_day = df.loc[(df.id < id_today) & (df.id >= id_last_5_day)]
            self.date_last_5_day_tuple = tuple(df_last_5_day.date)

    def read_data(self):
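        """
        Load the inputs: the period's search-term ASINs from dim_st_asin_info,
        the ASINs already exported on preceding days from dwd_asin_to_pg (day
        mode only), variation flags from dim_asin_variation_info, and
        volume/weight attributes from dim_asin_stable_info.
        """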
        # for testing the monthly flow
        # sql = f"select asin from us_all_syn_st_month_2024 where date_info='2023-12' limit 100000"
        # print("sql===:", sql)
        # pdf_asin = pd.read_sql(sql, con=self.engine_pg14)
        # schema = StructType([
        #     StructField('asin', StringType(), True),
        # ])
        # df_asin = self.spark.createDataFrame(pdf_asin, schema=schema)
        # df_asin.show(10, truncate=False)

        if self.date_type == 'day':
            print("1.1 读取dim_st_asin_info表(当前日)")
            sql = f"select asin, site_name from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
            print("sql:", sql)
            self.df_st_asin_today = self.spark.sql(sqlQuery=sql).cache()
            self.df_st_asin_today.show(10, truncate=False)
            self.df_st_asin_today = self.df_st_asin_today.drop_duplicates(["asin"])
            print("self.df_st_asin_today:", self.df_st_asin_today.count())
            print("1.2 读取dim_st_asin_info表(当前日的前6天)")
            sql = f"select asin, 1 as asin_isin_flag from {self.db_save} where site_name='{self.site_name}' and date_type='{self.date_type}'  " \
                  f"and date_info in {self.date_last_5_day_tuple} and date_info >= '2022-11-02';"
            print("sql:", sql)
            self.df_st_asin_last_5_day = self.spark.sql(sqlQuery=sql).cache()
            self.df_st_asin_last_5_day.show(10, truncate=False)
            print("self.df_st_asin_last_5_day去重前:", self.df_st_asin_last_5_day.count())
            self.df_st_asin_last_5_day = self.df_st_asin_last_5_day.drop_duplicates(["asin"])
            print("self.df_st_asin_last_5_day去重后:", self.df_st_asin_last_5_day.count())
        else:
            sql = f"select asin, site_name from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
            print("sql:", sql)
            self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
            self.df_st_asin = self.df_st_asin.drop_duplicates(["asin"])
            print("self.df_st_asin.count:", self.df_st_asin.count())

        print("2. 读取dim_asin_variation_info表")
        sql = f"""select asin, 1 as asin_is_variation from dim_asin_variation_info where site_name="{self.site_name}";"""
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_variation.show(10, truncate=False)
        self.df_asin_variation = self.df_asin_variation.drop_duplicates(["asin"])

        print("3. 读取dim_asin_stable_info表")
        sql = f"""select asin, asin_volume as volume, asin_weight_str as weight_str from dim_asin_stable_info where site_name="{self.site_name}";"""
        self.df_asin_stable = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_stable.show(10, truncate=False)
        self.df_asin_stable = self.df_asin_stable.drop_duplicates(["asin"])

    def handle_data(self):
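        """
        Assemble df_save: in day mode keep only ASINs not seen on the preceding
        days and left-join the variation flag; otherwise also join the stable
        volume/weight attributes. Then stamp the partition columns, default
        asin_is_variation to 0, and send a WeChat completion notice.
        """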
        if self.date_type == 'day':
            if self.date_info >= '2022-11-02':
                self.df_st_asin_today = self.df_st_asin_today.join(
                    self.df_st_asin_last_5_day, on='asin', how='left'
                )
                print("self.df_st_asin_today合并:", self.df_st_asin_today.count())
                self.df_st_asin_today = self.df_st_asin_today.filter("asin_isin_flag is null")
            print("self.df_st_asin_today新出现:", self.df_st_asin_today.count())
            self.df_save = self.df_st_asin_today.join(
                self.df_asin_variation, on='asin', how='left'
            )
        else:
            self.df_save = self.df_st_asin.join(
                self.df_asin_variation, on='asin', how='left'
            ).join(
                self.df_asin_stable, on='asin', how='left'
            )

        self.df_save = self.df_save.drop("asin_isin_flag")
        self.handle_temp()
        self.truncate_or_update_table_syn()  # truncate the syn table or update its data
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save = self.df_save.fillna({"asin_is_variation": 0})
        self.df_save.show(10, truncate=False)
        print("self.df_save.count:", self.df_save.count())
        users = ["fangxingjun", "wangrui4", "pengyanbing"]
        title = f"dwd_asin_to_pg: {self.site_name}, {self.date_type}, {self.date_info}"
        content = f"整合asin完成--等待导出到pg提供爬虫使用--self.df_save.count: {self.df_save.count()}"
        CommonUtil().send_wx_msg(users=users, title=title, content=content)
        # quit()

    def handle_temp(self):
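        """
        One-off patch for us/week/2023-44: tag the rows with data_type=100, union
        in the 2023-11 monthly ASINs missing from df_save (after aligning the
        columns), and repoint site_name/date_type/date_info at (us, month,
        2023-11) so the output lands in that partition. All other runs get
        data_type=1.
        """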
        if self.site_name == 'us' and self.date_type == 'week' and self.date_info == '2023-44':
            sql = f"select asin from dwd_asin_to_pg where site_name='{self.site_name}' and date_type='month' and date_info='2023-11';"
            print("sql:", sql)
            self.df_asin_month = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_month = self.df_asin_month.drop_duplicates(["asin"])

            self.df_save = self.df_save.withColumn("data_type", F.lit(100))
            # self.df_save = self.df_save.join(
            #     self.df_asin_month, on='asin'
            # )
            result_df = self.df_asin_month.join(self.df_save, self.df_asin_month.asin == self.df_save.asin, "left_anti")
            print("result_df.count:", result_df.count())

            # make sure both DataFrames have the same columns
            columns1 = self.df_save.columns
            columns2 = result_df.columns
            print(f"columns1:{columns1}, columns2:{columns2}")

            # add to df_save any columns present in result_df but missing from df_save
            for col in set(columns2) - set(columns1):
                self.df_save = self.df_save.withColumn(col, F.lit(None))

            # add to result_df any columns present in df_save but missing from result_df
            for col in set(columns1) - set(columns2):
                result_df = result_df.withColumn(col, F.lit(None))
            # self.df_save = self.df_save.join(
            #     result_df
            # )
            print("self.df_save.count11:", self.df_save.count())

            self.df_save = self.df_save.unionByName(result_df)
            print("self.df_save.count22:", self.df_save.count())
            self.site_name = "us"
            self.date_type = "month"
            self.date_info = "2023-11"
        else:
            self.df_save = self.df_save.withColumn("data_type", F.lit(1))


if __name__ == "__main__":
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwdAsinToPg(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
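
# Example invocation (cluster-specific spark-submit options omitted):
#   spark-submit dwd_asin_to_pg.py us month 2023-11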