# clean_weight.py (7.36 KB) - committed by chenyuanjie
import sys
import traceback

import numpy as np
import pandas as pd
from sqlalchemy import create_engine
import re


class CleanWeight(object):
    """Normalise the free-text ``weight_str`` of one site's weekly ASIN table.

    Rows are read from ``{site}_asin_detail_{year}_{week}``, the weight text is
    parsed into a number (pounds for the ``us`` site, grams for every other
    site) and the result is appended to ``{site}_asin_weight_{year}_{week}``.
    """

    # Ordered parsing rules: the first rule whose trigger occurs in the text wins.
    # Each entry is (trigger substrings, compiled "<number> ... <unit>" pattern,
    # conversion to pounds, conversion to grams).
    # The number part accepts "12" or "12.5" only, so malformed input such as
    # "1..." can no longer reach float() and raise ValueError.
    # 0.454 is the kilograms-per-pound factor this job has always used.
    # NOTE(review): thousands/decimal commas ("1,000 grams", "1,5 kilogram") are
    # not understood; only the digits after the comma are picked up.
    # NOTE(review): the " g" trigger is very broad (any word starting with "g").
    _UNIT_RULES = (
        (("pounds",), re.compile(r"(\d+(?:\.\d+)?)\D*pounds"),
         lambda x: x, lambda x: x * 1000 * 0.454),
        (("ounces",), re.compile(r"(\d+(?:\.\d+)?)\D*ounces"),
         lambda x: x / 16, lambda x: x / 16 * 1000 * 0.454),
        (("kilogram", " kg"), re.compile(r"(\d+(?:\.\d+)?)\D*kilogram"),
         lambda x: x / 0.454, lambda x: x * 1000),
        (("milligrams",), re.compile(r"(\d+(?:\.\d+)?)\D*milligrams"),
         lambda x: x / 1000 / 1000 / 0.454, lambda x: x / 1000),
        ((" gram",), re.compile(r"(\d+(?:\.\d+)?)\D* gram"),
         lambda x: x / 1000 / 0.454, lambda x: x),
        ((" g",), re.compile(r"(\d+(?:\.\d+)?)\D* g"),
         lambda x: x / 1000 / 0.454, lambda x: x),
    )

    def __init__(self, site_name='us', year=2023, week=18):
        """Store the job parameters and open the read/save database engines.

        Args:
            site_name: Site code; ``'us'`` maps to database ``selection``,
                any other value to ``selection_<site_name>``.
            year: Year used in the table names.
            week: Week number (int or numeric string); stored zero-padded to
                two digits because the table names use that form.
        """
        self.site_name = site_name
        self.year = year
        # int() first so that an already padded '05' stays '05' instead of '005'.
        self.week = f'{int(week):02d}'
        # Database connection parameters.
        # NOTE(review): credentials are hard-coded in source control; they should
        # be moved to environment variables / a secret store and rotated.
        self.db_params = {
            "pg_us": {
                "host": "192.168.10.216",  # database host
                "port": 5432,  # database port
                "dbname": "selection" if self.site_name == 'us' else f"selection_{self.site_name}",  # database name
                "user": "postgres",  # database user
                "password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"  # database password
            },
            "mysql_others": {
                "host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",  # database host
                "port": 3306,  # database port
                "dbname": "selection" if self.site_name == 'us' else f"selection_{self.site_name}",  # database name
                "user": "adv_yswg",  # database user
                "password": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c"  # database password
            }
        }
        self.engine_read, self.engine_save = self.create_connection()

    @staticmethod
    def _parse_weight(weight_str, site_name):
        """Parse one weight text into ``(value, weight_type)``.

        ``value`` is a float rounded to 3 decimals, ``nan`` when a unit word is
        present but no number precedes it, or ``None`` when no known unit is
        found. ``weight_type`` is ``'pounds'`` for site ``'us'``, ``'grams'``
        for other sites, and ``'none'`` when no unit is found.
        """
        # str()/lower() make the parser safe for None/NaN and mixed-case input.
        text = str(weight_str).lower()
        use_pounds = site_name == 'us'
        weight_type = 'pounds' if use_pounds else 'grams'
        for triggers, pattern, to_pounds, to_grams in CleanWeight._UNIT_RULES:
            if not any(trigger in text for trigger in triggers):
                continue
            if ' kg' in triggers:
                # The kilogram pattern only knows the long spelling.
                text = text.replace(' kg', ' kilogram')
            match = pattern.search(text)
            if match is None:
                return np.nan, weight_type
            convert = to_pounds if use_pounds else to_grams
            return round(convert(float(match.group(1))), 3), weight_type
        return None, 'none'

    @staticmethod
    def get_weight(weight_str, site_name):
        """Return the parsed weight as the string ``"<value>,<weight_type>"``.

        Examples: ``"2.5,pounds"``, ``"500.0,grams"``, ``"nan,pounds"`` (unit
        without a number) and ``"none,none"`` (no recognised unit).
        """
        val, weight_type = CleanWeight._parse_weight(weight_str, site_name)
        return f"{'none' if val is None else val},{weight_type}"

    def _reads_from_pg(self):
        """Tell whether the source table is read from PostgreSQL.

        True only for site ``'us'`` from 2023 week 18 onwards. Year and week are
        compared together as a date-like pair, so early weeks of later years
        (e.g. 2024-05) still qualify.
        """
        return self.site_name == 'us' and (int(self.year), int(self.week)) >= (2023, 18)

    @staticmethod
    def _connection_url(driver, params):
        """Build a SQLAlchemy URL string from one ``db_params`` entry."""
        return f"{driver}://{params['user']}:{params['password']}@{params['host']}:{params['port']}/{params['dbname']}"

    def create_connection(self):
        """Create the engines and return them as ``(engine_read, engine_save)``.

        Results are always saved to PostgreSQL; the read side is PostgreSQL or
        MySQL depending on ``_reads_from_pg``.
        """
        pg_url = self._connection_url("postgresql+psycopg2", self.db_params['pg_us'])
        if self._reads_from_pg():
            read_url = pg_url
        else:
            read_url = self._connection_url("mysql+pymysql", self.db_params['mysql_others'])
        engine_save = create_engine(pg_url)
        return create_engine(read_url), engine_save

    def read_data(self):
        """Load ``asin``, ``weight`` and ``weight_str`` of the weekly detail table.

        The table name is interpolated because identifiers cannot be bound as
        query parameters; site/year/week come from the job's own arguments.
        """
        print("开始读取数据")
        sql = f"select asin, weight, weight_str from {self.site_name}_asin_detail_{self.year}_{self.week};"  # where weight_str is not null
        print("sql:", sql)
        return pd.read_sql(sql, con=self.engine_read)

    def handle_data(self):
        """Read the detail rows and return them with normalised weights.

        Output columns: ``asin``, ``weight`` (float64, floored at 0.001, NaN
        when unparsable), ``weight_str`` (lower-cased, NaN when missing),
        ``weight_type`` and ``date_info`` (``"<year>-<week>"``). An empty input
        yields an empty frame with the same columns instead of failing.
        """
        df = self.read_data()
        if df.empty:
            print("site_name, year, week:", self.site_name, self.year, self.week, "数据为空,退出")
        print("df.shape:", df.shape)
        print("开始处理数据")
        # Remember real NULLs before str() turns them into 'none' / 'nan'.
        lowered = df['weight_str'].map(lambda raw: str(raw).lower())
        missing = df['weight_str'].isna() | (lowered == 'none')
        parsed = [self._parse_weight(text, self.site_name) for text in lowered]
        values = [np.nan if value is None else value for value, _ in parsed]
        # Anything at or below 0.001 (including a parsed 0) is stored as 0.001.
        df['weight'] = pd.Series(values, index=df.index, dtype='float64').clip(lower=0.001)
        df['weight_str'] = lowered.mask(missing)
        df['weight_type'] = [weight_type for _, weight_type in parsed]
        df['date_info'] = f'{self.year}-{self.week}'
        return df

    def save_data(self):
        """Append the cleaned rows to ``{site}_asin_weight_{year}_{week}``.

        Nothing is written when there are no rows.
        NOTE(review): ``if_exists='append'`` means a re-run duplicates rows.
        """
        df = self.handle_data()
        if df.empty:
            # handle_data already reported the empty input.
            return
        print("开始存储数据")
        print(df.weight_type.value_counts(dropna=False))
        # Roughly ten batches; never 0, which pandas rejects for small frames.
        chunksize = max(1, df.shape[0] // 10)
        df.to_sql(f"{self.site_name}_asin_weight_{self.year}_{self.week}", con=self.engine_save, if_exists='append', index=False, chunksize=chunksize)


if __name__ == '__main__':
    # Command-line entry point: clean and store the weights of one site/week.
    # Usage: python clean_weight.py <site_name> <year> <week>
    if len(sys.argv) < 4:
        # Fail with a readable message instead of a bare IndexError.
        sys.exit("usage: clean_weight.py <site_name> <year> <week>")
    site_name = sys.argv[1]  # argument 1: site code, e.g. us / de / uk
    year = int(sys.argv[2])  # argument 2: year, e.g. 2023
    week = int(sys.argv[3])  # argument 3: week number, e.g. 18
    handle_obj = CleanWeight(site_name=site_name, year=year, week=week)
    handle_obj.save_data()