1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import sys
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from collections import OrderedDict
import time
class EsStDetail(object):
    """Sync one period of ASIN detail data from Hive into Elasticsearch.

    Runs a single Spark SQL query that joins the aggregated ASIN detail
    table with its trend table and the ASIN size-type table for a given
    site and period, then bulk-writes the result to a per-period
    Elasticsearch index.
    """

    def __init__(self, site_name='us', date_type="week", year=2022, week=1):
        """Prepare table names, MySQL engine, ES settings and the Spark session.

        :param site_name: Amazon site code ('us', 'uk', 'de', ...)
        :param date_type: period granularity; only '4_week' and 'month' are
            handled here ('month' interprets ``week`` as a month number)
        :param year: 4-digit year of the period
        :param week: week number, or month number when date_type == 'month'
        """
        self.site_name = site_name
        self.date_type = date_type
        self.year = year
        self.week = week
        # self.date_info = f"{self.year}-{self.week}"
        if self.date_type in ['4_week']:
            self.table_name = "dwt_asin_last_4_week"
            self.table_trend_name = "ads_asin_trend_last_4_week"
            # the rolling 4-week window is always relative to "today"
            self.date = time.strftime("%Y-%m-%d", time.localtime())
            self.es_table_name = f"{self.site_name}_st_detail_last_4_week_copy"
            self.date_info = f"{self.year}-{self.week}"
        if self.date_type in ['month']:
            self.table_name = "dwt_asin_month"  # monthly aggregate table
            self.table_trend_name = "ads_asin_detail_trend_month"
            import calendar
            # last day of the given month == number of days in that month
            lastDay = calendar.monthrange(self.year, self.week)[1]
            # zero-pad the month; equivalent to the former "< 10" conditional
            self.date = f"{self.year}-{self.week:02d}-{lastDay}"
            print("lastDay:", lastDay)
            self.date_info = f"{self.year}_{self.week}"
            # e.g. us_st_detail_month_2022_6
            self.es_table_name = f"{self.site_name}_st_detail_{self.date_type}_{self.date_info}"
            print("self.date:", self.date)
        # NOTE(review): any other date_type (including the default 'week')
        # leaves self.table_name undefined and raises AttributeError below —
        # confirm whether a 'week' branch is missing or callers never pass it.
        # SECURITY: DB/ES credentials are hard-coded; move to config/env vars.
        if self.site_name == 'us':
            self.engine = create_engine(
                'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/selection?charset=utf8mb4')  # , pool_recycle=3600
            self.es_port = '9200'
        else:
            # uk/de share one ES endpoint port, every other site another
            if self.site_name in ['uk', 'de']:
                self.es_port = '9201'
            else:
                self.es_port = '9202'
            self.engine = create_engine(
                f'mysql+pymysql://adv_yswg:HmRCMUjt03M33Lze@rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com:3306/selection_{self.site_name}?charset=utf8mb4')  # , pool_recycle=3600
        # placeholders; populated by read_data()
        self.df_read = None
        self.df_spark = None
        # Elasticsearch connection settings
        self.es_url = '120.79.147.190'
        self.es_user = 'elastic'
        self.es_pass = 'selection2021.+'
        # build the Spark session (Hive-enabled, LZO-compressed output)
        print(f"当前同步:{self.table_name}:, {self.site_name}-{self.year}-{self.week}")
        self.spark = SparkSession.builder. \
            appName(f"{self.table_name}:, {self.site_name}-{self.year}-{self.week}"). \
            config("spark.sql.warehouse.dir", "hdfs://hadoop5:8020/home/big_data_selection"). \
            config("spark.metastore.uris", "thrift://hadoop6:9083"). \
            config("spark.network.timeout", 10000000). \
            config("spark.sql.parquet.compression.codec", "lzo"). \
            enableHiveSupport(). \
            getOrCreate()
        self.spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
        self.spark.sql('''set mapred.output.compress=true''')
        self.spark.sql('''set hive.exec.compress.output=true''')
        self.spark.sql('''set mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec''')
        self.spark.sql("use selection_off_line;")
        self.partition_type = "dt"

    def read_data(self):
        """Run the detail + trend + size-type join in Spark and cache it.

        The query buckets rating/weight/rank/price/ao_val/launch_time into
        type codes consumed by the frontend, left-joins per-ASIN trend data
        and size type, then the result is cached and repartitioned so the
        subsequent ES bulk write is spread across 20 tasks.
        """
        # sql = f"select * from {self.table_name} where site='{self.site_name}' and dt='{self.date_info}';"
        # NOTE(review): in the ao_type CASE both [0, 0.1) and [0.1, 0.2) map
        # to 2 and bucket 1 is never produced — looks like a copy-paste slip,
        # but left unchanged because consumers may rely on the current codes.
        sql = f"""
            select
            t1.asin,ao_val,zr_counts,sp_counts,sb_counts,vi_counts,bs_counts,ac_counts,tr_counts,er_counts,nvl(orders,0) as orders,nvl(bsr_orders,0) as bsr_orders,nvl(sales,0) as bsr_orders_sale,is_self,pt_category_id,one_category_id,title,title_len,price,rating,total_comments,
            t1.buy_box_seller_type,page_inventory,volume,weight,rank,if(launch_time<'1970-01-01 00:00:00',to_date('1970-01-01 00:00:00'),launch_time) as launch_time,img_num,img_type,activity_type,
            one_two_val,three_four_val,five_six_val,eight_val,qa_num,brand_name as brand,t1.variation_num,one_star,two_star,
            three_star,four_star,five_star,low_star,together_asin,account_name,account_id,site_name,bsr_type,bsr_best_orders_type,zr_best_orders_type,
            rank_rise, cast(rank_change as double) rank_change, cast(ao_rise as double) ao_rise, cast(ao_change as double) ao_change, cast(price_rise as double) price_rise, cast(price_change as double) price_change, orders_rise, cast(orders_change as double) orders_change, cast(rating_rise as double) rating_rise, cast(rating_change as double) rating_change,
            comments_rise,cast(comments_change as double) comments_change, bsr_orders_rise, cast(bsr_orders_change as double) bsr_orders_change, cast(sales_rise as double) sales_rise, cast(sales_change as double) sales_change, variation_rise, cast(variation_change as double) variation_change,
            size_type, rating_type, t1.site_name_type, launch_time_type, weight_type, ao_type as ao_val_type, rank_type, price_type, '{self.date_info}' as wmqtype
            from (
            select asin,ao_val,zr_counts,sp_counts,sb_counts,vi_counts,bs_counts,ac_counts,tr_counts,er_counts,bsr_orders,
            orders,sales,is_self,pt_category_id,one_category_id,title,title_len,price,rating,total_comments,
            buy_box_seller_type,page_inventory,volume,weight,rank,launch_time,img_num,img_type,activity_type,
            one_two_val,three_four_val,five_six_val,eight_val,qa_num,brand_name,variation_num,one_star,two_star,
            three_star,four_star,five_star,low_star,together_asin,account_name,account_id,site_name,bsr_type,bsr_best_orders_type,zr_best_orders_type,
            case
            when buy_box_seller_type = 1 then 4
            when buy_box_seller_type != 1 and site_name like 'US%' then 1
            when buy_box_seller_type != 1 and site_name like 'CN%' then 2
            else 3 end site_name_type,
            case
            when rating is null then 0
            when rating >= 4.5 then 1
            when rating < 4.5 and rating >= 4 then 2
            when rating < 4 and rating >= 3.5 then 3
            when rating < 3.5 and rating >= 3 then 4
            else 5 end rating_type,
            case
            when weight is null then 0
            when weight < 0.2 then 1
            when weight >= 0.2 and weight < 0.4 then 2
            when weight >= 0.4 and weight < 0.6 then 3
            when weight >= 0.6 and weight < 1 then 4
            when weight >= 1 and weight < 2 then 5
            else 6 end weight_type,
            case
            when rank is null then 0
            when rank >= 1 and rank <= 999 then 1
            when rank >= 1000 and rank <= 4999 then 2
            when rank >= 5000 and rank <= 9999 then 3
            when rank >= 10000 and rank <= 19999 then 4
            when rank >= 20000 and rank <= 29999 then 5
            when rank >= 30000 and rank <= 49999 then 6
            when rank >= 50000 and rank <= 69999 then 7
            else 8 end rank_type,
            case
            when price is null then 0
            when price < 10 then 1
            when price >= 10 and price < 15 then 2
            when price >= 15 and price < 20 then 3
            when price >= 20 and price < 30 then 4
            when price >= 30 and price < 50 then 5
            else 6 end price_type,
            case
            when ao_val is null then 0
            when ao_val >= 0 and ao_val < 0.1 then 2
            when ao_val >= 0.1 and ao_val < 0.2 then 2
            when ao_val >= 0.2 and ao_val < 0.4 then 3
            when ao_val >= 0.4 and ao_val < 0.8 then 4
            when ao_val >= 0.8 and ao_val < 1.2 then 5
            when ao_val >= 1.2 and ao_val < 2 then 6
            else 7 end ao_type,
            case
            when launch_time is null then 0
            when datediff('{self.date}',launch_time) <= 30 then 1
            when months_between('{self.date}',launch_time) >=1 and
            months_between('{self.date}',launch_time) <=3 then 2
            when months_between('{self.date}',launch_time) >3 and
            months_between('{self.date}',launch_time) <=6 then 3
            when months_between('{self.date}',launch_time) >6 and
            months_between('{self.date}',launch_time) <=12 then 4
            when months_between('{self.date}',launch_time) >12 and
            months_between('{self.date}',launch_time) <=24 then 5
            when months_between('{self.date}',launch_time) >24 and
            months_between('{self.date}',launch_time) <=36 then 6
            else 7 end launch_time_type
            from {self.table_name}
            where site = '{self.site_name}'
            and dt = '{self.date_info}'
            )t1
            left join
            (
            select
            asin, rank_rise, rank_change, ao_rise, ao_change, price_rise, price_change, orders_rise, orders_change, rating_rise, rating_change,
            comments_rise,comments_change, bsr_orders_rise, bsr_orders_change, sales_rise, sales_change, variation_num, variation_rise, variation_change
            from {self.table_trend_name}
            where site='{self.site_name}' and dt='{self.date_info}'
            )t2 on t1.asin=t2.asin
            left join
            (select asin,size_type from dwt_asin_size where site_dt='{self.site_name}-9999-99' )t3 on t1.asin=t3.asin
            group by t1.asin,ao_val,zr_counts,sp_counts,sb_counts,vi_counts,bs_counts,ac_counts,tr_counts,er_counts,bsr_orders,
            orders,sales,is_self,pt_category_id,one_category_id,title,title_len,price,rating,total_comments,
            t1.buy_box_seller_type,page_inventory,volume,weight,rank,launch_time,img_num,img_type,activity_type,
            one_two_val,three_four_val,five_six_val,eight_val,qa_num,brand_name,t1.variation_num,one_star,two_star,
            three_star,four_star,five_star,low_star,together_asin,account_name,account_id,site_name,bsr_type,bsr_best_orders_type,zr_best_orders_type,
            rank_rise, rank_change, ao_rise, ao_change, price_rise, price_change, orders_rise, orders_change, rating_rise, rating_change,
            comments_rise,comments_change, bsr_orders_rise, bsr_orders_change, sales_rise, sales_change, variation_rise, variation_change,
            size_type, rating_type, t1.site_name_type, launch_time_type, weight_type, ao_type, rank_type, price_type
            """
        print("sql:", sql)
        self.df_spark = self.spark.sql(sqlQuery=sql)
        self.df_spark = self.df_spark.cache()
        self.df_spark.show(10)
        print("self.df_spark.count:", self.df_spark.count())
        print("分区数1:", self.df_spark.rdd.getNumPartitions())
        # spread the ES bulk write over 20 parallel tasks
        self.df_spark = self.df_spark.repartition(20)
        print("分区数2:", self.df_spark.rdd.getNumPartitions())

    def save_data(self):
        """Bulk-write the cached DataFrame into Elasticsearch (append mode)."""
        options = OrderedDict()
        options['es.nodes'] = self.es_url
        options['es.port'] = self.es_port
        options['es.net.http.auth.user'] = self.es_user
        options['es.net.http.auth.pass'] = self.es_pass
        options['es.mapping.id'] = "asin"  # use asin as the document _id
        options['es.resource'] = f'{self.es_table_name}/_doc'
        # ES connect timeout; default is 1m
        # options['es.http.timeout'] = '10000m'
        options['es.nodes.wan.only'] = 'true'
        # retry count defaults to 3; a negative value retries forever (careful)
        # options['es.batch.write.retry.count'] = '15'
        # default retry wait is 10s
        # options['es.batch.write.retry.wait'] = '60'
        # batch size may be capped by bytes or by entry count (pick one)
        # options['es.batch.size.bytes'] = '20mb'
        # options['es.batch.size.entries'] = '20000'
        self.df_spark.write.format('org.elasticsearch.spark.sql').options(**options).mode('append').save()

    def run(self):
        """Read the period's data from Hive, then write it to Elasticsearch."""
        self.read_data()
        self.save_data()
if __name__ == '__main__':
    # CLI form (currently disabled in favour of hard-coded test values):
    # site_name = sys.argv[1]   # arg 1: site
    # date_type = sys.argv[2]   # arg 2: week/month/quarter
    # year = int(sys.argv[3])   # arg 3: year
    # week = int(sys.argv[4])   # arg 4: week (or month number)
    site_name = 'us'
    date_type = '4_week'
    year = 2022
    week = 42
    # handle_obj = EsBrandAnalytics(site_name=site_name, year=year)
    handle_obj = EsStDetail(site_name=site_name, date_type=date_type, year=year, week=week)
    handle_obj.run()