import calendar
import sys
import time
from collections import OrderedDict

from pyspark.sql import SparkSession
from sqlalchemy import create_engine


class EsStDetail(object):
    def __init__(self, site_name='us', date_type="week", year=2022, week=1):
        self.site_name = site_name
        self.date_type = date_type
        self.year = year
        self.week = week
        if self.date_type in ['4_week']:
            self.table_name = "dwt_asin_last_4_week"
            self.table_trend_name = "ads_asin_trend_last_4_week"
            self.date = time.strftime("%Y-%m-%d", time.localtime())
            self.es_table_name = f"{self.site_name}_st_detail_last_4_week_copy"
            self.date_info = f"{self.year}-{self.week}"
        if self.date_type in ['month']:
            self.table_name = "dwt_asin_month"  # monthly table
            self.table_trend_name = "ads_asin_detail_trend_month"
            # Last day of the given month, i.e. the number of days in that month.
            lastDay = calendar.monthrange(self.year, self.week)[1]
            self.date = f"{self.year}-0{self.week}-{lastDay}" if self.week < 10 else f"{self.year}-{self.week}-{lastDay}"
            print("lastDay:", lastDay)
            # Restored from a commented-out line: without it the month branch
            # would raise AttributeError just below; yields an index name like
            # us_st_detail_month_2022_6.
            self.date_info = f"{self.year}_{self.week}"
            self.es_table_name = f"{self.site_name}_st_detail_{self.date_type}_{self.date_info}"
            print("self.date:", self.date)
        if self.site_name == 'us':
            self.engine = create_engine(
                'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/'
                'selection?charset=utf8mb4')  # , pool_recycle=3600
            self.es_port = '9200'
        else:
            if self.site_name in ['uk', 'de']:
                self.es_port = '9201'
            else:
                self.es_port = '9202'
            self.engine = create_engine(
                f'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/'
                f'selection_{self.site_name}?charset=utf8mb4')  # , pool_recycle=3600
        # Placeholders; overwritten in read_data().
        self.df_read = object()
        self.df_spark = object()
        # Elasticsearch connection settings.
        self.es_url = '120.79.147.190'
        self.es_user = 'elastic'
        self.es_pass = 'selection2021.+'
        # Create the Spark session.
        print(f"Now syncing {self.table_name}: {self.site_name}-{self.year}-{self.week}")
        self.spark = SparkSession.builder \
            .appName(f"{self.table_name}: {self.site_name}-{self.year}-{self.week}") \
            .config("spark.sql.warehouse.dir", "hdfs://hadoop5:8020/home/big_data_selection") \
            .config("spark.metastore.uris", "thrift://hadoop6:9083") \
            .config("spark.network.timeout", 10000000) \
            .config("spark.sql.parquet.compression.codec", "lzo") \
            .enableHiveSupport() \
            .getOrCreate()
        self.spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
        self.spark.sql("set mapred.output.compress=true")
        self.spark.sql("set hive.exec.compress.output=true")
        self.spark.sql("set mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec")
        self.spark.sql("use selection_off_line;")
        self.partition_type = "dt"
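
    # A quick illustration (assumed example values, not taken from a live run)
    # of the fields the constructor derives for each date_type:
    #   EsStDetail(site_name='us', date_type='month', year=2022, week=6)
    #     -> date          = '2022-06-30'   (last day of the month)
    #        date_info     = '2022_6'
    #        es_table_name = 'us_st_detail_month_2022_6'
    #   EsStDetail(site_name='us', date_type='4_week', year=2022, week=42)
    #     -> date          = today's date
    #        date_info     = '2022-42'
    #        es_table_name = 'us_st_detail_last_4_week_copy'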
    def read_data(self):
        # sql = f"select * from {self.table_name} where site='{self.site_name}' and dt='{self.date_info}';"
        sql = f"""
        select t1.asin, ao_val, zr_counts, sp_counts, sb_counts, vi_counts, bs_counts, ac_counts, tr_counts, er_counts,
               nvl(orders, 0) as orders, nvl(bsr_orders, 0) as bsr_orders, nvl(sales, 0) as bsr_orders_sale,
               is_self, pt_category_id, one_category_id, title, title_len, price, rating, total_comments,
               t1.buy_box_seller_type, page_inventory, volume, weight, rank,
               if(launch_time < '1970-01-01 00:00:00', to_date('1970-01-01 00:00:00'), launch_time) as launch_time,
               img_num, img_type, activity_type,
               one_two_val, three_four_val, five_six_val, eight_val, qa_num, brand_name as brand, t1.variation_num,
               one_star, two_star, three_star, four_star, five_star, low_star, together_asin,
               account_name, account_id, site_name, bsr_type, bsr_best_orders_type, zr_best_orders_type,
               rank_rise, cast(rank_change as double) rank_change,
               cast(ao_rise as double) ao_rise, cast(ao_change as double) ao_change,
               cast(price_rise as double) price_rise, cast(price_change as double) price_change,
               orders_rise, cast(orders_change as double) orders_change,
               cast(rating_rise as double) rating_rise, cast(rating_change as double) rating_change,
               comments_rise, cast(comments_change as double) comments_change,
               bsr_orders_rise, cast(bsr_orders_change as double) bsr_orders_change,
               cast(sales_rise as double) sales_rise, cast(sales_change as double) sales_change,
               variation_rise, cast(variation_change as double) variation_change,
               size_type, rating_type, t1.site_name_type, launch_time_type, weight_type,
               ao_type as ao_val_type, rank_type, price_type,
               '{self.date_info}' as wmqtype
        from (
            select asin, ao_val, zr_counts, sp_counts, sb_counts, vi_counts, bs_counts, ac_counts, tr_counts, er_counts,
                   bsr_orders, orders, sales, is_self, pt_category_id, one_category_id, title, title_len, price, rating,
                   total_comments, buy_box_seller_type, page_inventory, volume, weight, rank, launch_time, img_num,
                   img_type, activity_type, one_two_val, three_four_val, five_six_val, eight_val, qa_num, brand_name,
                   variation_num, one_star, two_star, three_star, four_star, five_star, low_star, together_asin,
                   account_name, account_id, site_name, bsr_type, bsr_best_orders_type, zr_best_orders_type,
                   case when buy_box_seller_type = 1 then 4
                        when buy_box_seller_type != 1 and site_name like 'US%' then 1
                        when buy_box_seller_type != 1 and site_name like 'CN%' then 2
                        else 3 end site_name_type,
                   case when rating is null then 0
                        when rating >= 4.5 then 1
                        when rating < 4.5 and rating >= 4 then 2
                        when rating < 4 and rating >= 3.5 then 3
                        when rating < 3.5 and rating >= 3 then 4
                        else 5 end rating_type,
                   case when weight is null then 0
                        when weight < 0.2 then 1
                        when weight >= 0.2 and weight < 0.4 then 2
                        when weight >= 0.4 and weight < 0.6 then 3
                        when weight >= 0.6 and weight < 1 then 4
                        when weight >= 1 and weight < 2 then 5
                        else 6 end weight_type,
                   case when rank is null then 0
                        when rank >= 1 and rank <= 999 then 1
                        when rank >= 1000 and rank <= 4999 then 2
                        when rank >= 5000 and rank <= 9999 then 3
                        when rank >= 10000 and rank <= 19999 then 4
                        when rank >= 20000 and rank <= 29999 then 5
                        when rank >= 30000 and rank <= 49999 then 6
                        when rank >= 50000 and rank <= 69999 then 7
                        else 8 end rank_type,
                   case when price is null then 0
                        when price < 10 then 1
                        when price >= 10 and price < 15 then 2
                        when price >= 15 and price < 20 then 3
                        when price >= 20 and price < 30 then 4
                        when price >= 30 and price < 50 then 5
                        else 6 end price_type,
                   case when ao_val is null then 0
                        when ao_val >= 0 and ao_val < 0.1 then 1
                        when ao_val >= 0.1 and ao_val < 0.2 then 2
                        when ao_val >= 0.2 and ao_val < 0.4 then 3
                        when ao_val >= 0.4 and ao_val < 0.8 then 4
                        when ao_val >= 0.8 and ao_val < 1.2 then 5
                        when ao_val >= 1.2 and ao_val < 2 then 6
                        else 7 end ao_type,
                   case when launch_time is null then 0
                        when datediff('{self.date}', launch_time) <= 30 then 1
                        when months_between('{self.date}', launch_time) >= 1 and months_between('{self.date}', launch_time) <= 3 then 2
                        when months_between('{self.date}', launch_time) > 3 and months_between('{self.date}', launch_time) <= 6 then 3
                        when months_between('{self.date}', launch_time) > 6 and months_between('{self.date}', launch_time) <= 12 then 4
                        when months_between('{self.date}', launch_time) > 12 and months_between('{self.date}', launch_time) <= 24 then 5
                        when months_between('{self.date}', launch_time) > 24 and months_between('{self.date}', launch_time) <= 36 then 6
                        else 7 end launch_time_type
            from {self.table_name}
            where site = '{self.site_name}' and dt = '{self.date_info}'
        ) t1
        left join (
            select asin, rank_rise, rank_change, ao_rise, ao_change, price_rise, price_change,
                   orders_rise, orders_change, rating_rise, rating_change, comments_rise, comments_change,
                   bsr_orders_rise, bsr_orders_change, sales_rise, sales_change,
                   variation_num, variation_rise, variation_change
            from {self.table_trend_name}
            where site = '{self.site_name}' and dt = '{self.date_info}'
        ) t2 on t1.asin = t2.asin
        left join (
            select asin, size_type from dwt_asin_size where site_dt = '{self.site_name}-9999-99'
        ) t3 on t1.asin = t3.asin
        group by t1.asin, ao_val, zr_counts, sp_counts, sb_counts, vi_counts, bs_counts, ac_counts, tr_counts, er_counts,
                 bsr_orders, orders, sales, is_self, pt_category_id, one_category_id, title, title_len, price, rating,
                 total_comments, t1.buy_box_seller_type, page_inventory, volume, weight, rank, launch_time, img_num,
                 img_type, activity_type, one_two_val, three_four_val, five_six_val, eight_val, qa_num, brand_name,
                 t1.variation_num, one_star, two_star, three_star, four_star, five_star, low_star, together_asin,
                 account_name, account_id, site_name, bsr_type, bsr_best_orders_type, zr_best_orders_type,
                 rank_rise, rank_change, ao_rise, ao_change, price_rise, price_change, orders_rise, orders_change,
                 rating_rise, rating_change, comments_rise, comments_change, bsr_orders_rise, bsr_orders_change,
                 sales_rise, sales_change, variation_rise, variation_change,
                 size_type, rating_type, t1.site_name_type, launch_time_type, weight_type, ao_type, rank_type, price_type
        """
        print("sql:", sql)
        self.df_spark = self.spark.sql(sqlQuery=sql)
        self.df_spark = self.df_spark.cache()
        self.df_spark.show(10)
        print("self.df_spark.count:", self.df_spark.count())
        print("partitions before repartition:", self.df_spark.rdd.getNumPartitions())
        self.df_spark = self.df_spark.repartition(20)
        print("partitions after repartition:", self.df_spark.rdd.getNumPartitions())
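
    # A hedged verification sketch (not part of the original job; assumes the
    # `requests` package is installed): after save_data() runs, the document
    # count in the target index can be compared with self.df_spark.count()
    # via the standard Elasticsearch _count endpoint, e.g.:
    #
    #   import requests
    #   resp = requests.get(
    #       f"http://{self.es_url}:{self.es_port}/{self.es_table_name}/_count",
    #       auth=(self.es_user, self.es_pass))
    #   print("docs in index:", resp.json().get("count"))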
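
    # For reference, the *_type columns computed in read_data() bucket raw
    # metrics into small integer categories, with 0 always meaning "value
    # missing" (derived directly from the CASE expressions above):
    #   rating_type      1: >=4.5            ... 5: <3
    #   weight_type      1: <0.2             ... 6: >=2
    #   rank_type        1: 1-999            ... 8: >=70000
    #   price_type       1: <10              ... 6: >=50
    #   ao_type          1: [0, 0.1)         ... 7: >=2
    #   launch_time_type 1: <=30 days old    ... 7: older than 36 months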
    def save_data(self):
        # Write the result to Elasticsearch.
        options = OrderedDict()
        options['es.nodes'] = self.es_url
        options['es.port'] = self.es_port
        options['es.net.http.auth.user'] = self.es_user
        options['es.net.http.auth.pass'] = self.es_pass
        options['es.mapping.id'] = "asin"
        options['es.resource'] = f'{self.es_table_name}/_doc'
        # ES connection timeout; the default is 1m.
        # options['es.http.timeout'] = '10000m'
        options['es.nodes.wan.only'] = 'true'
        # Retries default to 3; a negative value retries forever (use with caution).
        # options['es.batch.write.retry.count'] = '15'
        # The default wait between retries is 10s.
        # options['es.batch.write.retry.wait'] = '60'
        # The following options cap a single bulk write by byte size or by
        # entry count (pick one):
        # options['es.batch.size.bytes'] = '20mb'
        # options['es.batch.size.entries'] = '20000'
        self.df_spark.write.format('org.elasticsearch.spark.sql').options(**options).mode('append').save()

    def run(self):
        self.read_data()
        self.save_data()


if __name__ == '__main__':
    # site_name = sys.argv[1]   # arg 1: site
    # date_type = sys.argv[2]   # arg 2: week/month/quarter
    # year = int(sys.argv[3])   # arg 3: year
    # week = int(sys.argv[4])   # arg 4: week
    site_name = 'us'
    date_type = '4_week'
    year = 2022
    week = 42
    # handle_obj = EsBrandAnalytics(site_name=site_name, year=year)
    handle_obj = EsStDetail(site_name=site_name, date_type=date_type, year=year, week=week)
    handle_obj.run()
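
# A usage sketch, assuming the job is launched with spark-submit and that the
# elasticsearch-spark (ES-Hadoop) connector jar is on the classpath; the
# script name and jar path/version below are illustrative assumptions, not
# values taken from this repository:
#
#   spark-submit \
#       --jars /path/to/elasticsearch-spark-20_2.12-7.x.jar \
#       es_st_detail.py us 4_week 2022 42
#
# (The sys.argv parsing above is currently commented out in favor of the
# hardcoded values, so the trailing arguments only take effect once it is
# restored.)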