import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType
# Window functions used for grouped ordering (UDF-style window functions)
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class OdsBrandAnalytics(Templates):
    """Ad-hoc Spark job that joins search-term and ASIN measure tables and dumps the result to CSV.

    Every query issued by the job is filtered by the ``site_name``,
    ``date_type`` and ``date_info`` values given to the constructor.
    ``read_data`` performs the export and then terminates the process.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        """Store the query filters and create the Spark session.

        Args:
            site_name: value compared against the ``site_name`` column.
            date_type: value compared against the ``date_type`` column.
            date_info: value compared against the ``date_info`` column.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.date_info2 = date_info
        # NOTE(review): ``self.db_save`` is not assigned in this class (the
        # assignment on the next line is commented out), so it presumably comes
        # from ``Templates.__init__`` -- confirm, otherwise the app_name
        # f-string below raises AttributeError.
        # self.db_save = f'ods_brand_analytics'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")

    def _partition_filter(self):
        """Return the site/date predicate shared by every query of this job."""
        return (f"site_name='{self.site_name}' "
                f"and date_type='{self.date_type}' and date_info='{self.date_info}'")

    def _load(self, sql):
        """Print ``sql``, run it, cache the result, preview 20 rows and return the DataFrame."""
        print("sql:", sql)
        df = self.spark.sql(sqlQuery=sql).cache()
        df.show(20, truncate=False)
        return df

    @staticmethod
    def _write_csv(df, path):
        """Collect ``df`` to pandas, print its row count and write it to ``path`` without the index."""
        pdf = df.toPandas()
        # Row count is taken from the collected frame instead of a separate
        # ``df.count()`` so the (uncached) join is only executed once.
        print(len(pdf))
        pdf.to_csv(path, index=False)

    def _export_measure_sample(self):
        """Join the three ``dwd_*_measure`` tables for a fixed set of search terms and write the CSV."""
        partition_filter = self._partition_filter()
        term_filter = "search_term in ('christmas decorations', 'my orders', 'ugly christmas sweater men')"

        df_st_asin = self._load(
            f"select search_term, asin from dwd_st_asin_measure where {partition_filter} and {term_filter}")
        df_st = self._load(
            f"select search_term, st_ao_val from dwd_st_measure where {partition_filter} and {term_filter}")
        df_asin = self._load(
            f"select asin, asin_ao_val, asin_sp_counts, asin_sb_counts, asin_zr_counts "
            f"from dwd_asin_measure where {partition_filter}")
        # Debug preview of a single hard-coded ASIN.
        df_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)

        # Left joins keep every (search_term, asin) pair even when a measure row is missing.
        df_save = df_st_asin.join(
            df_st, on='search_term', how='left'
        ).join(
            df_asin, on='asin', how='left'
        )
        self._write_csv(df_save, "/opt/module/spark/demo/py_demo/temp/st_1_10_100_new.csv")

    def _export_st_rank_sample(self):
        """Alternate export over ``dwt_st_asin_info`` rows with ``st_rank`` in (1, 10, 100).

        Not called anywhere in this class: in the original ``read_data`` this
        code sat after an unconditional ``quit()`` and was unreachable. It is
        kept as a separate helper so it can be re-enabled deliberately.
        """
        partition_filter = self._partition_filter()

        df_st_asin = self._load(
            f"select search_term, st_rank, asin, st_ao_val from dwt_st_asin_info "
            f"where {partition_filter} and st_rank in(1, 10, 100)")
        df_st_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)
        df_asin = self._load(
            f"select asin, asin_ao_val, asin_sp_counts, asin_sb_counts, asin_zr_counts "
            f"from dwd_asin_counts where {partition_filter}")
        df_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)

        df_save = df_st_asin.join(
            df_asin, on='asin', how='left'
        )
        self._write_csv(df_save, "/opt/module/spark/demo/py_demo/temp/st_1_10_100.csv")

    def read_data(self):
        """Run the measure export and terminate the process.

        Raises:
            SystemExit: always, once the CSV has been written.
        """
        self._export_measure_sample()
        # ``sys.exit()`` replaces the original ``quit()``: ``quit`` is injected
        # by the ``site`` module for interactive use and is not meant for
        # programs. Both raise ``SystemExit`` with no exit code.
        sys.exit()

    def handle_data(self):
        """Does nothing."""
        pass


if __name__ == '__main__':
    # Expected command line: <site_name> <date_type> <date_info>.
    # Fail with a usage message instead of a bare IndexError when arguments are missing.
    if len(sys.argv) < 4:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <date_type> <date_info>")
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: period type: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-month-day / year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = OdsBrandAnalytics(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()