# spark_pg2pg.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote
from datetime import datetime


# Name prefix shared by the per-day tables and by the Spark application name.
# NOTE(review): the year is hard-coded to 2024 and the year part of the date
# argument is ignored; confirm this is intended for dates outside 2024.
TABLE_PREFIX = "us_merchantwords_brand_analytics_2024"

# Source (read) and target (write) PostgreSQL instances: same host and
# database, different ports.
SOURCE_JDBC_URL = "jdbc:postgresql://113.100.143.162:5432/selection"
TARGET_JDBC_URL = "jdbc:postgresql://113.100.143.162:5443/selection"

# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store instead.
PG_USER = "yswg_postgres"
PG_PASSWORD = "yswg_postgres"


def table_name_for_date(date_info):
    """Return the per-day table name for a ``YYYY-MM-DD`` date string.

    Month and day are taken verbatim from *date_info* (no re-padding), so
    ``"2024-05-03"`` maps to ``us_merchantwords_brand_analytics_2024_05_03``.

    Raises:
        ValueError: if *date_info* is missing/empty or is not a valid
            ``YYYY-MM-DD`` calendar date.
    """
    if not date_info:
        raise ValueError("a date argument in YYYY-MM-DD format is required")
    try:
        # Validate up front so a bad argument fails here with a clear message,
        # not later inside JDBC as a missing-table error.
        datetime.strptime(date_info, "%Y-%m-%d")
    except ValueError as err:
        raise ValueError(f"invalid date {date_info!r}; expected YYYY-MM-DD") from err
    _year, month, day = date_info.split("-")
    return f"{TABLE_PREFIX}_{month}_{day}"


def _jdbc_options(url, table):
    """Build the JDBC connection options shared by the reader and the writer."""
    return {
        "url": url,
        "dbtable": table,
        "user": PG_USER,
        "password": PG_PASSWORD,
    }


def main():
    """Copy one day's table from the source to the target PostgreSQL instance.

    The date is read from command-line argument 1 (``YYYY-MM-DD``). Rows are
    appended to the same-named table on the target instance.
    """
    date_info = CommonUtil.get_sys_arg(1, None)
    table = table_name_for_date(date_info)
    spark = SparkUtil.get_spark_session(f"{TABLE_PREFIX}:pg2pg,{date_info}")
    try:
        df = spark.read.format("jdbc") \
            .options(**_jdbc_options(SOURCE_JDBC_URL, table)) \
            .load()

        # Append mode: re-running the same date appends the rows again unless
        # the target table enforces uniqueness.
        df.write.format("jdbc") \
            .options(**_jdbc_options(TARGET_JDBC_URL, table)) \
            .mode("append") \
            .save()
    finally:
        # Always release the Spark session, even if the read or write fails.
        spark.stop()


if __name__ == '__main__':
    main()