clean_weight.py 5.77 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
import os
import sys
import traceback

sys.path.append(os.path.dirname(sys.path[0]))
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from yswg_utils.common_udf import parse_weight_str


class CleanWeight(object):

    def __init__(self, site_name='us', year=2023, week=18):
        self.site_name = site_name
        self.year = year
        self.week = week
        self.week = f'0{self.week}' if int(self.week) < 10 else f'{self.week}'
        # 数据库连接参数
        self.db_params = {
            "pg_us": {
                "host": "192.168.10.216",  # 数据库主机地址
                "port": 5432,  # 数据库端口号
                "dbname": "selection" if self.site_name == 'us' else f"selection_{self.site_name}",  # 数据库名称
                "user": "postgres",  # 数据库用户名
                "password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"  # 数据库密码
            },
            "mysql_others": {
                "host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",  # 数据库主机地址
                "port": 3306,  # 数据库端口号
                "dbname": "selection" if self.site_name == 'us' else f"selection_{self.site_name}",  # 数据库名称
                "user": "adv_yswg",  # 数据库用户名
                "password": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c"  # 数据库密码
            }
        }
        self.engine_read, self.engine_save = self.create_connection()

    @staticmethod
    def get_weight(weight_str, site_name):
        # 提取到公共方法中 直接复制的
        return parse_weight_str(weight_str, site_name)

    def create_connection(self):
        # 建立数据库连接
        if self.site_name == 'us' and ((int(self.week) >= 18 and int(self.year) >= 2023) or (int(self.year) >= 2024)):
            db_params = self.db_params['pg_us']
            connection_string = f"postgresql+psycopg2://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
        else:
            db_params = self.db_params['mysql_others']
            connection_string = f"mysql+pymysql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
        db_params = self.db_params['pg_us']
        connection_string_save = f"postgresql+psycopg2://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
        engine_save = create_engine(connection_string_save)
        return create_engine(connection_string), engine_save

    def read_data(self):
        print("开始读取数据")
        week_params = f"{int(self.week)}" if self.site_name == 'us' else f"{self.week}"
        sql = f"select asin, weight, weight_str from {self.site_name}_asin_detail_{self.year}_{week_params};"  # where weight_str is not null
        print("sql:", sql)
        return pd.read_sql(sql, con=self.engine_read)

    def handle_data(self):
        df = self.read_data()
        if df.shape[0] == 0:
            print("site_name, year, week:", self.site_name, self.year, self.week, "数据为空,退出")
        print("df.shape:", df.shape)
        print("开始处理数据")
        df.weight_str = df.weight_str.apply(lambda x: str(x).lower())
        # df['weight_info'] = df['weight_str'].apply(self.get_weight)
        df['weight_info'] = df.apply(lambda row: self.get_weight(row['weight_str'], self.site_name), axis=1)  # 传递多个参数
        # df[['weight', 'weight_type']] = df['weight_info'].str.split(',', expand=True)
        # tuple 展开
        df[['weight', 'weight_type']] = df['weight_info'].apply(pd.Series)

        df.weight = df.weight.apply(lambda x: np.nan if str(x) == 'none' else x)
        df.weight = df.weight.astype("float64")
        df.weight = df.weight.apply(lambda x: 0.001 if x <= 0.001 else x)
        df.weight_str = df.weight_str.apply(lambda x: np.nan if str(x) == 'none' else x)
        df = df.drop(columns=['weight_info'])
        df['date_info'] = f'{self.year}-{self.week}'
        return df

    def save_data(self):
        df = self.handle_data()
        print("开始存储数据: 先清空对应week的分区表")
        print(df.weight_type.value_counts(dropna=False))
        with self.engine_save.begin() as conn:
            sql = f"truncate {self.site_name}_asin_weight_{self.year}_{self.week};"
            print("清空sql:", sql)
            conn.execute(sql)
        df.to_sql(f"{self.site_name}_asin_weight_{self.year}_{self.week}", con=self.engine_save, if_exists='append', index=False,
                  chunksize=df.shape[0] // 10)


if __name__ == '__main__':
    site_name = sys.argv[1]  # 参数1:站点
    year = int(sys.argv[2])  # 参数2:类型:day/week/4_week/month/quarter
    week = int(sys.argv[3])  # 参数3:年-月-日/年-周/年-月/年-季, 比如: 2022-1
    handle_obj = CleanWeight(site_name=site_name, year=year, week=week)
    handle_obj.save_data()
    quit()

    site_name = 'de'
    site_name_list = ['us', 'de', 'uk', 'es', 'fr', 'it']
    week_list = [16, 17, 18, 19]
    year = 2023
    week = 19
    while True:
        try:
            for week in week_list:
                for site_name in site_name_list:
                    try:
                        handle_obj = CleanWeight(site_name=site_name, year=year, week=week)
                        handle_obj.save_data()
                    except Exception as e:
                        print("error_info:", traceback.format_exc(), e)
                        if site_name_list[-1] == site_name and week_list[-1] == week:
                            print("不满足运行条件,结束")
                            quit()
                        continue
            break
        except Exception as e:
            print("error_info:", traceback.format_exc(), e)
            continue