import sys
import os

# Make the parent directory of the script's directory importable so the
# project-local ``utils`` package can be resolved by the imports below.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.es_util import EsUtils
from utils.templates_mysql import TemplatesMysql
import pandas as pd
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from utils.db_util import DbTypes, DBUtil


class EsStDetail(TemplatesMysql):
    """Synchronize rows of the Spark SQL table ``dwt_flow_asin`` into Elasticsearch.

    The job reads one (site_name, date_type, date_info) slice, writes it to an
    Elasticsearch index through the ``org.elasticsearch.spark.sql`` data source,
    keeps the ``<site>_st_detail_last_4_week`` alias pointing at the freshly
    written index and, for formal month runs, records completion in the
    ``workflow_everyday`` MySQL table.
    """

    def __init__(self, site_name='us', date_type="week", date_info='2023-01', result_type='formal'):
        """Prepare the Spark session, index/alias names and Elasticsearch settings.

        Args:
            site_name: Site identifier used in the SQL filter and index names.
            date_type: Period type; ``'4_week'`` and ``'month'`` get dedicated
                handling, every other value uses the non-``4_week`` naming.
            date_info: Period value; for ``'4_week'`` it is matched against
                ``year_week`` in ``dim_date_20_to_30``.
            result_type: ``'formal'`` targets the formal index/alias, any other
                value appends ``_test`` to their names.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.result_type = result_type
        self.df_workflow = pd.DataFrame()
        self.table_name = "dwt_flow_asin"

        # Create the Spark session.
        self.spark = SparkUtil.get_spark_sessionV4(
            app_name=f"flow_asin_synchronize: {self.site_name}-{self.date_type}-{self.date_info}",
            use_db="big_data_selection",
        )
        # Placeholder DataFrame; replaced by read_data().
        self.df_synchronize = self.spark.sql("select 1+1")

        # For '4_week' runs the index is keyed by a calendar date looked up
        # from the week; otherwise date_info is used as-is.
        if self.date_type == '4_week':
            self.cur_date = self.get_date_from_week()
        else:
            self.cur_date = self.date_info
        self.year_month = str(self.cur_date).replace("-", "_")
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")

        # Elasticsearch index name and alias name.
        is_formal = self.result_type == 'formal'
        env_suffix = '' if is_formal else '_test'
        self.alias_name = f'{self.site_name}_st_detail_last_4_week{env_suffix}'
        if self.date_type == '4_week':
            print(f"往{'正式' if is_formal else '测试'}环境上同步最近三十天数据")
            self.es_index_name = f"{self.site_name}_flow_asin_last30day_{self.cur_date}{env_suffix}"
        else:
            print(f"往{'正式' if is_formal else '测试'}环境上同步月数据")
            self.es_index_name = f"{self.site_name}_st_detail_{self.date_type}_{self.year_month}{env_suffix}"

        # Task type and table name recorded in the workflow table.
        self.record_table = 'workflow_everyday'
        if self.date_type == 'month':
            self.record_type = 'month'
            self.record_table_name_field = f'{self.site_name}_flow_asin_last_month'
        else:
            self.record_type = '30_day'
            self.record_table_name_field = f'{self.site_name}_flow_asin_last30day'

        # Elasticsearch client and write configuration.
        self.client = EsUtils.get_es_client()
        self.es_options = EsUtils.get_es_options(self.es_index_name)
        self.es_body = EsUtils.get_es_body()

        # Formal exports must be entered into the export record table
        # (original author's note; the helper itself is defined elsewhere).
        if result_type == 'formal':
            CommonUtil.judge_is_work_hours(
                site_name=site_name,
                date_type=date_type,
                date_info=date_info,
                principal='chenyuanjie',
                priority=3,
                export_tools_type=2,
                belonging_to_process='流量选品',
            )

    def get_date_from_week(self):
        """Return the date of weekday 7 of the week ``self.date_info`` as a string.

        The date is looked up in ``dim_date_20_to_30`` by ``year_week`` and
        ``week_day == 7``.

        Raises:
            ValueError: If the dimension table has no matching row.
        """
        df = self.spark.sql("select * from dim_date_20_to_30;").toPandas()
        matches = df.loc[(df.year_week == str(self.date_info)) & (df.week_day == 7)]
        if matches.empty:
            # Previously this surfaced as an opaque IndexError on ``[0]``.
            raise ValueError(
                f"no row in dim_date_20_to_30 with year_week='{self.date_info}' and week_day=7"
            )
        cur_date = list(matches.date)[0]
        print(cur_date)
        return str(cur_date)

    # Read the data.
    def read_data(self):
        """Load the requested slice of ``self.table_name`` into ``self.df_synchronize``.

        The result is repartitioned to 40 partitions, cached, and its first
        10 rows are printed.
        """
        sql = f"""
            select
                asin,
                asin_ao_val as ao_val,
                asin_zr_counts as zr_counts,
                asin_sp_counts as sp_counts,
                asin_sb_counts as sb_counts,
                asin_vi_counts as vi_counts,
                asin_bs_counts as bs_counts,
                asin_ac_counts as ac_counts,
                asin_tr_counts as tr_counts,
                asin_er_counts as er_counts,
                bsr_orders,
                sales as bsr_orders_sale,
                asin_title as title,
                asin_title_len as title_len,
                asin_price as price,
                asin_rating as rating,
                asin_total_comments as total_comments,
                asin_buy_box_seller_type as buy_box_seller_type,
                asin_page_inventory as page_inventory,
                asin_volume as volume,
                asin_weight as weight,
                asin_color as color,
                asin_size as size,
                asin_style as style,
                asin_material as material,
                asin_launch_time as launch_time,
                asin_img_num as img_num,
                parent_asin,
                asin_img_type as img_type,
                asin_img_url as img_url,
                asin_activity_type as activity_type,
                act_one_two_val as one_two_val,
                act_three_four_val as three_four_val,
                act_five_six_val as five_six_val,
                act_eight_val as eight_val,
                asin_brand_name as brand,
                variation_num,
                one_star,
                two_star,
                three_star,
                four_star,
                five_star,
                low_star,
                together_asin,
                account_name,
                account_id,
                seller_country_name as site_name,
                asin_rank_rise as rank_rise,
                asin_rank_change as rank_change,
                asin_ao_rise as ao_rise,
                asin_ao_change as ao_change,
                asin_price_rise as price_rise,
                asin_price_change as price_change,
                asin_rating_rise as rating_rise,
                asin_rating_change as rating_change,
                asin_comments_rise as comments_rise,
                asin_comments_change as comments_change,
                asin_bsr_orders_rise as bsr_orders_rise,
                asin_bsr_orders_change as bsr_orders_change,
                asin_sales_rise as sales_rise,
                asin_sales_change as sales_change,
                asin_variation_rise as variation_rise,
                asin_variation_change as variation_change,
                asin_size_type as size_type,
                asin_rating_type as rating_type,
                asin_site_name_type as site_name_type,
                asin_weight_type as weight_type,
                asin_launch_time_type as launch_time_type,
                asin_ao_val_type as ao_val_type,
                asin_rank_type as rank_type,
                asin_price_type as price_type,
                bsr_type,
                bsr_best_orders_type,
                asin_quantity_variation_type as quantity_variation_type,
                package_quantity,
                is_movie_label,
                is_brand_label,
                is_alarm_brand,
                asin_type,
                asin_crawl_date,
                category_first_id,
                category_id,
                first_category_rank,
                current_category_rank,
                asin_weight_ratio,
                asin_bought_month,
                asin_lqs_rating,
                asin_lqs_rating_detail,
                title_matching_degree,
                asin_lob_info,
                is_contains_lob_info,
                is_package_quantity_abnormal,
                zr_flow_proportion,
                matrix_flow_proportion,
                matrix_ao_val,
                customer_reviews_json as product_features,
                img_info,
                coalesce(parent_asin, asin) as collapse_asin,
                follow_sellers_count
            from {self.table_name}
            where site_name='{self.site_name}'
              and date_type='{self.date_type}'
              and date_info='{self.date_info}'
        """
        print("sql:", sql)
        self.df_synchronize = self.spark.sql(sqlQuery=sql)
        self.df_synchronize = self.df_synchronize.repartition(40).cache()
        self.df_synchronize.show(10, truncate=False)

    # Preparation before synchronizing.
    def es_prepare(self):
        """Create the target index and, for non-month runs, point the alias at it."""
        print("当前链接的es节点信息为:" + str(EsUtils.__es_ip__))
        EsUtils.create_index(self.es_index_name, self.client, self.es_body)

        # Month runs switch the alias only after the data has been written
        # (see modify_mission_record_status).
        if self.date_type == 'month':
            return

        if not EsUtils.exist_index_alias(self.alias_name, self.client):
            EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
            return

        index_name_list = EsUtils.get_index_names_associated_alias(self.alias_name, self.client)
        if self.es_index_name not in index_name_list:
            # The alias exists but is attached to other indices: re-create it
            # for the current index.
            EsUtils.delete_index_alias(self.alias_name, self.client)
            EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)

    # Synchronize the data to Elasticsearch.
    def save_data(self):
        """Append ``self.df_synchronize`` to the Elasticsearch index using 60 partitions."""
        print(
            f"当前同步的数据为:{self.table_name}: {self.site_name}-{self.date_type}-{self.date_info}; "
            f"数据量为: {self.df_synchronize.count()}"
        )
        self.df_synchronize = self.df_synchronize.repartition(60)
        (
            self.df_synchronize.write
            .format('org.elasticsearch.spark.sql')
            .options(**self.es_options)
            .mode('append')
            .save()
        )

    # After the month data is synchronized, switch the last-30-day alias to the
    # month index and update the workflow table status.
    def modify_mission_record_status(self):
        """Record completion of a formal month run and repoint the alias.

        Does nothing unless ``date_type == 'month'`` and
        ``result_type == 'formal'``.
        """
        if not (self.date_type == 'month' and self.result_type == 'formal'):
            return

        # Look for an already-finished record of this run in the workflow table.
        select_sql = (
            f"select id from {self.record_table} "
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' "
            f"and report_date='{self.cur_date}' and page='流量选品' "
            f"and status_val=14 and is_end='是'"
        )
        df_is_finished = pd.read_sql(select_sql, self.engine_mysql)
        if not df_is_finished.empty:
            return

        replace_sql = f"""
            replace into {self.record_table}
                (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
            VALUES
                ('{self.site_name}', '{self.cur_date}', '流量选品计算完毕', 14, '{self.record_table_name_field}', '{self.record_type}', '流量选品', '是', '流量选品计算完毕', 'elasticsearch')
        """
        DBUtil.exec_sql('mysql', 'us', replace_sql)

        # NOTE(review): the available source had lost its indentation; the alias
        # switch is reconstructed as part of the "record not yet finished"
        # branch. Confirm it is not meant to run on every formal month run.
        if EsUtils.exist_index_alias(self.alias_name, self.client):
            print("切换最近三十天索引别名链接到月数据")
            EsUtils.delete_index_alias(self.alias_name, self.client)
        EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)

    def run(self):
        """Run the pipeline: read, prepare the index, write, update status."""
        self.read_data()
        self.es_prepare()
        self.save_data()
        self.modify_mission_record_status()


if __name__ == '__main__':
    arguments = sys.argv[1:]
    if len(arguments) < 3:
        # Fail with a readable message instead of an IndexError on sys.argv.
        sys.exit(f"usage: {sys.argv[0]} <site_name> <date_type> <date_info> [result_type]")
    site_name = sys.argv[1]  # argument 1: site
    date_type = sys.argv[2]  # argument 2: period type (the class special-cases '4_week' and 'month')
    date_info = sys.argv[3]  # argument 3: period value
    # The optional 4th argument selects the environment; it is honoured only
    # when exactly four arguments are given, otherwise 'formal' is used.
    if len(arguments) == 4:
        result_type = sys.argv[4]
    else:
        result_type = 'formal'
    handle_obj = EsStDetail(site_name=site_name, date_type=date_type, date_info=date_info, result_type=result_type)
    handle_obj.run()