import os
import sys

import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable
from utils.templates_mysql import TemplatesMysql


class ExportDwtStInfo(TemplatesMysql):
    """Export brand search-term info into MySQL and sync it to the analytics table.

    For each site the job:
      1. deletes the rows of the target period from ``{site}_brand_st_info_{date_type}``
         and invokes the export shell script to reload them,
      2. re-runs the export once if the table is still empty for that period,
      3. copies the ``st_*`` metrics into the matching ``{site}_brand_analytics_*`` table.

    ``self.engine`` and ``self.mysql_connect()`` are not defined in this file;
    presumably they are provided by ``TemplatesMysql`` — verify against that class.
    """

    # date_type values that are keyed by a week number.
    _WEEK_TYPES = ("week", "4_week")

    def __init__(self, site_name='us', date_type="week", date_info='2022-1'):
        """Store the run parameters and derive period fields and table names.

        Args:
            site_name: a single site code, or ``"all"`` to process
                us/uk/de/es/fr/it in turn.
            date_type: ``"week"``, ``"4_week"``, ``"month"``; any other value is
                treated as a quarter.
            date_info: ``"<year>-<period>"``, e.g. ``"2022-1"``.
        """
        super().__init__()
        self.site_name = site_name
        if self.site_name == "all":
            self.site_name_list = ["us", "uk", "de", "es", "fr", "it"]
        else:
            self.site_name_list = [self.site_name]
        self.date_type = date_type
        self.date_info = date_info
        self._configure_period_and_tables()
        self.path_sh = "/opt/module/spark/demo/sqoop_script/dwd/export_dwt_st_info.sh"
        self.df_table_counts = pd.DataFrame()

    def _configure_period_and_tables(self):
        """Derive year/period, the SQL period filter and table names.

        Reads ``self.site_name``, ``self.date_type`` and ``self.date_info``; must be
        called again whenever ``self.site_name`` changes because both table names
        are prefixed with the site.
        """
        parts = self.date_info.split("-")
        self.year = int(parts[0])
        period = int(parts[-1])
        if self.date_type in self._WEEK_TYPES:
            self.year_week = self.date_info
            self.week = period
            self.params_sql = f" and week={self.week}"
            # Week-based analytics tables are split per year.
            analytics_suffix = self.year
        elif self.date_type == "month":
            self.year_month = self.date_info
            self.month = period
            self.params_sql = f" and month={self.month}"
            analytics_suffix = self.date_type
        else:
            self.year_quarter = self.date_info
            self.quarter = period
            self.params_sql = f" and quarter={self.quarter}"
            analytics_suffix = self.date_type
        self.table_save = f"{self.site_name}_brand_st_info_{self.date_type}"
        self.table_update = f"{self.site_name}_brand_analytics_{analytics_suffix}"

    def export_data(self, site_name):
        """Clear the target period in ``table_save`` and run the export script.

        Args:
            site_name: site code passed as the first argument to the shell script.
        """
        with self.engine.begin() as conn:
            sql_delete = f"delete from {self.table_save} where year={self.year} {self.params_sql}"
            conn.execute(sql_delete)
        print(f"开始导出{site_name}站点的数据")
        # The raw year/period text from date_info is forwarded unchanged to the script.
        date_parts = self.date_info.split('-')
        os.system(f"{self.path_sh} {site_name} {self.date_type} {date_parts[0]} {date_parts[1]}")

    def check_data(self, site_name):
        """Re-run the export once if ``table_save`` has no rows for the period.

        Args:
            site_name: site code; also stored on ``self.site_name``.
        """
        self.site_name = site_name
        self.engine = self.mysql_connect()
        sql_read = f"select count(*) as table_counts from {self.table_save} where year={self.year} {self.params_sql}"
        self.df_table_counts = pd.read_sql(sql_read, con=self.engine)
        table_counts = list(self.df_table_counts.table_counts)[0]
        print("table_counts:", table_counts)
        if table_counts == 0:
            self.export_data(site_name=site_name)

    def update_data(self):
        """Copy the exported ``st_*`` metrics into the analytics table.

        All statements run inside one ``engine.begin()`` block, so an exception in
        any of them prevents the earlier ones from being committed.
        """
        is_week_based = self.date_type in self._WEEK_TYPES
        if is_week_based:
            params = f"b.week={self.week} and b.year={self.year}"
        elif self.date_type == "month":
            params = f"b.month={self.month} and b.year={self.year}"
        else:
            params = f"b.quarter={self.quarter} and b.year={self.year}"

        with self.engine.begin() as conn:
            print(f"1. {self.site_name}--更新ao_val等9大指标")
            # a.orders=b.st_search_sum is intentionally left out for now; it needs
            # to be recalculated first.
            sql_update_1 = (
                f"UPDATE {self.table_update} a, {self.table_save} b "
                "set a.ao_val=b.st_ao_val, a.ao_val_rank=b.st_ao_val_rank, "
                "a.ao_val_rate=b.st_ao_val_rate, "
                "a.is_ascending_text=b.st_is_ascending_text, "
                "a.is_search_text=b.st_is_search_text, "
                "a.quantity_being_sold=b.st_quantity_being_sold, "
                "a.bsr_orders=b.st_asin_bs_orders_sum, "
                "a.category_id=b.st_asin_bs_cate_current_id "
                f"WHERE {params} and a.id=b.st_brand_id;"
            )
            print("sql_update_1:", sql_update_1)
            conn.execute(sql_update_1)

            print(f"2. {self.site_name}--更新is_first_text")
            sql_update_2 = (
                f"UPDATE {self.table_update} a, {self.table_save} b "
                "set a.is_first_text=b.st_is_first_text "
                f"WHERE a.rank<=700000 and {params} and b.st_is_first_text=1 "
                "and a.id=b.st_brand_id;"
            )
            print("sql_update_2:", sql_update_2)
            conn.execute(sql_update_2)

            if is_week_based:
                print("3. 更新工作流表workflow_exhibition的data_type=7(ao_val计算)的状态为3")
                sql_update_3 = (
                    "update selection.workflow_exhibition set status=3 "
                    f"where week='{self.year}_{self.week}' "
                    f"and site_name='{self.site_name}' and data_type=7"
                )
                conn.execute(sql_update_3)
            else:
                # The workflow row is addressed by a '<year>_<week>' key, and
                # self.week does not exist for month/quarter runs. Previously this
                # raised AttributeError here and aborted the two updates above.
                # NOTE(review): skipping is an assumption about intent — confirm
                # whether month/quarter runs should mark a workflow row at all.
                print(f"3. skip workflow_exhibition update for date_type={self.date_type}")

    def run(self):
        """Export, verify and update every site in ``self.site_name_list``."""
        for site_name in self.site_name_list:
            self.site_name = site_name
            # Table names depend on the site, so recompute them from the
            # instance's own date_info (not from a module-level global).
            self._configure_period_and_tables()
            self.engine = self.mysql_connect()
            self.export_data(site_name=site_name)
            self.check_data(site_name=site_name)
            self.update_data()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site code, or "all" for every site
    date_type = sys.argv[2]  # arg 2: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: "<year>-<period>", e.g. 2022-1
    handle_obj = ExportDwtStInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()