from sqlalchemy import create_engine
from pyspark import pandas as ps
import pandas as pd
import sys

# year, week = 2022, 23
# year_month = f'{year}_6'
# site_name = 'us'

site_name = sys.argv[1]  # arg 1: site name
# year = int(sys.argv[2])  # arg 2: year
# week = int(sys.argv[3])  # arg 3: week
# print("year, week:", year, week, type(year), type(week))

# MySQL source: the "us" site uses the selection database, other sites use selection_{site_name}
if site_name == 'us':
    engine = create_engine(f'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/selection?charset=utf8mb4')  # , pool_recycle=3600
else:
    engine = create_engine(f'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/selection_{site_name}?charset=utf8mb4')  # , pool_recycle=3600

# sql = f"""
# select allat.*,b.one_category_id,IF(ISNULL(ocr.orders),0,ocr.orders) as bsr_orders,IF(ISNULL(ocr.orders_day),0,ocr.orders_day) as orders_day,
# one_star,two_star,three_star,four_star,five_star,low_star,GROUP_CONCAT(together_asin) as together_asin,o.brand
# from (
# select
# a.id,a.asin,a.pt_category_id,a.zr_counts,a.sp_counts,a.sb_counts,a.vi_counts,a.bs_counts,a.tr_counts,a.er_counts,a.ac_counts,IF(ISNULL(ao_val),0,ao_val) as ao_val,IF(ISNULL(orders),0,orders) as orders,state,is_self,
# c.title,c.title_len,cast(IF(ISNULL(c.rating),0,c.rating) as decimal(3,1)) as rating,c.price,c.rank,cast(IF(ISNULL(c.total_comments),0,c.total_comments) as SIGNED) as total_comments,IF(c.launch_time<'1970-01-01 00:00:00',STR_TO_DATE('1970-01-01 00:00:00','%%Y-%%m-%%d %%H:%%i:%%s'),c.launch_time) as launch_time,
# c.buy_box_seller_type,c.page_inventory,c.volume,c.weight,c.img_num,c.img_type,
# c.activity_type,c.one_two_val,c.three_four_val,c.eight_val,
# c.qa_num,
# c.updated_at as updated_at
# from (
# select asin,title,title_len,rating,price,rank,total_comments,launch_time, buy_box_seller_type,page_inventory,volume,weight,updated_at,img_num,img_type,
# activity_type,one_two_val,three_four_val,eight_val,qa_num
# from {site_name}_asin_detail_{year}_{week} where updated_at >= '2022-06-02 17:25:52'
# ) c
# inner join {site_name}_asin_st_{year}_{week} a on c.asin=a.asin
# ) allat
# left JOIN (select * from {site_name}_asin_star where week={week}) m on allat.asin=m.asin
# left JOIN {site_name}_bs_category b on allat.pt_category_id=b.id
# LEFT JOIN {site_name}_one_category_report ocr on b.one_category_id = ocr.cate_1_id and allat.rank=ocr.rank
# left JOIN {site_name}_brand_asin o on allat.asin=o.asin
# LEFT JOIN (select * from {site_name}_asin_fb_together where week={week}) n on allat.asin=n.asin
# GROUP BY allat.asin;
# """

# Pull the yearly ABA base table: weekly, monthly and quarterly 2021 order aggregates per search term
sql = f"""
select `id`, `search_term`, `quantity_being_sold`,
`order_2021_1`, `order_2021_2`, `order_2021_3`, `order_2021_4`, `order_2021_5`, `order_2021_6`, `order_2021_7`, `order_2021_8`, `order_2021_9`, `order_2021_10`,
`order_2021_11`, `order_2021_12`, `order_2021_13`, `order_2021_14`, `order_2021_15`, `order_2021_16`, `order_2021_17`, `order_2021_18`, `order_2021_19`, `order_2021_20`,
`order_2021_21`, `order_2021_22`, `order_2021_23`, `order_2021_24`, `order_2021_25`, `order_2021_26`, `order_2021_27`, `order_2021_28`, `order_2021_29`, `order_2021_30`,
`order_2021_31`, `order_2021_32`, `order_2021_33`, `order_2021_34`, `order_2021_35`, `order_2021_36`, `order_2021_37`, `order_2021_38`, `order_2021_39`, `order_2021_40`,
`order_2021_41`, `order_2021_42`, `order_2021_43`, `order_2021_44`, `order_2021_45`, `order_2021_46`, `order_2021_47`, `order_2021_48`, `order_2021_49`, `order_2021_50`,
`order_2021_51`, `order_2021_52`,
`avg_order_2021_1_month`, `avg_order_2021_2_month`, `avg_order_2021_3_month`, `avg_order_2021_4_month`, `avg_order_2021_5_month`, `avg_order_2021_6_month`,
`avg_order_2021_7_month`, `avg_order_2021_8_month`, `avg_order_2021_9_month`, `avg_order_2021_10_month`, `avg_order_2021_11_month`, `avg_order_2021_12_month`,
`avg_order_2021_1_quarter`, `avg_order_2021_2_quarter`, `avg_order_2021_3_quarter`, `avg_order_2021_4_quarter`,
`category_id`, `created_time`, `updated_time`
from aba_base_year
""".replace("%", "%%")

# Read the MySQL result into pandas, then convert to a Spark DataFrame via pandas-on-Spark
df_pd = pd.read_sql(sql=sql, con=engine)
print("df_pd.shape:", df_pd.shape)
df_spark = ps.from_pandas(df_pd).to_spark()
print("partition count 1:", df_spark.rdd.getNumPartitions())
df_spark = df_spark.repartition(25)
print("partition count 2:", df_spark.rdd.getNumPartitions())

from pyspark.sql import SparkSession

# Elasticsearch connection settings; the port depends on the site
es_url: str = '120.79.147.190'
# es_port = '9200'
if site_name in ['us']:
    es_port = '9200'
elif site_name in ['uk', 'de']:
    es_port = '9201'
else:
    es_port = '9202'
es_user = 'elastic'
es_pass = 'selection2021.+'
es_resource = 'us_test2/_doc'
app_name = 'es connection'
db_name = 'big_data_selection'

# spark = SparkSession.builder. \
#     appName(f"{app_name}"). \
#     config("spark.sql.warehouse.dir", f"hdfs://hadoop5:8020/home/big_data_selection"). \
#     config("spark.metastore.uris", "thrift://hadoop4:9083"). \
#     config("spark.network.timeout", 1000000). \
#     config("spark.sql.parquet.compression.codec", "lzo"). \
#     enableHiveSupport(). \
#     getOrCreate()
# spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
# spark.sql('''set mapred.output.compress=true''')
# spark.sql('''set hive.exec.compress.output=true''')
# spark.sql('''set mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec''')
# spark.sql(f"show databases;").show()
# spark.sql(f"use {db_name};")
#
# df_read = spark.sql("select id, rank, search_num, search_sum from ods_rank_search_rate_repeat where site_name='us'")
# print("partition count:", df_read.rdd.getNumPartitions())
# df_read.show(10, truncate=False)

from collections import OrderedDict

# Write the result to ES
options = OrderedDict()
options['es.nodes'] = es_url
# es_port = "9201"
options['es.port'] = es_port
options['es.net.http.auth.user'] = es_user
options['es.net.http.auth.pass'] = es_pass
options['es.mapping.id'] = "id"
options['es.resource'] = f'{site_name}_aba_base_year_2021/_doc'
# ES connection timeout; defaults to 1m
# options['es.http.timeout'] = '10000m'
options['es.nodes.wan.only'] = 'true'
# # Retries default to 3; a negative value means unlimited retries (use with caution)
# options['es.batch.write.retry.count'] = '15'
# # Default wait between retries is 10s
# options['es.batch.write.retry.wait'] = '60'
# # The following options cap the byte size / document count of a single bulk write (pick one)
# options['es.batch.size.bytes'] = '20mb'
# options['es.batch.size.entries'] = '20000'
df_spark.write.format('org.elasticsearch.spark.sql').options(**options).mode('append').save()
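
# Usage sketch (assumption, not from the source): writing through the
# 'org.elasticsearch.spark.sql' format requires the elasticsearch-spark (ES-Hadoop)
# connector on the Spark classpath, so the script would typically be launched with
# spark-submit, passing the site name as the first argument, e.g.
#   spark-submit --jars elasticsearch-spark-30_2.12-<version>.jar aba_base_year_to_es.py us
# The jar version and the script file name above are placeholders.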