es_flow_asin.py
9.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import sys
import os
sys.path.append(os.path.dirname(sys.path[0]))
from utils.es_util import EsUtils
from utils.templates_mysql import TemplatesMysql
import pandas as pd
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from utils.db_util import DbTypes, DBUtil
class EsStDetail(TemplatesMysql):
    """Synchronize one period of the Hive table ``dwt_flow_asin`` into Elasticsearch.

    Reads a site/date_type/date_info slice of ``dwt_flow_asin`` with Spark,
    writes it into an Elasticsearch index, keeps the "last 4 week" index alias
    pointed at the right index and, for formal monthly runs, records completion
    in the MySQL ``workflow_everyday`` table.

    NOTE(review): the class name, the alias name and the monthly index name use
    the ``st_detail`` prefix although this job syncs ``dwt_flow_asin``. They are
    kept unchanged because downstream readers depend on these names — confirm
    the naming is intended.
    """

    def __init__(self, site_name='us', date_type="week", date_info='2023-01', result_type='formal'):
        """Resolve the target date, index/alias names and clients for one sync run.

        Args:
            site_name: site code used in the Hive filter and all index/alias names.
            date_type: '4_week' resolves ``date_info`` as a year_week via
                ``dim_date_20_to_30``; 'month' enables the workflow-record step;
                any other value is used as-is in the index name.
            date_info: period identifier (a year_week for '4_week', otherwise
                used directly as the report date).
            result_type: 'formal' targets production names; any other value
                appends '_test' to the index and alias names.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.result_type = result_type
        self.df_workflow = pd.DataFrame()
        self.table_name = "dwt_flow_asin"
        # Create the Spark session used to read the Hive source table.
        self.spark = SparkUtil.get_spark_sessionV4(
            app_name=f"flow_asin_synchronize: {self.site_name}-{self.date_type}-{self.date_info}",
            use_db="big_data_selection")
        # Placeholder DataFrame; replaced by read_data().
        self.df_synchronize = self.spark.sql("select 1+1")
        if self.date_type == '4_week':
            # '4_week' runs are keyed by the week_day == 7 date of the given week.
            self.cur_date = self.get_date_from_week()
        else:
            self.cur_date = self.date_info
        self.year_month = str(self.cur_date).replace("-", "_")
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")
        # Elasticsearch index name and alias; non-formal runs use '_test' names.
        is_formal = self.result_type == 'formal'
        env_suffix = '' if is_formal else '_test'
        self.alias_name = f'{self.site_name}_st_detail_last_4_week{env_suffix}'
        if self.date_type == '4_week':
            print(f"往{'正式' if is_formal else '测试'}环境上同步最近三十天数据")
            self.es_index_name = f"{self.site_name}_flow_asin_last30day_{self.cur_date}{env_suffix}"
        else:
            print(f"往{'正式' if is_formal else '测试'}环境上同步月数据")
            self.es_index_name = f"{self.site_name}_st_detail_{self.date_type}_{self.year_month}{env_suffix}"
        # Workflow table, task type and table-name value written after a monthly run.
        self.record_table = 'workflow_everyday'
        self.record_type = 'month' if self.date_type == 'month' else '30_day'
        if self.date_type == 'month':
            self.record_table_name_field = f'{self.site_name}_flow_asin_last_month'
        else:
            self.record_table_name_field = f'{self.site_name}_flow_asin_last30day'
        # Elasticsearch client, connector options and index body.
        self.client = EsUtils.get_es_client()
        self.es_options = EsUtils.get_es_options(self.es_index_name)
        self.es_body = EsUtils.get_es_body()
        # Formal exports must be entered in the export record table
        # (presumably handled by CommonUtil.judge_is_work_hours — confirm).
        if result_type == 'formal':
            CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                           principal='wangrui4', priority=3, export_tools_type=2,
                                           belonging_to_process='流量选品')

    def get_date_from_week(self):
        """Return, as a string, the date whose week_day is 7 in week ``self.date_info``.

        Looks the week up in ``dim_date_20_to_30`` (matched on ``year_week``).

        Raises:
            ValueError: if the dimension table has no week_day == 7 row for that week.
        """
        df_date = self.spark.sql("select * from dim_date_20_to_30")
        df = df_date.toPandas()
        df_loc = df.loc[(df.year_week == str(self.date_info)) & (df.week_day == 7)]
        if df_loc.empty:
            # Fail with a readable message instead of a bare IndexError.
            raise ValueError(
                f"no week_day == 7 row for year_week={self.date_info!r} in dim_date_20_to_30")
        cur_date = df_loc['date'].iloc[0]
        print(cur_date)
        return str(cur_date)

    def read_data(self):
        """Load this period's rows from ``self.table_name`` into ``self.df_synchronize``.

        Columns are renamed to the Elasticsearch field names. ``collapse_asin``
        falls back to ``asin`` when ``parent_asin`` is null. The result is
        repartitioned to 40 and cached, because save_data() counts it before
        writing.
        """
        sql = f"""
            select asin, asin_ao_val as ao_val, asin_zr_counts as zr_counts, asin_sp_counts as sp_counts,
            asin_sb_counts as sb_counts, asin_vi_counts as vi_counts, asin_bs_counts as bs_counts,
            asin_ac_counts as ac_counts, asin_tr_counts as tr_counts, asin_er_counts as er_counts,
            bsr_orders, sales as bsr_orders_sale, asin_title as title, asin_title_len as title_len, asin_price as price,
            asin_rating as rating, asin_total_comments as total_comments, asin_buy_box_seller_type as buy_box_seller_type,
            asin_page_inventory as page_inventory, asin_volume as volume, asin_weight as weight, asin_color as color,
            asin_size as size, asin_style as style, asin_material as material, asin_launch_time as launch_time,
            asin_img_num as img_num, parent_asin, asin_img_type as img_type, asin_img_url as img_url,
            asin_activity_type as activity_type, act_one_two_val as one_two_val, act_three_four_val as three_four_val,
            act_five_six_val as five_six_val, act_eight_val as eight_val, asin_brand_name as brand, variation_num,
            one_star, two_star, three_star, four_star, five_star, low_star, together_asin, account_name, account_id,
            seller_country_name as site_name, asin_rank_rise as rank_rise, asin_rank_change as rank_change,
            asin_ao_rise as ao_rise, asin_ao_change as ao_change, asin_price_rise as price_rise,
            asin_price_change as price_change, asin_rating_rise as rating_rise, asin_rating_change as rating_change,
            asin_comments_rise as comments_rise, asin_comments_change as comments_change,
            asin_bsr_orders_rise as bsr_orders_rise, asin_bsr_orders_change as bsr_orders_change,
            asin_sales_rise as sales_rise, asin_sales_change as sales_change, asin_variation_rise as variation_rise,
            asin_variation_change as variation_change, asin_size_type as size_type, asin_rating_type as rating_type,
            asin_site_name_type as site_name_type, asin_weight_type as weight_type, asin_launch_time_type as launch_time_type,
            asin_ao_val_type as ao_val_type, asin_rank_type as rank_type, asin_price_type as price_type, bsr_type,
            bsr_best_orders_type, asin_quantity_variation_type as quantity_variation_type, package_quantity, is_movie_label,
            is_brand_label, is_alarm_brand, asin_type, asin_crawl_date, category_first_id, category_id, first_category_rank,
            current_category_rank, asin_weight_ratio, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail,
            title_matching_degree, asin_lob_info, is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion,
            matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info,
            coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count
            from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
        """
        print("sql:", sql)
        self.df_synchronize = self.spark.sql(sqlQuery=sql)
        self.df_synchronize = self.df_synchronize.repartition(40).cache()
        self.df_synchronize.show(10, truncate=False)

    def es_prepare(self):
        """Create the target index and, for non-monthly runs, point the alias at it.

        Monthly runs leave the alias alone here. modify_mission_record_status()
        may move it after a formal monthly sync.
        """
        print("当前链接的es节点信息为:" + str(EsUtils.__es_ip__))
        EsUtils.create_index(self.es_index_name, self.client, self.es_body)
        if self.date_type == 'month':
            return
        if EsUtils.exist_index_alias(self.alias_name, self.client):
            index_name_list = EsUtils.get_index_names_associated_alias(self.alias_name, self.client)
            if self.es_index_name in index_name_list:
                # The alias already covers this index; nothing to change.
                return
            # Detach the alias from its current index(es) before re-pointing it.
            EsUtils.delete_index_alias(self.alias_name, self.client)
        EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)

    def save_data(self):
        """Append ``self.df_synchronize`` to the target index via the ES Spark connector."""
        print(f"当前同步的数据为:{self.table_name}: {self.site_name}-{self.date_type}-{self.date_info}; 数据量为: {self.df_synchronize.count()}")
        self.df_synchronize = self.df_synchronize.repartition(60)
        self.df_synchronize.write.format('org.elasticsearch.spark.sql').options(**self.es_options).mode('append').save()

    def modify_mission_record_status(self):
        """After a formal monthly sync, record completion and switch the alias to the month index.

        Does nothing unless date_type is 'month' and result_type is 'formal'.
        If the workflow table has no finished row (status_val=14, is_end='是')
        for this site/month, one is written and the last-4-week alias is moved
        onto this run's index.
        """
        if not (self.date_type == 'month' and self.result_type == 'formal'):
            return
        # Look for an already-finished workflow record for this report date.
        select_sql = f"select id from {self.record_table} where site_name='{self.site_name}' and date_type='{self.date_type}' and report_date='{self.cur_date}' and page='流量选品' and status_val=14 and is_end='是'"
        df_is_finished = pd.read_sql(select_sql, self.engine_mysql)
        if not df_is_finished.empty:
            return
        replace_sql = f"""
            replace into {self.record_table} (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
            VALUES ('{self.site_name}', '{self.cur_date}', '流量选品计算完毕', 14, '{self.record_table_name_field}', '{self.record_type}', '流量选品', '是', '流量选品计算完毕', 'elasticsearch')
        """
        DBUtil.exec_sql('mysql', 'us', replace_sql)
        # NOTE(review): the alias is only switched when the record is newly
        # written, so re-running an already-recorded month does not pull the
        # alias back from a newer index — confirm this is the intended nesting.
        if EsUtils.exist_index_alias(self.alias_name, self.client):
            print("切换最近三十天索引别名链接到月数据")
            EsUtils.delete_index_alias(self.alias_name, self.client)
        EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)

    def run(self):
        """Read the source data, prepare the index/alias, write to ES, then update the workflow record."""
        self.read_data()
        self.es_prepare()
        self.save_data()
        self.modify_mission_record_status()
if __name__ == '__main__':
    # Command-line entry point:
    #   <script> <site_name> <date_type> <date_info> [result_type]
    arguments = sys.argv[1:]
    if len(arguments) < 3:
        # Exit with a usage message instead of an IndexError traceback.
        sys.exit(f"usage: {sys.argv[0]} <site_name> <date_type> <date_info> [result_type]")
    site_name = arguments[0]  # arg 1: site
    date_type = arguments[1]  # arg 2: date type (original note: week/month/quarter; '4_week' and 'month' are special-cased)
    date_info = arguments[2]  # arg 3: period identifier
    # arg 4 (optional): 'formal' for production, anything else for the test
    # environment. Using >= so extra trailing arguments cannot silently force
    # a formal (production) run.
    result_type = arguments[3] if len(arguments) >= 4 else 'formal'
    handle_obj = EsStDetail(site_name=site_name, date_type=date_type, date_info=date_info, result_type=result_type)
    handle_obj.run()