import sys

from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from collections import OrderedDict
import time


class EsStDetail(object):
    """Copy one period of ASIN detail rows from Hive into Elasticsearch.

    The job runs a Spark SQL query that joins the ASIN detail table with its
    trend table and the size table, derives several bucket columns
    (``rating_type``, ``weight_type``, ``rank_type`` ...), and appends the
    result to an Elasticsearch index using the ``asin`` column as document id.

    Args:
        site_name: Site code (e.g. ``'us'``, ``'uk'``, ``'de'``). Selects the
            MySQL database name and the Elasticsearch port.
        date_type: Period kind. Only ``'4_week'`` and ``'month'`` are
            supported.
        year: Year of the period.
        week: Week number for ``'4_week'``. For ``'month'`` this value is used
            as the month number (it is passed to ``calendar.monthrange``).

    Raises:
        ValueError: If ``date_type`` is not one of ``SUPPORTED_DATE_TYPES``.
            Note that the constructor default ``"week"`` is not supported, so
            callers must pass ``date_type`` explicitly.
    """

    # Period kinds for which source tables and an ES index name are defined.
    SUPPORTED_DATE_TYPES = ('4_week', 'month')

    def __init__(self, site_name='us', date_type="week", year=2022, week=1):
        self.site_name = site_name
        self.date_type = date_type
        self.year = year
        self.week = week

        # Resolve tables / partition / index first: this validates date_type
        # before any database engine or Spark session is created.
        self._configure_date_scope()
        print("self.date:", self.date)  #

        self._configure_connections()

        # Placeholders until read_data() assigns the real DataFrame.
        self.df_read = object()
        self.df_spark = object()

        # Elasticsearch connection settings.
        # NOTE(review): credentials are hard-coded here and in the MySQL URLs;
        # consider moving them to configuration / a secret store.
        self.es_url = '120.79.147.190'
        self.es_user = 'elastic'
        self.es_pass = 'selection2021.+'

        self.spark = self._create_spark_session()
        self.partition_type = "dt"

    def _configure_date_scope(self):
        """Set table names, reference date, partition value and ES index name.

        Raises:
            ValueError: If ``self.date_type`` is not supported. Previously an
                unsupported value left these attributes unset and the
                constructor failed later with an ``AttributeError``.
        """
        if self.date_type not in self.SUPPORTED_DATE_TYPES:
            raise ValueError(
                f"unsupported date_type {self.date_type!r}; "
                f"expected one of {self.SUPPORTED_DATE_TYPES}"
            )

        if self.date_type == '4_week':
            self.table_name = "dwt_asin_last_4_week"
            self.table_trend_name = "ads_asin_trend_last_4_week"
            # Reference date for the launch-time buckets is the current day.
            self.date = time.strftime("%Y-%m-%d", time.localtime())
            self.es_table_name = f"{self.site_name}_st_detail_last_4_week_copy"
            self.date_info = f"{self.year}-{self.week}"
            return

        # date_type == 'month': self.week carries the month number.
        import calendar

        self.table_name = "dwt_asin_month"
        self.table_trend_name = "ads_asin_detail_trend_month"
        # monthrange()[1] is the number of days in the month, i.e. the day
        # number of its last day; that day is the reference date.
        last_day = calendar.monthrange(self.year, self.week)[1]
        self.date = f"{self.year}-{self.week:02d}-{last_day}"
        print("lastDay:", last_day)  #
        self.date_info = f"{self.year}_{self.week}"
        # e.g. us_st_detail_month_2022_6
        self.es_table_name = f"{self.site_name}_st_detail_{self.date_type}_{self.date_info}"

    def _configure_connections(self):
        """Create the MySQL engine and pick the Elasticsearch port for the site."""
        if self.site_name == 'us':
            database = 'selection'
            self.es_port = '9200'
        else:
            database = f'selection_{self.site_name}'
            self.es_port = '9201' if self.site_name in ['uk', 'de'] else '9202'
        self.engine = create_engine(
            f'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/{database}?charset=utf8mb4')

    def _create_spark_session(self):
        """Build the Hive-enabled Spark session and apply session-level settings."""
        app_name = f"{self.table_name}:, {self.site_name}-{self.year}-{self.week}"
        print(f"当前同步:{app_name}")
        spark = (
            SparkSession.builder
            .appName(app_name)
            .config("spark.sql.warehouse.dir", "hdfs://hadoop5:8020/home/big_data_selection")
            .config("spark.metastore.uris", "thrift://hadoop6:9083")
            .config("spark.network.timeout", 10000000)
            .config("spark.sql.parquet.compression.codec", "lzo")
            .enableHiveSupport()
            .getOrCreate()
        )
        session_statements = (
            "set hive.exec.dynamic.partition.mode=nonstrict",
            '''set mapred.output.compress=true''',
            '''set hive.exec.compress.output=true''',
            '''set mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec''',
            "use selection_off_line;",
        )
        for statement in session_statements:
            spark.sql(statement)
        return spark

    def _build_sql(self):
        """Return the Spark SQL text that selects and buckets the ASIN rows.

        The query reads ``self.table_name`` (t1) for the configured site and
        ``dt`` partition, left-joins ``self.table_trend_name`` (t2) and
        ``dwt_asin_size`` (t3) on ``asin``, and groups by every selected
        column. ``self.date`` is the reference date for ``launch_time_type``.
        """
        # NOTE(review): in the ao_type CASE below, both the [0, 0.1) and the
        # [0.1, 0.2) ranges map to 2, so value 1 is never produced. This looks
        # like a copy-paste slip; confirm with the index consumers before
        # changing it, since it alters the data written to Elasticsearch.
        return f"""
        select
        t1.asin,ao_val,zr_counts,sp_counts,sb_counts,vi_counts,bs_counts,ac_counts,tr_counts,er_counts,nvl(orders,0) as orders,nvl(bsr_orders,0) as bsr_orders,nvl(sales,0) as bsr_orders_sale,is_self,pt_category_id,one_category_id,title,title_len,price,rating,total_comments,
        t1.buy_box_seller_type,page_inventory,volume,weight,rank,if(launch_time<'1970-01-01 00:00:00',to_date('1970-01-01 00:00:00'),launch_time) as launch_time,img_num,img_type,activity_type,
        one_two_val,three_four_val,five_six_val,eight_val,qa_num,brand_name as brand,t1.variation_num,one_star,two_star,
        three_star,four_star,five_star,low_star,together_asin,account_name,account_id,site_name,bsr_type,bsr_best_orders_type,zr_best_orders_type,
rank_rise, cast(rank_change as double) rank_change, cast(ao_rise as double) ao_rise, cast(ao_change as double) ao_change,  cast(price_rise as double) price_rise,  cast(price_change as double) price_change, orders_rise,  cast(orders_change as double) orders_change,  cast(rating_rise as double) rating_rise,  cast(rating_change as double) rating_change,
    comments_rise,cast(comments_change as double) comments_change, bsr_orders_rise, cast(bsr_orders_change as double) bsr_orders_change, cast(sales_rise as double) sales_rise, cast(sales_change as double) sales_change,  variation_rise, cast(variation_change as double) variation_change,
size_type, rating_type, t1.site_name_type, launch_time_type, weight_type, ao_type as ao_val_type, rank_type, price_type, '{self.date_info}' as wmqtype
from (
         select asin,ao_val,zr_counts,sp_counts,sb_counts,vi_counts,bs_counts,ac_counts,tr_counts,er_counts,bsr_orders,
                orders,sales,is_self,pt_category_id,one_category_id,title,title_len,price,rating,total_comments,
                buy_box_seller_type,page_inventory,volume,weight,rank,launch_time,img_num,img_type,activity_type,
                one_two_val,three_four_val,five_six_val,eight_val,qa_num,brand_name,variation_num,one_star,two_star,
                three_star,four_star,five_star,low_star,together_asin,account_name,account_id,site_name,bsr_type,bsr_best_orders_type,zr_best_orders_type,
                case
                     when buy_box_seller_type = 1 then 4
                     when buy_box_seller_type != 1 and site_name like 'US%' then 1
                     when buy_box_seller_type != 1 and site_name like 'CN%' then 2
                     else 3 end  site_name_type,
                case
                              when rating is null then 0
                              when rating >= 4.5 then 1
                              when rating < 4.5 and rating >= 4 then 2
                              when rating < 4 and rating >= 3.5 then 3
                              when rating < 3.5 and rating >= 3 then 4
                              else 5 end              rating_type,
                          case
                              when weight is null then 0
                              when weight < 0.2 then 1
                              when weight >= 0.2 and weight < 0.4 then 2
                              when weight >= 0.4 and weight < 0.6 then 3
                              when weight >= 0.6 and weight < 1 then 4
                              when weight >= 1 and weight < 2 then 5
                              else 6 end              weight_type,
                          case
                              when rank is null then 0
                              when rank >= 1 and rank <= 999 then 1
                              when rank >= 1000 and rank <= 4999 then 2
                              when rank >= 5000 and rank <= 9999 then 3
                              when rank >= 10000 and rank <= 19999 then 4
                              when rank >= 20000 and rank <= 29999 then 5
                              when rank >= 30000 and rank <= 49999 then 6
                              when rank >= 50000 and rank <= 69999 then 7
                              else 8 end              rank_type,
                          case
                              when price is null then 0
                              when price < 10 then 1
                              when price >= 10 and price < 15 then 2
                              when price >= 15 and price < 20 then 3
                              when price >= 20 and price < 30 then 4
                              when price >= 30 and price < 50 then 5
                              else 6 end              price_type,
                          case
                              when ao_val is null then 0
                              when ao_val >= 0 and ao_val < 0.1 then 2
                              when ao_val >= 0.1 and ao_val < 0.2 then 2
                              when ao_val >= 0.2 and ao_val < 0.4 then 3
                              when ao_val >= 0.4 and ao_val < 0.8 then 4
                              when ao_val >= 0.8 and ao_val < 1.2 then 5
                              when ao_val >= 1.2 and ao_val < 2 then 6
                              else 7 end              ao_type,
                    case
                     when  launch_time is null then 0
                     when datediff('{self.date}',launch_time) <= 30 then 1
                     when months_between('{self.date}',launch_time) >=1 and
                          months_between('{self.date}',launch_time) <=3 then 2
                     when months_between('{self.date}',launch_time) >3 and
                          months_between('{self.date}',launch_time) <=6 then 3
                     when months_between('{self.date}',launch_time) >6 and
                          months_between('{self.date}',launch_time) <=12 then 4
                     when months_between('{self.date}',launch_time) >12 and
                          months_between('{self.date}',launch_time) <=24 then 5
                     when months_between('{self.date}',launch_time) >24 and
                          months_between('{self.date}',launch_time) <=36 then 6
                     else 7 end  launch_time_type

         from {self.table_name}
         where site = '{self.site_name}'
           and dt = '{self.date_info}'
     )t1
left join
    (
        select
            asin, rank_rise, rank_change, ao_rise, ao_change, price_rise, price_change, orders_rise, orders_change, rating_rise, rating_change,
            comments_rise,comments_change, bsr_orders_rise, bsr_orders_change, sales_rise, sales_change, variation_num, variation_rise, variation_change
        from {self.table_trend_name}
        where site='{self.site_name}' and dt='{self.date_info}'
        )t2 on t1.asin=t2.asin
left join
    (select asin,size_type from dwt_asin_size where site_dt='{self.site_name}-9999-99' )t3 on t1.asin=t3.asin
group by t1.asin,ao_val,zr_counts,sp_counts,sb_counts,vi_counts,bs_counts,ac_counts,tr_counts,er_counts,bsr_orders,
                orders,sales,is_self,pt_category_id,one_category_id,title,title_len,price,rating,total_comments,
                t1.buy_box_seller_type,page_inventory,volume,weight,rank,launch_time,img_num,img_type,activity_type,
                one_two_val,three_four_val,five_six_val,eight_val,qa_num,brand_name,t1.variation_num,one_star,two_star,
                three_star,four_star,five_star,low_star,together_asin,account_name,account_id,site_name,bsr_type,bsr_best_orders_type,zr_best_orders_type,
        rank_rise, rank_change, ao_rise, ao_change, price_rise, price_change, orders_rise, orders_change, rating_rise, rating_change,
            comments_rise,comments_change, bsr_orders_rise, bsr_orders_change, sales_rise, sales_change,  variation_rise, variation_change,
        size_type, rating_type, t1.site_name_type, launch_time_type, weight_type, ao_type, rank_type, price_type
        """

    def read_data(self):
        """Run the selection query and keep the result in ``self.df_spark``.

        The query result is cached, previewed (10 rows) and counted, then
        repartitioned into 20 partitions; partition counts before and after
        are printed.
        """
        sql = self._build_sql()
        print("sql:", sql)
        self.df_spark = self.spark.sql(sqlQuery=sql)
        self.df_spark = self.df_spark.cache()
        self.df_spark.show(10)
        print("self.df_spark.count:", self.df_spark.count())
        print("分区数1:", self.df_spark.rdd.getNumPartitions())
        self.df_spark = self.df_spark.repartition(20)
        print("分区数2:", self.df_spark.rdd.getNumPartitions())

    def save_data(self):
        """Append ``self.df_spark`` to the Elasticsearch index ``self.es_table_name``."""
        options = OrderedDict()
        options['es.nodes'] = self.es_url
        options['es.port'] = self.es_port
        options['es.net.http.auth.user'] = self.es_user
        options['es.net.http.auth.pass'] = self.es_pass
        # Use the asin column as the document id.
        options['es.mapping.id'] = "asin"
        options['es.resource'] = f'{self.es_table_name}/_doc'
        options['es.nodes.wan.only'] = 'true'
        # Optional connector tuning knobs, currently left at their defaults:
        #   es.http.timeout            - connection timeout (default 1m)
        #   es.batch.write.retry.count - retries per batch (default 3; a
        #                                negative value retries forever)
        #   es.batch.write.retry.wait  - wait between retries (default 10s)
        #   es.batch.size.bytes / es.batch.size.entries - bulk request size
        writer = self.df_spark.write.format('org.elasticsearch.spark.sql')
        writer.options(**options).mode('append').save()

    def run(self):
        """Read the rows from Hive, then write them to Elasticsearch."""
        self.read_data()
        self.save_data()


if __name__ == '__main__':
    # Command-line parsing is currently disabled. When enabled, the expected
    # arguments were: argv[1] = site, argv[2] = date type, argv[3] = year,
    # argv[4] = week. Until then the run parameters are fixed here.
    run_params = {
        'site_name': 'us',
        'date_type': '4_week',
        'year': 2022,
        'week': 42,
    }

    # Build the job for the fixed parameters and run read + write.
    EsStDetail(**run_params).run()