import os
import sys

from sqlalchemy import bindparam, text
from sqlalchemy.dialects.postgresql import pypostgresql  # noqa: F401 -- unused; kept from original, confirm before removing

# Make the project root importable when the script is run directly.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.StarRocksHelper import StarRocksHelper

# Flag asins in PG in chunks of this size so a single UPDATE never carries
# an unbounded IN-list.
BATCH_SIZE = 1000

if __name__ == '__main__':
    # Job: find asins that exist in StarRocks (with an implausible launch
    # time) but are missing from the PG staging table
    # tmp_us_st_keepa_syn_2024, then mark them state=5 in PG.
    spark = SparkUtil.get_spark_session("us_st_keepa_syn_2024_export")

    # 从SR数据库中读取已有数据 -- read candidate asins from StarRocks.
    sql = """
    select distinct asin from selection.us_asin_latest_detail where date_info = '2024-06' and (asin_launch_time>'2024-07-19' or asin_launch_time<'1990-01-01')
    """
    df_sr = StarRocksHelper.spark_import_with_sql(spark, sql).repartition(80, 'asin').cache()
    print("starrocks读取:")
    df_sr.show(10)

    # Asins already present in the PG-backed temp view.
    sql = """
    select asin from tmp_us_st_keepa_syn_2024;
    """
    df_pg = spark.sql(sql).drop_duplicates(['asin']).repartition(80, 'asin').cache()
    print("pg读取:")
    df_pg.show(10)

    # Asins in StarRocks but not yet in PG -> these need re-syncing.
    df = df_sr.subtract(df_pg)
    print(df.count())
    df_sr.unpersist()
    df_pg.unpersist()

    update_asin = [row[0] for row in df.select("asin").collect()]
    print(update_asin)

    # BUG FIX: the original interpolated `tuple(update_asin)` directly into
    # the SQL text, which yields invalid SQL for an empty list (`IN ()`) and
    # for a single element (`IN ('x',)` -- trailing comma), and is SQL-
    # injection-prone. Use a parameterized expanding IN-list instead, skip
    # the UPDATE when there is nothing to flag, and batch large lists.
    # text() also makes the statement valid under SQLAlchemy 2.x, which no
    # longer accepts raw strings in Connection.execute().
    if update_asin:
        pg_engine = DBUtil.get_db_engine('postgresql', 'us')
        update_query = text(
            "UPDATE us_st_keepa_syn_2024 SET state = 5 WHERE asin IN :asins"
        ).bindparams(bindparam("asins", expanding=True))
        # engine.begin() commits on success / rolls back on error.
        with pg_engine.begin() as conn:
            for i in range(0, len(update_asin), BATCH_SIZE):
                conn.execute(update_query, {"asins": update_asin[i:i + BATCH_SIZE]})

    spark.stop()