# es_aba_base_year.py
import sys
from collections import OrderedDict

import pandas as pd
from pyspark import pandas as ps
from sqlalchemy import create_engine

# year, week = 2022, 23
# year_month = f'{year}_6'
# site_name = 'us'

site_name = sys.argv[1]  # arg 1: site name
# year = int(sys.argv[2])  # arg 2: year
# week = int(sys.argv[3])  # arg 3: week
# print("year, week:", year, week, type(year), type(week))
if site_name == 'us':
    engine = create_engine(f'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/selection?charset=utf8mb4')  # , pool_recycle=3600
else:
    engine = create_engine(f'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/selection_{site_name}?charset=utf8mb4')  # , pool_recycle=3600
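
# Earlier per-week ASIN query, kept commented out below; superseded by the aba_base_year query further down.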

# sql = f"""
# select allat.*,b.one_category_id,IF(ISNULL(ocr.orders),0,ocr.orders) as bsr_orders,IF(ISNULL(ocr.orders_day),0,ocr.orders_day) as orders_day,
# one_star,two_star,three_star,four_star,five_star,low_star,GROUP_CONCAT(together_asin) as together_asin,o.brand
# from (
# select
# a.id,a.asin,a.pt_category_id,a.zr_counts,a.sp_counts,a.sb_counts,a.vi_counts,a.bs_counts,a.tr_counts,a.er_counts,a.ac_counts,IF(ISNULL(ao_val),0,ao_val) as ao_val,IF(ISNULL(orders),0,orders) as orders,state,is_self,
# c.title,c.title_len,cast(IF(ISNULL(c.rating),0,c.rating) as decimal(3,1)) as rating,c.price,c.rank,cast(IF(ISNULL(c.total_comments),0,c.total_comments) as SIGNED) as total_comments,IF(c.launch_time<'1970-01-01 00:00:00',STR_TO_DATE('1970-01-01 00:00:00','%%Y-%%m-%%d %%H:%%i:%%s'),c.launch_time) as launch_time,
# c.buy_box_seller_type,c.page_inventory,c.volume,c.weight,c.img_num,c.img_type,
# c.activity_type,c.one_two_val,c.three_four_val,c.eight_val,
# c.qa_num,
# c.updated_at as updated_at
# from (
# select asin,title,title_len,rating,price,rank,total_comments,launch_time, buy_box_seller_type,page_inventory,volume,weight,updated_at,img_num,img_type,
# activity_type,one_two_val,three_four_val,eight_val,qa_num
# from {site_name}_asin_detail_{year}_{week} where updated_at >= '2022-06-02 17:25:52'
# ) c
# inner join {site_name}_asin_st_{year}_{week} a  on c.asin=a.asin
# ) allat
# left JOIN (select * from {site_name}_asin_star where week={week}) m on allat.asin=m.asin
# left JOIN {site_name}_bs_category b on allat.pt_category_id=b.id
# LEFT JOIN {site_name}_one_category_report ocr on b.one_category_id = ocr.cate_1_id and allat.rank=ocr.rank
# left JOIN {site_name}_brand_asin o on allat.asin=o.asin
# LEFT JOIN (select * from {site_name}_asin_fb_together where week={week}) n on allat.asin=n.asin
# GROUP BY allat.asin;
# """

sql = f"""
select `id`, `search_term`, `quantity_being_sold`, 
`order_2021_1`, `order_2021_2`, `order_2021_3`, `order_2021_4`, `order_2021_5`, `order_2021_6`, `order_2021_7`, `order_2021_8`, `order_2021_9`, `order_2021_10`, `order_2021_11`, `order_2021_12`, `order_2021_13`, `order_2021_14`, `order_2021_15`, `order_2021_16`, `order_2021_17`, `order_2021_18`, `order_2021_19`, `order_2021_20`, `order_2021_21`, `order_2021_22`, `order_2021_23`, `order_2021_24`, `order_2021_25`, `order_2021_26`, `order_2021_27`, `order_2021_28`, `order_2021_29`, `order_2021_30`, `order_2021_31`, `order_2021_32`, `order_2021_33`, `order_2021_34`, `order_2021_35`, `order_2021_36`, `order_2021_37`, `order_2021_38`, `order_2021_39`, `order_2021_40`, `order_2021_41`, `order_2021_42`, `order_2021_43`, `order_2021_44`, `order_2021_45`, `order_2021_46`, `order_2021_47`, `order_2021_48`, `order_2021_49`, `order_2021_50`, `order_2021_51`, `order_2021_52`, 
`avg_order_2021_1_month`, `avg_order_2021_2_month`, `avg_order_2021_3_month`, `avg_order_2021_4_month`, `avg_order_2021_5_month`, `avg_order_2021_6_month`, `avg_order_2021_7_month`, `avg_order_2021_8_month`, `avg_order_2021_9_month`, `avg_order_2021_10_month`, `avg_order_2021_11_month`, `avg_order_2021_12_month`, 
`avg_order_2021_1_quarter`, `avg_order_2021_2_quarter`, `avg_order_2021_3_quarter`, `avg_order_2021_4_quarter`,
`category_id`, `created_time`, `updated_time`
from aba_base_year
""".replace("%", "%%")

df_pd = pd.read_sql(sql=sql, con=engine)

print("df_pd.shape:", df_pd.shape)

df_spark = ps.from_pandas(df_pd).to_spark()
print("partitions before repartition:", df_spark.rdd.getNumPartitions())
df_spark = df_spark.repartition(25)
print("partitions after repartition:", df_spark.rdd.getNumPartitions())




# from pyspark.sql import SparkSession  # only needed if the Hive-backed session below is re-enabled
es_url: str = '120.79.147.190'
# Each site group is served by its own ES instance/port.
if site_name in ['us']:
    es_port = '9200'
elif site_name in ['uk', 'de']:
    es_port = '9201'
else:
    es_port = '9202'
es_user = 'elastic'
es_pass = 'selection2021.+'
# es_resource = 'us_test2/_doc'  # unused; es.resource is set explicitly in the write options below

# app_name/db_name are referenced only by the commented-out Hive-backed SparkSession below.
app_name = 'es connection'
db_name = 'big_data_selection'


# spark = SparkSession.builder. \
#     appName(f"{app_name}"). \
#     config("spark.sql.warehouse.dir", f"hdfs://hadoop5:8020/home/big_data_selection"). \
#     config("spark.metastore.uris", "thrift://hadoop4:9083"). \
#     config("spark.network.timeout", 1000000). \
#     config("spark.sql.parquet.compression.codec", "lzo"). \
#     enableHiveSupport(). \
#     getOrCreate()
# spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
# spark.sql('''set mapred.output.compress=true''')
# spark.sql('''set hive.exec.compress.output=true''')
# spark.sql('''set mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec''')

# spark.sql(f"show databases;").show()
# spark.sql(f"use {db_name};")
#
# df_read = spark.sql("select id, rank, search_num, search_sum from ods_rank_search_rate_repeat where site_name='us'")
# print("分区数:", df_read.rdd.getNumPartitions())
# df_read.show(10, truncate=False)

# Write the results to ES via the elasticsearch-hadoop Spark connector.
options = OrderedDict()
options['es.nodes'] = es_url
options['es.port'] = es_port
options['es.net.http.auth.user'] = es_user
options['es.net.http.auth.pass'] = es_pass
options['es.mapping.id'] = "id"  # use the MySQL primary key as the document _id, so re-runs overwrite rather than duplicate
options['es.resource'] = f'{site_name}_aba_base_year_2021/_doc'
# ES connection timeout (default: 1m).
# options['es.http.timeout'] = '10000m'
options['es.nodes.wan.only'] = 'true'  # connect only to the declared node; skip cluster-node discovery
# # Write retries default to 3; a negative value retries forever (use with caution).
# options['es.batch.write.retry.count'] = '15'
# # Default wait between retries is 10s.
# options['es.batch.write.retry.wait'] = '60'
# # The two options below cap each bulk request by size or by entry count (pick one).
# options['es.batch.size.bytes'] = '20mb'
# options['es.batch.size.entries'] = '20000'

df_spark.write.format('org.elasticsearch.spark.sql').options(**options).mode('append').save()
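
# A sketch of a typical launch (assumes the elasticsearch-hadoop connector jar is on
# Spark's classpath; the jar name/version below is hypothetical):
#   spark-submit --jars elasticsearch-spark-30_2.12-8.9.0.jar es_aba_base_year.py us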