1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import sys
import traceback
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
import re
class CleanWeight(object):
    """Parse free-text ASIN weights into a numeric weight plus unit.

    Reads ``asin, weight, weight_str`` from ``{site}_asin_detail_{year}_{week}``,
    converts ``weight_str`` to pounds (site ``'us'``) or grams (every other
    site) and appends the result to ``{site}_asin_weight_{year}_{week}`` through
    the PostgreSQL save engine.
    """

    # Digits with at most one decimal part.  The previous pattern
    # ``\d+\.{0,}\d{0,}`` also captured strings such as "1..2", which then made
    # float() raise and aborted the whole run.
    _NUMBER_PATTERN = r"(\d+(?:\.\d+)?)"

    # Unit rules, tried in this order; the first rule with a trigger substring
    # present in the text decides the unit.  The generic ' g' rule must stay
    # last because " g" occurs in many unrelated strings.
    # Entry: (trigger substrings, unit regex, -> pounds, -> grams).
    # 0.454 kg per pound is the approximation this script has always used.
    _UNIT_RULES = (
        (('pounds',), 'pounds',
         lambda v: v, lambda v: v * 1000 * 0.454),
        (('ounces',), 'ounces',
         lambda v: v / 16, lambda v: v / 16 * 1000 * 0.454),
        (('kilogram', ' kg'), 'kilogram',
         lambda v: v / 0.454, lambda v: v * 1000),
        (('milligrams',), 'milligrams',
         lambda v: v / 1000 / 1000 / 0.454, lambda v: v / 1000),
        ((' gram',), ' gram',
         lambda v: v / 1000 / 0.454, lambda v: v),
        ((' g',), ' g',
         lambda v: v / 1000 / 0.454, lambda v: v),
    )

    def __init__(self, site_name='us', year=2023, week=18):
        """
        :param site_name: site code; used in database and table names.
        :param year: year part of the source/target table names.
        :param week: week number; zero-padded to two digits for table names.
        :raises ValueError: if ``site_name`` is not a plain word (it is
            interpolated into SQL identifiers, which cannot be bound as
            query parameters).
        """
        if not re.fullmatch(r'\w+', str(site_name)):
            raise ValueError(f"invalid site_name: {site_name!r}")
        self.site_name = site_name
        self.year = year
        self.week = f'{int(week):02d}'
        # Database connection parameters.
        # NOTE(review): credentials are hard-coded in source; move them to
        # environment variables or a secret store.
        self.db_params = {
            "pg_us": {
                "host": "192.168.10.216",  # database host
                "port": 5432,  # database port
                "dbname": "selection" if self.site_name == 'us' else f"selection_{self.site_name}",  # database name
                "user": "postgres",  # database user
                "password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"  # database password
            },
            "mysql_others": {
                "host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",  # database host
                "port": 3306,  # database port
                "dbname": "selection" if self.site_name == 'us' else f"selection_{self.site_name}",  # database name
                "user": "adv_yswg",  # database user
                "password": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c"  # database password
            }
        }
        self.engine_read, self.engine_save = self.create_connection()

    @staticmethod
    def get_weight(weight_str, site_name):
        """Parse a weight description into ``"<value>,<unit>"``.

        :param weight_str: lower-cased weight text, e.g. ``"1.5 pounds"``.
        :param site_name: ``'us'`` converts to pounds, any other site to grams.
        :return: ``"<value>,pounds"`` or ``"<value>,grams"``. The value is rounded
            to 3 decimals, or is ``nan`` when a unit keyword is present but no
            number precedes it. Returns ``"none,none"`` when no known unit occurs.
        """
        is_us = site_name == 'us'
        rule = next(
            (r for r in CleanWeight._UNIT_RULES if any(t in weight_str for t in r[0])),
            None,
        )
        if rule is None:
            return "none,none"
        _, unit, to_pounds, to_grams = rule
        if unit == 'kilogram':
            # Normalise the "kg" abbreviation so one pattern covers both spellings.
            weight_str = weight_str.replace(' kg', ' kilogram')
        # A number, then any non-digit characters, then the unit.
        match = re.search(CleanWeight._NUMBER_PATTERN + r"\D*" + unit, weight_str)
        if match is None:
            val = np.nan
        else:
            amount = float(match.group(1))
            val = round(to_pounds(amount) if is_us else to_grams(amount), 3)
        return f"{val},{'pounds' if is_us else 'grams'}"

    @staticmethod
    def _reads_from_pg(site_name, year, week):
        """Return True when source rows are read from PostgreSQL, else MySQL.

        Only the 'us' site from (2023, week 18) onwards is read from PostgreSQL.
        (year, week) is compared as a tuple; the old ``week >= 18 and
        year >= 2023`` test wrongly routed e.g. 2024 weeks 1-17 to MySQL.
        """
        return site_name == 'us' and (int(year), int(week)) >= (2023, 18)

    def create_connection(self):
        """Create ``(engine_read, engine_save)``.

        Reads use PostgreSQL or MySQL depending on site and date; writes always
        go to PostgreSQL.
        """
        pg = self.db_params['pg_us']
        pg_url = (f"postgresql+psycopg2://{pg['user']}:{pg['password']}"
                  f"@{pg['host']}:{pg['port']}/{pg['dbname']}")
        if self._reads_from_pg(self.site_name, self.year, self.week):
            read_url = pg_url
        else:
            my = self.db_params['mysql_others']
            read_url = (f"mysql+pymysql://{my['user']}:{my['password']}"
                        f"@{my['host']}:{my['port']}/{my['dbname']}")
        return create_engine(read_url), create_engine(pg_url)

    def read_data(self):
        """Load ``asin, weight, weight_str`` for this site/week from the read engine."""
        print("开始读取数据")
        # Identifiers cannot be bound as parameters; site_name is validated in
        # __init__ and year/week are numeric.
        sql = f"select asin, weight, weight_str from {self.site_name}_asin_detail_{self.year}_{self.week};"  # where weight_str is not null
        print("sql:", sql)
        return pd.read_sql(sql, con=self.engine_read)

    def handle_data(self):
        """Read the source table and return the cleaned frame.

        When the source table is empty, the empty raw frame is returned
        unchanged so the caller can skip saving.
        """
        df = self.read_data()
        if df.shape[0] == 0:
            print("site_name, year, week:", self.site_name, self.year, self.week, "数据为空,退出")
            # Previously execution fell through here and crashed on the empty frame.
            return df
        print("df.shape:", df.shape)
        print("开始处理数据")
        return self._clean_frame(df)

    def _clean_frame(self, df):
        """Derive ``weight``/``weight_type`` from ``weight_str`` and add ``date_info``.

        :param df: frame with a ``weight_str`` column; modified in place
            (``weight`` is overwritten).
        :return: the frame with columns added or overwritten as follows:
            ``weight`` is float64, NaN when unparsed, and clamped to >= 0.001.
            ``weight_type`` is 'pounds', 'grams' or 'none'.
            ``weight_str`` is lower-cased, with NaN where it was missing.
            ``date_info`` is set to ``'<year>-<week>'``.
        """
        # str() maps None -> 'None' (-> 'none') and NaN -> 'nan' before lower-casing.
        df['weight_str'] = df['weight_str'].map(lambda x: str(x).lower())
        df['weight_info'] = df['weight_str'].map(lambda s: self.get_weight(s, self.site_name))
        df[['weight', 'weight_type']] = df['weight_info'].str.split(',', expand=True)
        # 'none' marks an unrecognised unit; 'nan' (no number found) parses to NaN itself.
        df['weight'] = df['weight'].where(df['weight'] != 'none').astype('float64')
        # Clamp zero/tiny weights to 0.001; clip leaves NaN untouched.
        df['weight'] = df['weight'].clip(lower=0.001)
        df['weight_str'] = df['weight_str'].where(df['weight_str'] != 'none')
        df = df.drop(columns=['weight_info'])
        df['date_info'] = f'{self.year}-{self.week}'
        return df

    def save_data(self):
        """Clean this site/week and append it to the weight table via the save engine."""
        df = self.handle_data()
        if df.empty:
            return
        print("开始存储数据")
        print(df['weight_type'].value_counts(dropna=False))
        # Write in roughly 10 batches. chunksize must be >= 1, otherwise
        # to_sql raises for frames with fewer than 10 rows.
        df.to_sql(f"{self.site_name}_asin_weight_{self.year}_{self.week}", con=self.engine_save,
                  if_exists='append', index=False, chunksize=max(1, len(df) // 10))
def _backfill(year=2023, weeks=(16, 17, 18, 19), site_names=('us', 'de', 'uk', 'es', 'fr', 'it')):
    """Run CleanWeight.save_data for every (week, site) pair, continuing past failures.

    Batch helper for manual backfills; it is not wired into the command line.
    """
    for week in weeks:
        for site_name in site_names:
            try:
                CleanWeight(site_name=site_name, year=year, week=week).save_data()
            except Exception as e:
                # Best-effort batch: report the failure and move on to the next pair.
                print("error_info:", traceback.format_exc(), e)
    print("不满足运行条件,结束")


if __name__ == '__main__':
    if len(sys.argv) < 4:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <year> <week>")
    site_name = sys.argv[1]  # arg 1: site code, e.g. us / de / uk
    year = int(sys.argv[2])  # arg 2: year, e.g. 2023
    week = int(sys.argv[3])  # arg 3: week number, e.g. 18
    handle_obj = CleanWeight(site_name=site_name, year=year, week=week)
    handle_obj.save_data()