import sys import traceback import numpy as np import pandas as pd from sqlalchemy import create_engine import re class CleanWeight(object): def __init__(self, site_name='us', year=2023, week=18): self.site_name = site_name self.year = year self.week = week self.week = f'0{self.week}' if int(self.week) < 10 else f'{self.week}' # 数据库连接参数 self.db_params = { "pg_us": { "host": "192.168.10.216", # 数据库主机地址 "port": 5432, # 数据库端口号 "dbname": "selection" if self.site_name == 'us' else f"selection_{self.site_name}", # 数据库名称 "user": "postgres", # 数据库用户名 "password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS" # 数据库密码 }, "mysql_others": { "host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com", # 数据库主机地址 "port": 3306, # 数据库端口号 "dbname": "selection" if self.site_name == 'us' else f"selection_{self.site_name}", # 数据库名称 "user": "adv_yswg", # 数据库用户名 "password": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c" # 数据库密码 } } self.engine_read, self.engine_save = self.create_connection() @staticmethod def get_weight(weight_str, site_name): weight_type = 'pounds' if site_name == 'us' else 'grams' if 'pounds' in weight_str: match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}pounds", weight_str) val = round(float(match.group(1)), 3) if site_name == 'us' and match else round(float(match.group(1)) * 1000 * 0.454, 3) if match else np.nan elif 'ounces' in weight_str: match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}ounces", weight_str) val = round(float(match.group(1)) / 16, 3) if site_name == 'us' and match else round(float(match.group(1)) / 16 * 1000 * 0.454, 3) if match else np.nan # elif 'kilograms' in weight_str or ' kilogramos' in weight_str: elif any(substring in weight_str for substring in ['kilogram', ' kg']): weight_str = weight_str.replace(' kg', ' kilogram') match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}kilogram", weight_str) val = round(float(match.group(1)) / 0.454, 3) if site_name == 'us' and match else round(float(match.group(1)) * 1000, 3) if match else np.nan # elif 'milligrams' in weight_str: elif any(substring in weight_str for substring in ['milligrams']): match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}milligrams", weight_str) val = round(float(match.group(1)) / 1000 / 1000 / 0.454, 3) if site_name == 'us' and match else round(float(match.group(1)) / 1000, 3) if match else np.nan elif ' gram' in weight_str: match = re.search(r"(\d+\.{0,}\d{0,})\D{0,} gram", weight_str) val = round(float(match.group(1)) / 1000 / 0.454, 3) if site_name == 'us' and match else round(float(match.group(1)), 3) if match else np.nan elif ' g' in weight_str: match = re.search(r"(\d+\.{0,}\d{0,})\D{0,} g", weight_str) # val = round(float(match.group(1)) / 1000 / 0.454, 3) if site_name == 'us' else round(float(match.group(1)), 3) if match else np.nan val = round(float(match.group(1)) / 1000 / 0.454, 3) if site_name == 'us' and match else round( float(match.group(1)), 3) if match else np.nan else: val = 'none' weight_type = 'none' # val = val * 1000 * 0.454 if site_name != 'us' and val != 'none' else val return f"{val},{weight_type}" def create_connection(self): # 建立数据库连接 if self.site_name == 'us' and int(self.week) >= 18 and int(self.year) >= 2023: db_params = self.db_params['pg_us'] connection_string = f"postgresql+psycopg2://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}" else: db_params = self.db_params['mysql_others'] connection_string = f"mysql+pymysql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}" db_params = self.db_params['pg_us'] connection_string_save = f"postgresql+psycopg2://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}" engine_save = create_engine(connection_string_save) return create_engine(connection_string), engine_save def read_data(self): print("开始读取数据") sql = f"select asin, weight, weight_str from {self.site_name}_asin_detail_{self.year}_{self.week};" # where weight_str is not null print("sql:", sql) return pd.read_sql(sql, con=self.engine_read) def handle_data(self): df = self.read_data() if df.shape[0] == 0: print("site_name, year, week:", self.site_name, self.year, self.week, "数据为空,退出") print("df.shape:", df.shape) print("开始处理数据") df.weight_str = df.weight_str.apply(lambda x: str(x).lower()) # df['weight_info'] = df['weight_str'].apply(self.get_weight) df['weight_info'] = df.apply(lambda row: self.get_weight(row['weight_str'], self.site_name), axis=1) # 传递多个参数 df[['weight', 'weight_type']] = df['weight_info'].str.split(',', expand=True) df.weight = df.weight.apply(lambda x: np.nan if str(x) == 'none' else x) df.weight = df.weight.astype("float64") df.weight = df.weight.apply(lambda x: 0.001 if x <= 0.001 else x) df.weight_str = df.weight_str.apply(lambda x: np.nan if str(x) == 'none' else x) df = df.drop(columns=['weight_info']) df['date_info'] = f'{self.year}-{self.week}' return df def save_data(self): df = self.handle_data() print("开始存储数据") print(df.weight_type.value_counts(dropna=False)) df.to_sql(f"{self.site_name}_asin_weight_{self.year}_{self.week}", con=self.engine_save, if_exists='append', index=False, chunksize=df.shape[0] // 10) if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 year = int(sys.argv[2]) # 参数2:类型:day/week/4_week/month/quarter week = int(sys.argv[3]) # 参数3:年-月-日/年-周/年-月/年-季, 比如: 2022-1 handle_obj = CleanWeight(site_name=site_name, year=year, week=week) handle_obj.save_data() quit() site_name = 'de' site_name_list = ['us', 'de', 'uk', 'es', 'fr', 'it'] week_list = [16, 17, 18, 19] year = 2023 week = 19 while True: try: for week in week_list: for site_name in site_name_list: try: handle_obj = CleanWeight(site_name=site_name, year=year, week=week) handle_obj.save_data() except Exception as e: print("error_info:", traceback.format_exc(), e) if site_name_list[-1] == site_name and week_list[-1] == week: print("不满足运行条件,结束") quit() continue break except Exception as e: print("error_info:", traceback.format_exc(), e) continue