export_dwt_st_info.py 6.7 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
import os
import sys
import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates_mysql import TemplatesMysql


class ExportDwtStInfo(TemplatesMysql):
    """Export search-term brand info into MySQL and sync it into the analytics tables.

    For each site the job:

    1. deletes the rows of the requested period from the ``*_brand_st_info_*``
       staging table and runs the export shell script (``export_data``);
    2. re-runs the export once if the staging table is still empty
       (``check_data``);
    3. copies the staged metrics into the ``*_brand_analytics_*`` table and
       marks the workflow row as finished (``update_data``).
    """

    def __init__(self, site_name='us', date_type="week", date_info='2022-1'):
        """Store the job parameters and derive period / table settings.

        Args:
            site_name: A single site code, or ``"all"`` to process
                us, uk, de, es, fr and it in turn.
            date_type: ``"week"`` or ``"4_week"`` select weekly handling,
                ``"month"`` selects monthly handling, anything else is
                treated as a quarter.
            date_info: ``"<year>-<period>"``, e.g. ``"2022-1"``.
        """
        super().__init__()
        self.site_name = site_name
        if self.site_name == "all":
            self.site_name_list = ["us", "uk", "de", "es", "fr", "it"]
        else:
            self.site_name_list = [self.site_name]
        self.date_type = date_type
        self.date_info = date_info
        self._configure_period()
        # With site_name == "all" these table names are placeholders; run()
        # recomputes them for every concrete site before they are used.
        self._configure_tables()
        self.path_sh = "/opt/module/spark/demo/sqoop_script/dwd/export_dwt_st_info.sh"
        self.df_table_counts = pd.DataFrame()

    def _configure_period(self):
        """Parse ``self.date_info`` into year + period and build the SQL period filter."""
        parts = self.date_info.split("-")
        self.year = int(parts[0])
        period = int(parts[-1])
        if self.date_type in ["week", "4_week"]:
            self.year_week = self.date_info
            self.week = period
            self.params_sql = f" and week={self.week}"
        elif self.date_type in ["month"]:
            self.year_month = self.date_info
            self.month = period
            self.params_sql = f" and month={self.month}"
        else:
            self.year_quarter = self.date_info
            self.quarter = period
            self.params_sql = f" and quarter={self.quarter}"

    def _configure_tables(self):
        """Derive the staging and target table names for the current ``self.site_name``."""
        self.table_save = f"{self.site_name}_brand_st_info_{self.date_type}"
        if self.date_type in ["week", "4_week"]:
            # Weekly analytics tables are split per year.
            self.table_update = f"{self.site_name}_brand_analytics_{self.year}"
        else:
            self.table_update = f"{self.site_name}_brand_analytics_{self.date_type}"

    def export_data(self, site_name):
        """Clear the period's rows from the staging table, then run the export script.

        Args:
            site_name: Site code passed to the shell script as its first argument.
        """
        with self.engine.begin() as conn:
            sql_delete = f"delete from {self.table_save} where year={self.year} {self.params_sql}"
            conn.execute(sql_delete)
        print(f"开始导出{site_name}站点的数据")
        # Script arguments: site, date type, year part and period part of date_info.
        # The exit status of the script is not checked here.
        os.system(f"{self.path_sh} {site_name} {self.date_type} {self.date_info.split('-')[0]} {self.date_info.split('-')[1]}")

    def check_data(self, site_name):
        """Count the staged rows for the period and re-run the export if there are none.

        Args:
            site_name: Site code; also stored on ``self.site_name``.
        """
        self.site_name = site_name
        # mysql_connect() is inherited from TemplatesMysql; presumably it returns
        # an engine for the current site -- confirm against the base class.
        self.engine = self.mysql_connect()
        sql_read = f"select count(*) as table_counts from {self.table_save} where year={self.year} {self.params_sql}"
        self.df_table_counts = pd.read_sql(sql_read, con=self.engine)
        table_counts = list(self.df_table_counts.table_counts)[0]
        print("table_counts:", table_counts)
        if table_counts == 0:
            # Single retry only; the count is not verified again afterwards.
            self.export_data(site_name=site_name)

    def update_data(self):
        """Copy staged metrics into the analytics table and flag the workflow as done.

        All three statements run inside one ``engine.begin()`` block.
        """
        with self.engine.begin() as conn:
            # a.orders=b.st_search_sum is intentionally left out of the update
            # for now; that value needs to be recomputed first.
            print(f"1. {self.site_name}--更新ao_val等9大指标")
            if self.date_type in ["week", "4_week"]:
                params = f"b.week={self.week} and b.year={self.year}"
            elif self.date_type in ["month"]:
                params = f"b.month={self.month} and b.year={self.year}"
            else:
                params = f"b.quarter={self.quarter} and b.year={self.year}"
            sql_update_1 = f"""UPDATE {self.table_update} a, {self.table_save} b 
                                set a.ao_val=b.st_ao_val, a.ao_val_rank=b.st_ao_val_rank, a.ao_val_rate=b.st_ao_val_rate,  a.is_ascending_text=b.st_is_ascending_text,  a.is_search_text=b.st_is_search_text,  
                                a.quantity_being_sold=b.st_quantity_being_sold, 
                                a.bsr_orders=b.st_asin_bs_orders_sum,  a.category_id=b.st_asin_bs_cate_current_id 
                                WHERE {params} and a.id=b.st_brand_id;"""
            print("sql_update_1:", sql_update_1)
            conn.execute(sql_update_1)
            print(f"2. {self.site_name}--更新is_first_text")
            sql_update_2 = f"""UPDATE {self.table_update} a, {self.table_save} b 
                                set a.is_first_text=b.st_is_first_text WHERE a.rank<=700000 and {params} and b.st_is_first_text=1 and a.id=b.st_brand_id;"""
            print("sql_update_2:", sql_update_2)
            conn.execute(sql_update_2)
            print("3. 更新工作流表workflow_exhibition的data_type=7(ao_val计算)的状态为3")
            # NOTE(review): self.week is only assigned by this class for
            # week / 4_week runs. For month / quarter it must be provided by the
            # base class, otherwise this line raises AttributeError inside the
            # engine.begin() block -- confirm the intended key for those types.
            sql_update_3 = f"update selection.workflow_exhibition set status=3 " \
                           f"where week='{self.year}_{self.week}' and site_name='{self.site_name}' and data_type=7"
            conn.execute(sql_update_3)

    def run(self):
        """Run export, check and update for every site in ``self.site_name_list``."""
        for site_name in self.site_name_list:
            self.site_name = site_name
            # Use the instance's own date_info (not a module-level global) so the
            # class also works when imported and driven from other code.
            self._configure_period()
            self._configure_tables()
            self.engine = self.mysql_connect()
            self.export_data(site_name=site_name)
            self.check_data(site_name=site_name)
            self.update_data()


if __name__ == '__main__':
    # Positional command-line arguments:
    #   1. site_name -- a single site code, or "all" for every site
    #   2. date_type -- week / 4_week / month / quarter
    #   3. date_info -- "<year>-<period>", e.g. 2022-1
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    ExportDwtStInfo(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    ).run()