import os
import sys

# Make the project root importable so the utils package resolves when the
# script is run directly (sys.path[0] is the script's own directory).
sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote


if __name__ == '__main__':
    # CLI args: date_info is the date label written to each exported row;
    # n is the 1-based batch number used to page through the keyword list.
    date_info = CommonUtil.get_sys_arg(1, None)
    n = CommonUtil.get_sys_arg(2, 0)
    hive_tb = "dwt_merchantwords_st_detail"
    export_tb = "us_merchantwords_search_term_month_syn_2024"
    spark = SparkUtil.get_spark_session(f"export: {hive_tb}")
    # Export 4,000,000 rows per batch: batch n covers rows
    # (n-1)*4,000,000 + 1 through n*4,000,000 of the keyword list ordered below.
    batch_offset = (int(n) - 1) * 4000000
    start_index = 1 + batch_offset
    end_index = 4000000 + batch_offset
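    # Illustrative invocation (script name hypothetical):
    #   spark-submit export_merchantwords_keywords.py 2024-05 2
    # would select rows 4,000,001 through 8,000,000 of the ordered keyword list.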

    # Build the Amazon search-result URLs (pages 1-3) for one keyword.
    def build_urls(search_term):
        url_template = "https://www.amazon.com/s?k={search_term}&page={page_number}"
        # Percent-encode the keyword; note that quote()'s second positional
        # argument is `safe`, so the encoding is passed explicitly. quote()
        # already encodes spaces, quotes, '&', '#', '(' and ')' (e.g. ' ' -> '%20'),
        # so no manual replacements are needed.
        encoded_term = quote(search_term, safe='', encoding='utf-8')
        return [
            url_template.format(search_term=encoded_term, page_number=page)
            for page in (1, 2, 3)
        ]
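    # For example (illustrative keyword), build_urls("ice cream maker") returns:
    #   ['https://www.amazon.com/s?k=ice%20cream%20maker&page=1',
    #    'https://www.amazon.com/s?k=ice%20cream%20maker&page=2',
    #    'https://www.amazon.com/s?k=ice%20cream%20maker&page=3']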

    # Register the Python function as a Spark SQL UDF returning an array of strings
    spark.udf.register("build_urls", build_urls, ArrayType(StringType()))
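    # The SQL-registered UDF is used via selectExpr/explode below; an equivalent
    # DataFrame-API handle (sketch, not used here) would be:
    #   from pyspark.sql.functions import udf
    #   build_urls_udf = udf(build_urls, ArrayType(StringType()))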

    # Read already-exported search terms from PostgreSQL (kept for reference, currently disabled)
    # df_pg = spark.read.format("jdbc") \
    #     .option("url", "jdbc:postgresql://192.168.10.225:5432/selection") \
    #     .option("dbtable", export_tb) \
    #     .option("user", "yswg_postgres") \
    #     .option("password", "yswg_postgres") \
    #     .load()
    # df_pg = df_pg\
    #     .select("search_term") \
    #     .drop_duplicates(["search_term"]) \
    #     .repartition(70) \
    #     .cache()

    # Read keywords from the Hive table
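    # Note: row_number() over an un-partitioned window pulls all rows into a
    # single partition; acceptable here for a one-off ordered export.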
    df_hive = spark.sql(f"SELECT keyword FROM {hive_tb}")
    df_hive = df_hive\
        .withColumn("row_num", row_number().over(Window.orderBy("keyword")))\
        .filter(f"row_num BETWEEN {start_index} AND {end_index}")\
        .select("keyword")\
        .repartition(10) \
        .cache()

    # Drop keywords that contain Chinese characters
    df_hive = df_hive.filter(~df_hive["keyword"].rlike("[\u4e00-\u9fff]"))
    # Skip keywords that already exist in the target table (disabled along with the PostgreSQL read above)
    # df_hive = df_hive.join(df_pg, df_hive["keyword"] == df_pg["search_term"], "leftanti")

    # Exit early if there is nothing left to export for this batch
    if df_hive.count() == 0:
        print("------- All data has been exported! -------")
        spark.stop()
        sys.exit(0)

    df_hive = df_hive.selectExpr("keyword AS search_term")
    df_hive = df_hive.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
    df_hive = df_hive.withColumn("date_info", lit(date_info))
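    # Each keyword now yields three rows (search_term, url, date_info), one per result page.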

    # Write this batch to the PostgreSQL export table
    df_hive.write.format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.10.225:5432/selection") \
        .option("dbtable", export_tb) \
        .option("user", "yswg_postgres") \
        .option("password", "yswg_postgres") \
        .mode("append") \
        .save()
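    # (If write throughput matters, the JDBC writer's "batchsize" option, which
    #  defaults to 1000 rows per round trip, could be raised; left at the default here.)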

    spark.stop()