import sys
import os

sys.path.append(os.path.dirname(sys.path[0]))
from utils.es_util import EsUtils
from utils.templates_mysql import TemplatesMysql
import pandas as pd
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from utils.db_util import DbTypes, DBUtil


class EsStDetail(TemplatesMysql):

    def __init__(self, site_name='us', date_type="week", date_info='2023-01', result_type='formal'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.result_type = result_type
        self.df_workflow = pd.DataFrame()
        self.table_name = f"dwt_flow_asin"
        # 创建spark对象
        self.spark = SparkUtil.get_spark_sessionV4(
            app_name=f"flow_asin_synchronize: {self.site_name}-{self.date_type}-{self.date_info}",
            use_db="big_data_selection")
        # DataFrame对象初始化
        self.df_synchronize = self.spark.sql("select 1+1")
        if self.date_type == '4_week':
            self.cur_date = self.get_date_from_week()
        else:
            self.cur_date = self.date_info
        self.year_month = str(self.cur_date).replace("-", "_")
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")
        # elasticsearch上索引名称及索引别名准备
        self.alias_name = f'{self.site_name}_st_detail_last_4_week' if self.result_type == 'formal' else f'{self.site_name}_st_detail_last_4_week_test'
        is_formal = self.result_type == 'formal'
        env_suffix = '' if is_formal else '_test'
        if self.date_type == '4_week':
            print(f"往{'正式' if is_formal else '测试'}环境上同步最近三十天数据")
            self.es_index_name = f"{self.site_name}_flow_asin_last30day_{self.cur_date}{env_suffix}"
        else:
            print(f"往{'正式' if is_formal else '测试'}环境上同步月数据")
            self.es_index_name = f"{self.site_name}_st_detail_{self.date_type}_{self.year_month}{env_suffix}"
        # 流程表中任务类型和记录表名
        self.record_table = 'workflow_everyday'
        self.record_type = 'month' if self.date_type == 'month' else '30_day'
        self.record_table_name_field = f'{self.site_name}_flow_asin_last_month' if self.date_type == 'month' else f'{self.site_name}_flow_asin_last30day'
        # elasticsearch相关配置
        self.client = EsUtils.get_es_client()
        self.es_options = EsUtils.get_es_options(self.es_index_name)
        self.es_body = EsUtils.get_es_body()

        # 正式导出需入导出记录表
        if result_type == 'formal':
            CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                           principal='chenyuanjie', priority=3, export_tools_type=2,
                                           belonging_to_process='流量选品')

    def get_date_from_week(self):
        df_date = self.spark.sql(f"select * from dim_date_20_to_30;")
        df = df_date.toPandas()
        df_loc = df.loc[(df.year_week == f'{self.date_info}') & (df.week_day == 7)]
        cur_date = list(df_loc.date)[0]
        print(cur_date)
        return str(cur_date)

    # 读取数据
    def read_data(self):
        sql = f"""
            select asin, asin_ao_val as ao_val, asin_zr_counts as zr_counts, asin_sp_counts as sp_counts, 
            asin_sb_counts as sb_counts, asin_vi_counts as vi_counts, asin_bs_counts as bs_counts, 
            asin_ac_counts as ac_counts, asin_tr_counts as tr_counts, asin_er_counts as er_counts, 
            bsr_orders, sales as bsr_orders_sale, asin_title as title, asin_title_len as title_len, asin_price as price, 
            asin_rating as rating, asin_total_comments as total_comments, asin_buy_box_seller_type as buy_box_seller_type, 
            asin_page_inventory as page_inventory, asin_volume as volume, asin_weight as weight, asin_color as color, 
            asin_size as size, asin_style as style, asin_material as material, asin_launch_time as launch_time, 
            asin_img_num as img_num, parent_asin, asin_img_type as img_type, asin_img_url as img_url, 
            asin_activity_type as activity_type, act_one_two_val as one_two_val, act_three_four_val as three_four_val, 
            act_five_six_val as five_six_val, act_eight_val as eight_val, asin_brand_name as brand, variation_num, 
            one_star, two_star, three_star, four_star, five_star, low_star, together_asin, account_name, account_id, 
            seller_country_name as site_name, asin_rank_rise as rank_rise, asin_rank_change as rank_change, 
            asin_ao_rise as ao_rise, asin_ao_change as ao_change, asin_price_rise as price_rise, 
            asin_price_change as price_change, asin_rating_rise as rating_rise, asin_rating_change as rating_change, 
            asin_comments_rise as comments_rise, asin_comments_change as comments_change, 
            asin_bsr_orders_rise as bsr_orders_rise, asin_bsr_orders_change as bsr_orders_change, 
            asin_sales_rise as sales_rise, asin_sales_change as sales_change, asin_variation_rise as variation_rise, 
            asin_variation_change as variation_change, asin_size_type as size_type, asin_rating_type as rating_type, 
            asin_site_name_type as site_name_type, asin_weight_type as weight_type, asin_launch_time_type as launch_time_type, 
            asin_ao_val_type as ao_val_type, asin_rank_type as rank_type, asin_price_type as price_type, bsr_type, 
            bsr_best_orders_type, asin_quantity_variation_type as quantity_variation_type, package_quantity, is_movie_label, 
            is_brand_label, is_alarm_brand, asin_type, asin_crawl_date, category_first_id, category_id, first_category_rank, 
            current_category_rank, asin_weight_ratio, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail, 
            title_matching_degree, asin_lob_info, is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion, 
            matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info, 
            coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count 
            from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
        """
        print("sql:", sql)
        self.df_synchronize = self.spark.sql(sqlQuery=sql)
        self.df_synchronize = self.df_synchronize.repartition(40).cache()
        self.df_synchronize.show(10, truncate=False)

    # 同步数据前的准备工作
    def es_prepare(self):
        print("当前链接的es节点信息为:" + str(EsUtils.__es_ip__))
        EsUtils.create_index(self.es_index_name, self.client, self.es_body)
        if self.date_type != 'month':
            if not EsUtils.exist_index_alias(self.alias_name, self.client):
                EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
            else:
                index_name_list = EsUtils.get_index_names_associated_alias(self.alias_name, self.client)
                if self.es_index_name not in index_name_list:
                    EsUtils.delete_index_alias(self.alias_name, self.client)
                    EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
                else:
                    pass

    # 同步数据到elasticsearch
    def save_data(self):
        print(f"当前同步的数据为:{self.table_name}: {self.site_name}-{self.date_type}-{self.date_info}; 数据量为: {self.df_synchronize.count()}")
        self.df_synchronize = self.df_synchronize.repartition(60)
        self.df_synchronize.write.format('org.elasticsearch.spark.sql').options(**self.es_options).mode('append').save()

    # 月数据同步完成后将最近三十天索引切换到月并修改流程表状态
    def modify_mission_record_status(self):
        # 查询流程表已完成的最新记录时间点
        if self.date_type == 'month' and self.result_type == 'formal':
            select_sql = f"select id from {self.record_table} where site_name='{self.site_name}' and date_type='{self.date_type}' and report_date='{self.cur_date}' and page='流量选品' and status_val=14 and is_end='是'"
            df_is_finished = pd.read_sql(select_sql, self.engine_mysql)
            if df_is_finished.empty:
                replace_sql = f"""
                    replace into {self.record_table} (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
                    VALUES ('{self.site_name}', '{self.cur_date}', '流量选品计算完毕', 14,  '{self.record_table_name_field}', '{self.record_type}', '流量选品', '是', '流量选品计算完毕', 'elasticsearch')
                """
                DBUtil.exec_sql('mysql', 'us', replace_sql)
                if EsUtils.exist_index_alias(self.alias_name, self.client):
                    print("切换最近三十天索引别名链接到月数据")
                    EsUtils.delete_index_alias(self.alias_name, self.client)
                    EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
                else:
                    EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
        else:
            pass

    def run(self):
        self.read_data()
        self.es_prepare()
        self.save_data()
        self.modify_mission_record_status()


if __name__ == '__main__':
    arguments = sys.argv[1:]
    site_name = sys.argv[1]  # 参数1:站点
    date_type = sys.argv[2]  # 参数2:week/month/quarter
    date_info = sys.argv[3]
    if len(arguments) == 4:
        result_type = sys.argv[4]
    else:
        result_type = 'formal'
    handle_obj = EsStDetail(site_name=site_name, date_type=date_type, date_info=date_info, result_type=result_type)
    handle_obj.run()