1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
import sys
import traceback
sys.path.append(os.path.dirname(sys.path[0]))
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from yswg_utils.common_udf import parse_weight_str
class CleanWeight(object):
    """Parse ASIN weight strings for one site/week and store the cleaned result.

    Pipeline:
      * ``read_data`` loads ``asin, weight, weight_str`` from
        ``{site_name}_asin_detail_{year}_{week}``.
      * ``handle_data`` parses each ``weight_str`` into ``(weight, weight_type)``
        via ``parse_weight_str``.
      * ``save_data`` replaces the contents of
        ``{site_name}_asin_weight_{year}_{week}`` in the PostgreSQL database with
        the parsed rows.
    """

    # Column order of the frame returned by handle_data.
    _OUTPUT_COLUMNS = ('asin', 'weight', 'weight_str', 'weight_type', 'date_info')

    def __init__(self, site_name='us', year=2023, week=18):
        """Store the run parameters and create the read/save engines.

        :param site_name: site code used in database and table names, e.g. 'us', 'de'.
        :param year: year used in table names and in ``date_info``.
        :param week: week number (int or numeric string); stored as a two-digit string.
        """
        self.site_name = site_name
        self.year = year
        self.week = self._pad_week(week)
        # 'us' uses the "selection" database, other sites use "selection_<site>".
        dbname = "selection" if self.site_name == 'us' else f"selection_{self.site_name}"
        # Database connection parameters.
        # NOTE(review): credentials are hard-coded in source; consider moving them to
        # environment variables or a secrets store.
        self.db_params = {
            "pg_us": {
                "host": "192.168.10.216",
                "port": 5432,
                "dbname": dbname,
                "user": "postgres",
                "password": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
            },
            "mysql_others": {
                "host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",
                "port": 3306,
                "dbname": dbname,
                "user": "adv_yswg",
                "password": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c",
            },
        }
        self.engine_read, self.engine_save = self.create_connection()

    @staticmethod
    def _pad_week(week):
        """Return ``week`` as a zero-padded two-digit string (5 -> '05', '05' -> '05', 18 -> '18')."""
        return f"{int(week):02d}"

    @staticmethod
    def _chunksize(n_rows):
        """Return the ``to_sql`` chunk size: about a tenth of the rows, never below 1."""
        # pandas rejects chunksize=0, which ``n_rows // 10`` yields for fewer than 10 rows.
        return max(1, n_rows // 10)

    @staticmethod
    def _build_url(driver, params):
        """Build a SQLAlchemy URL ``driver://user:password@host:port/dbname`` from ``params``.

        User and password are percent-encoded so characters such as '@' or '/'
        cannot break the URL structure.
        """
        from urllib.parse import quote
        user = quote(str(params['user']), safe='')
        password = quote(str(params['password']), safe='')
        return f"{driver}://{user}:{password}@{params['host']}:{params['port']}/{params['dbname']}"

    @staticmethod
    def get_weight(weight_str, site_name):
        """Parse one weight string; delegates to the shared ``parse_weight_str`` helper."""
        return parse_weight_str(weight_str, site_name)

    def create_connection(self):
        """Create and return ``(engine_read, engine_save)``.

        'us' data from 2023 week 18 onward (and any 'us' data from 2024 on) is read
        from PostgreSQL; everything else is read from MySQL. Results are always
        saved to PostgreSQL.
        """
        year, week = int(self.year), int(self.week)
        if self.site_name == 'us' and ((week >= 18 and year >= 2023) or year >= 2024):
            read_url = self._build_url("postgresql+psycopg2", self.db_params['pg_us'])
        else:
            read_url = self._build_url("mysql+pymysql", self.db_params['mysql_others'])
        engine_save = create_engine(self._build_url("postgresql+psycopg2", self.db_params['pg_us']))
        return create_engine(read_url), engine_save

    def read_data(self):
        """Load ``asin, weight, weight_str`` from the weekly asin_detail table."""
        print("开始读取数据")
        # 'us' table names use the unpadded week number, other sites the padded one.
        week_params = f"{int(self.week)}" if self.site_name == 'us' else f"{self.week}"
        # NOTE(review): site_name/year/week are interpolated into the table name without
        # validation; they come from the command line.
        # Optional filter, currently disabled: where weight_str is not null
        sql = f"select asin, weight, weight_str from {self.site_name}_asin_detail_{self.year}_{week_params};"
        print("sql:", sql)
        return pd.read_sql(sql, con=self.engine_read)

    def handle_data(self):
        """Read the source rows and parse ``weight_str`` into ``weight`` / ``weight_type``.

        :return: DataFrame with columns ``asin, weight, weight_str, weight_type, date_info``.
            ``weight`` is float64, floored at 0.001 (NaN kept). ``weight_str`` is
            lower-cased, with missing values as NaN. ``date_info`` is ``'{year}-{week}'``.
            An empty source yields an empty frame with the same columns.
        """
        df = self.read_data()
        if df.shape[0] == 0:
            print("site_name, year, week:", self.site_name, self.year, self.week, "数据为空,退出")
            # Nothing to parse: return the output schema so save_data can skip the write.
            return df.reindex(columns=list(self._OUTPUT_COLUMNS))
        print("df.shape:", df.shape)
        print("开始处理数据")
        # Lower-case every value; missing values become the string 'none' here, which
        # is what parse_weight_str receives for them.
        df['weight_str'] = df['weight_str'].map(lambda x: str(x).lower())
        parsed = pd.Series(
            [self.get_weight(s, self.site_name) for s in df['weight_str']],
            index=df.index,
        )
        # Expand each (weight, weight_type) tuple into two columns.
        df[['weight', 'weight_type']] = parsed.apply(pd.Series)
        # A literal 'none' weight means "unknown"; map it to NaN before the float cast.
        weight = df['weight'].mask(df['weight'].map(str) == 'none').astype("float64")
        # Floor weights at 0.001; clip leaves NaN untouched.
        df['weight'] = weight.clip(lower=0.001)
        # Restore missing weight_str values that were stringified to 'none' above.
        df['weight_str'] = df['weight_str'].mask(df['weight_str'] == 'none')
        df['date_info'] = f'{self.year}-{self.week}'
        return df

    def save_data(self):
        """Replace the weekly ``asin_weight`` table with the cleaned rows.

        The TRUNCATE and the insert run in a single transaction, so a failed insert
        does not leave the table emptied. Nothing is written (and nothing is
        truncated) when the source has no rows.
        """
        from sqlalchemy import text
        df = self.handle_data()
        if df.empty:
            return
        print("开始存储数据: 先清空对应week的分区表")
        print(df.weight_type.value_counts(dropna=False))
        table_name = f"{self.site_name}_asin_weight_{self.year}_{self.week}"
        with self.engine_save.begin() as conn:
            sql = f"truncate {table_name};"
            print("清空sql:", sql)
            # text() is required to execute raw SQL strings on SQLAlchemy 2.x.
            conn.execute(text(sql))
            df.to_sql(table_name, con=conn, if_exists='append', index=False,
                      chunksize=self._chunksize(df.shape[0]))
def _backfill(site_name_list=('us', 'de', 'uk', 'es', 'fr', 'it'), week_list=(16, 17, 18, 19), year=2023):
    """Re-run CleanWeight for every (week, site) combination, week by week.

    Manual backfill helper; the command-line entry point below does not call it.
    A failure for one combination is printed and the next combination is tried.
    """
    for week in week_list:
        for site_name in site_name_list:
            try:
                CleanWeight(site_name=site_name, year=year, week=week).save_data()
            except Exception as e:
                print("error_info:", traceback.format_exc(), e)
    print("不满足运行条件,结束")


if __name__ == '__main__':
    # Usage: python <this_script> <site_name> <year> <week>, e.g. `us 2023 18`.
    if len(sys.argv) < 4:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <year> <week>")
    site_name = sys.argv[1]  # arg 1: site code, e.g. us / de / uk
    year = int(sys.argv[2])  # arg 2: year, e.g. 2023
    week = int(sys.argv[3])  # arg 3: week number, e.g. 18
    handle_obj = CleanWeight(site_name=site_name, year=year, week=week)
    handle_obj.save_data()