import os
import sys

from sqlalchemy import bindparam, text
from sqlalchemy.dialects.postgresql import pypostgresql

sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.StarRocksHelper import StarRocksHelper


if __name__ == '__main__':
    # Flag rows in Postgres (state = 5) for every ASIN that StarRocks says
    # needs a re-sync but that is missing from the PG-side sync table.
    spark = SparkUtil.get_spark_session("us_st_keepa_syn_2024_export")

    # Read from StarRocks: ASINs in the 2024-06 snapshot whose launch time is
    # implausible (in the future or before 1990) and therefore needs syncing.
    sql = """
    select distinct asin from selection.us_asin_latest_detail where date_info = '2024-06' and (asin_launch_time>'2024-07-19' or asin_launch_time<'1990-01-01')
    """
    df_sr = StarRocksHelper.spark_import_with_sql(spark, sql).repartition(80, 'asin').cache()
    print("starrocks读取:")
    df_sr.show(10)

    # ASINs already present on the PG side (registered as a Spark table/view).
    sql = """
    select asin from tmp_us_st_keepa_syn_2024;
    """
    df_pg = spark.sql(sql).drop_duplicates(['asin']).repartition(80, 'asin').cache()
    print("pg读取:")
    df_pg.show(10)

    # ASINs present in StarRocks but absent from PG — these get flagged.
    df = df_sr.subtract(df_pg)

    # Collect once and use len(): the original ran df.count() and then a
    # separate .collect() on the uncached diff, triggering two Spark jobs.
    update_asin = [row[0] for row in df.select("asin").collect()]
    print(len(update_asin))
    print(update_asin)
    df_sr.unpersist()
    df_pg.unpersist()

    if update_asin:
        pg_engine = DBUtil.get_db_engine('postgresql', 'us')
        # Parameterized statement with an expanding IN-list. The original
        # f-string `IN {tuple(update_asin)}` generated invalid SQL for an
        # empty list ("IN ()") and for a single element (trailing comma),
        # and interpolated values directly into the SQL text.
        stmt = text(
            "UPDATE us_st_keepa_syn_2024 SET state = 5 WHERE asin IN :asins"
        ).bindparams(bindparam("asins", expanding=True))
        batch_size = 10000  # bound each statement's parameter count
        with pg_engine.begin() as conn:  # single transaction for all batches
            for start in range(0, len(update_asin), batch_size):
                conn.execute(stmt, {"asins": update_asin[start:start + batch_size]})

    spark.stop()