# update_st_keepa_syn.py
import os
import sys
import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import col, lit
from utils.db_util import DbTypes, DBUtil
from pyspark.sql.types import StructType, StructField, StringType


class UpdateStKeepaSyn(object):
    """Append ASINs that are missing from the yearly keepa syn table.

    The job reads the ``asin`` column of
    ``{site_name}_all_syn_st_month_{YYYY_MM}`` through the ``postgresql_14``
    engine, reads the ``asin`` column of ``{site_name}_st_keepa_syn_{YYYY}``
    through the ``postgresql`` engine, keeps the ASINs found only in the
    former, and appends them to the keepa table over JDBC.
    """

    def __init__(self, site_name="us", date_type="month", date_info="2022-01"):
        """Create the Spark session, the database engines and placeholders.

        Args:
            site_name: Site prefix used in the table names.
            date_type: Only used to build the Spark application name here.
            date_info: Period string; the part before the first ``-`` is
                taken as the year, and ``-`` is replaced by ``_`` to build
                the monthly table suffix.
        """
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.year = date_info.split('-')[0]
        # Single non-nullable string column shared by both ASIN frames.
        self.schema = StructType([StructField('asin', StringType(), False)])
        # Database connections.
        self.engine_pg14 = DBUtil.get_db_engine(db_type=DbTypes.postgresql_14.name, site_name=self.site_name)
        self.engine_pg15 = DBUtil.get_db_engine(db_type=DbTypes.postgresql.name, site_name=self.site_name)
        # DataFrame placeholders, replaced by read_data() / handle_data().
        self.df_pg14 = self.spark.sql("select 1+1;")
        self.df_pg15 = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def _syn_table_name(self):
        """Return the monthly syn source table name for ``date_info``."""
        return f"{self.site_name}_all_syn_st_month_{self.date_info.replace('-', '_')}"

    def _keepa_table_name(self):
        """Return the yearly keepa table name (read in read_data, written in save_data)."""
        return f"{self.site_name}_st_keepa_syn_{self.year}"

    def _load_asin_df(self, table_name, engine):
        """Read ``asin`` from ``table_name`` and return a cached, de-duplicated Spark DataFrame."""
        sql = f"""
            select asin from {table_name};
        """
        pdf_asin = pd.read_sql(sql, con=engine)
        df_asin = self.spark.createDataFrame(pdf_asin, schema=self.schema)
        return df_asin.drop_duplicates(['asin']).repartition(80, 'asin').cache()

    def run(self):
        """Run the job: read both ASIN sets, compute the difference, append it."""
        self.read_data()
        self.handle_data()
        self.save_data()

    def read_data(self):
        """Load the monthly syn ASINs into ``df_pg14`` and the keepa ASINs into ``df_pg15``."""
        self.df_pg14 = self._load_asin_df(self._syn_table_name(), self.engine_pg14)
        print("从pg14的syn表读取最新月份asin:")
        self.df_pg14.show(10)

        self.df_pg15 = self._load_asin_df(self._keepa_table_name(), self.engine_pg15)
        print("从pg15的keepa表读取所有asin:")
        self.df_pg15.show(10)

    def handle_data(self):
        """Build ``df_save``: ASINs in ``df_pg14`` that are absent from ``df_pg15``.

        Two columns are added: ``state`` (constant 7) and ``asin_trun_4``
        (the first four characters of ``asin``).
        """
        # Keep the ASINs present in df_pg14 but absent from df_pg15.
        self.df_save = self.df_pg14.join(
            self.df_pg15, on='asin', how='left_anti'
        )
        # NOTE(review): the meaning of state=7 is not visible in this file.
        self.df_save = self.df_save.withColumn(
            'state', lit(7)
        ).withColumn(
            'asin_trun_4', col('asin').substr(1, 4)
        ).cache()
        self.df_save.show(10)
        print("本月新增asin数量为:")
        print(self.df_save.count())
        # cache() is lazy: release the inputs only after count() has fully
        # materialized df_save, otherwise their cached data is dropped before
        # the join that depends on it has been computed.
        self.df_pg14.unpersist()
        self.df_pg15.unpersist()

    def save_data(self):
        """Append ``df_save`` to the yearly keepa table over JDBC."""
        # NOTE(review): the JDBC URL and credentials are hard-coded here and
        # are independent of site_name and of engine_pg15; consider sourcing
        # them from the project's DB configuration — confirm the target is
        # correct for sites other than the default.
        self.df_save.write.format("jdbc") \
            .option("url", "jdbc:postgresql://192.168.10.224:5433/selection") \
            .option("dbtable", self._keepa_table_name()) \
            .option("user", "yswg_postgres") \
            .option("password", "yswg_postgres") \
            .mode("append") \
            .save()


if __name__ == "__main__":
    # Command-line entry point: <site_name> <date_type> <date_info>.
    if len(sys.argv) < 4:
        # Exit with a usage hint instead of a bare IndexError traceback.
        sys.exit(f"Usage: {sys.argv[0]} <site_name> <date_type> <date_info>")
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    handle_obj = UpdateStKeepaSyn(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()