Commit 814f1b7b by Peng

no message

parent a5677f43
@@ -81,9 +81,7 @@ class Save_asin_detail(BaseUtils):
         with self.engine_pg.begin() as conn:
             sql_read = f"SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 ORDER BY id FOR UPDATE SKIP LOCKED LIMIT {self.read_size}"
             print(sql_read)
-            a = conn.execute(sql_read)
-            self.df_read = pd.DataFrame(a, columns=['asin', 'id', 'date_info', 'asin_is_variation', 'data_type',
-                                                    'volume', 'weight_str'])
+            self.df_read = self.engine_pg.read_sql(sql_read)
             self.df_read.drop_duplicates(['asin'], inplace=True)
             if self.df_read.shape[0] > 0:
                 self.index_tuple = tuple(self.df_read['id'])
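This hunk and the next replace the raw `conn.execute(...)` plus hand-built `pd.DataFrame` with the remote engine's `read_sql` helper (defined near the end of this diff). For reference, a minimal sketch of the `FOR UPDATE SKIP LOCKED` claim-a-batch pattern the SQL relies on, against a plain SQLAlchemy engine; the DSN and state values are illustrative assumptions:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Hypothetical DSN; stands in for the pg_connect() engines used in this repo.
engine = create_engine("postgresql+psycopg2://user:pwd@localhost:5432/selection")

with engine.begin() as conn:
    # SKIP LOCKED lets concurrent workers claim disjoint batches: rows already
    # locked by another transaction are skipped instead of waited on.
    rows = conn.execute(text(
        "SELECT asin, id FROM us_all_syn_st_asin "
        "WHERE state = 1 ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 500"
    )).fetchall()
    df_read = pd.DataFrame(rows, columns=["asin", "id"]).drop_duplicates(["asin"])
    if not df_read.empty:
        # Flip the state inside the same transaction so the row locks matter.
        conn.execute(text("UPDATE us_all_syn_st_asin SET state = 2 WHERE id = ANY(:ids)"),
                     {"ids": [int(i) for i in df_read["id"]]})
```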
@@ -123,9 +121,7 @@ class Save_asin_detail(BaseUtils):
# sql_read = f"-- SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 and id BETWEEN {minid} AND {maxid} limit {self.read_size} for update" # sql_read = f"-- SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 and id BETWEEN {minid} AND {maxid} limit {self.read_size} for update"
sql_read = f"SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE state = 1 AND id BETWEEN {minid} AND {maxid} ORDER BY id FOR UPDATE SKIP LOCKED LIMIT {self.read_size};" sql_read = f"SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE state = 1 AND id BETWEEN {minid} AND {maxid} ORDER BY id FOR UPDATE SKIP LOCKED LIMIT {self.read_size};"
print(sql_read) print(sql_read)
a = conn.execute(sql_read) self.df_read = self.engine_pg.read_sql(sql_read)
self.df_read = pd.DataFrame(a, columns=['asin', 'id', 'date_info', 'asin_is_variation',
'data_type', 'volume', 'weight_str'])
self.df_read.drop_duplicates(['asin'], inplace=True) self.df_read.drop_duplicates(['asin'], inplace=True)
if self.df_read.shape[0] > 0: if self.df_read.shape[0] > 0:
# 使用默认值填充空值 # 使用默认值填充空值
@@ -214,16 +210,16 @@ class Save_asin_detail(BaseUtils):
         df.weight_str = df.weight_str.apply(lambda x: str(x)[:250] if x is not None else None)  # truncate the string
         print(f'存储pg:{self.site_name}_asin_detail_month_{report_info}')
-        # df.to_excel(r'2025-7-08-1_srs_search_term_asin.xlsx', index=False)
-        df.to_sql(f"{self.site_name}_asin_detail_month_{report_info}", con=self.engine_pg,
-                  if_exists='append',
-                  index=False)
+        # df.to_csv(r'2025-7-30_srs_search_term_asin.csv', index=False)
+        self.engine_pg.to_sql(df, f"{self.site_name}_asin_detail_month_{report_info}",
+                              if_exists='append')
         break
     except Exception as e:
+        traceback.print_exc()  # ★ print the full stack to the terminal
         self.engine = self.mysql_connect()
         self.engine_pg = self.pg_connect()
         time.sleep(random.uniform(10, 20.5))
-        print(f"存储'{self.site_name} 存储詳情數據 数据'失败,等待5s继续", e, f"\n{traceback.format_exc()}")
+        print(f"打印完整栈到终端 存储'{self.site_name} 存储詳情數據 数据'失败,等待5s继续", e, f"\n{traceback.format_exc()}")
         time.sleep(5)
         continue
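The write path changes the same way throughout this file: pandas' `df.to_sql(..., con=engine, index=False)` becomes `engine.to_sql(df, table, if_exists=...)` on the RemoteEngine wrapper shown at the bottom of this diff, which POSTs JSON records to its `/insert` endpoint instead of holding a live DB connection. A sketch of the two call shapes (table name hypothetical):

```python
# Before: pandas writes through a live SQLAlchemy connection.
df.to_sql("us_asin_detail_month_2025_08", con=engine_pg,
          if_exists="append", index=False)

# After: the RemoteEngine wrapper serializes the frame to JSON records and
# POSTs them to the gateway; the DataFrame moves to the first argument and
# index=False disappears (records never carry the index).
engine_pg.to_sql(df, "us_asin_detail_month_2025_08", if_exists="append")
```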
@@ -273,10 +269,8 @@ class Save_asin_detail(BaseUtils):
         if df_asin_bsr_pg.shape[0] > 0:
             date_info_ = list(df_asin_bsr_pg.date_info)[0].replace('-', '_')
             print(f'{self.site_name}_bs_category_asin_detail_month_{date_info_}')
-            df_asin_bsr_pg.to_sql(f'{self.site_name}_bs_category_asin_detail_month_{date_info_}',
-                                  con=self.engine_pg,
-                                  if_exists='append',
-                                  index=False)
+            self.engine_pg.to_sql(df_asin_bsr_pg, f'{self.site_name}_bs_category_asin_detail_month_{date_info_}',
+                                  if_exists='append')
             bs_category_asin_list_pg = []
             break
     except Exception as e:
@@ -337,9 +331,8 @@ class Save_asin_detail(BaseUtils):
             else:
                 sql_delete = f"delete from {self.db_seller_asin_account} where asin in {tuple(set(df_seller_asin_account.asin))};"
             conn.execute(sql_delete)
-            df_seller_asin_account.to_sql(self.db_seller_asin_account, con=self.engine,
-                                          if_exists='append',
-                                          index=False)
+            self.engine.to_sql(df_seller_asin_account, self.db_seller_asin_account,
+                               if_exists='append')
             buyBoxname_asin_list = []
             break
     except Exception as e:
@@ -402,6 +395,7 @@ class Save_asin_detail(BaseUtils):
         self.engine_pg = self.pg_connect()
         two_dimensional_list = [[x] for x in asin_list]
         df_asin = pd.DataFrame(data=two_dimensional_list, columns=['asin'])
+        df_asin.drop_duplicates(['asin'], inplace=True)  # deduplicate
         with self.engine_pg.begin() as conn:
             if len(set(df_asin.asin)) == 1:
                 sql_delete = f"delete from {self.site_name}_all_syn_st_asin where asin in ('{tuple(df_asin.asin)[0]}');"
@@ -409,8 +403,7 @@ class Save_asin_detail(BaseUtils):
sql_delete = f"delete from {self.site_name}_all_syn_st_asin where asin in {tuple(set(df_asin.asin))};" sql_delete = f"delete from {self.site_name}_all_syn_st_asin where asin in {tuple(set(df_asin.asin))};"
conn.execute(sql_delete) conn.execute(sql_delete)
df_asin['state'] = state df_asin['state'] = state
df_asin.to_sql(f'{self.site_name}_all_syn_st_asin', con=self.engine_pg, if_exists='append', self.engine_pg.to_sql(df_asin,f'{self.site_name}_all_syn_st_asin',if_exists='append')
index=False)
break break
except Exception as e: except Exception as e:
self.engine = self.mysql_connect() self.engine = self.mysql_connect()
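The `len(set(df_asin.asin)) == 1` branch above exists because Python renders a 1-tuple with a trailing comma, which is invalid inside a SQL `IN (...)` list. A quick illustration (ASIN value hypothetical):

```python
asins = ("B0TEST0001",)  # a single claimed ASIN

# Interpolating the tuple directly leaves the trailing comma in the SQL:
f"delete from us_all_syn_st_asin where asin in {asins};"
# -> "delete from us_all_syn_st_asin where asin in ('B0TEST0001',);"  (syntax error)

# The special-cased branch interpolates the lone value instead:
f"delete from us_all_syn_st_asin where asin in ('{asins[0]}');"
# -> "delete from us_all_syn_st_asin where asin in ('B0TEST0001');"   (valid)
```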
@@ -3,7 +3,7 @@ import os
 sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
 from amazon_params.params import DB_CONN_DICT
-from amazon_params.params import PG_CONN_DICT, PG_CONN_DICT_6,PG_CONN_DICT_21
+from amazon_params.params import PG_CONN_DICT
 from sqlalchemy import create_engine
 from sqlalchemy.pool import NullPool
 import pymysql
@@ -96,38 +96,6 @@ class connect_db():
             time.sleep(nums * 20)
             continue
-    def pg6_db(self):
-        nums = 0
-        while True:
-            nums += 1
-            try:
-                if self.site_name == 'us':
-                    db = 'selection'
-                else:
-                    db = f'selection_{self.site_name}'
-                self.engine_pg = create_engine(
-                    f"postgresql+psycopg2://{PG_CONN_DICT_6['pg_user']}:{PG_CONN_DICT_6['pg_pwd']}@{PG_CONN_DICT_6['pg_host']}:{PG_CONN_DICT_6['pg_port']}/{db}",
-                    encoding='utf-8')
-                return self.engine_pg
-            except Exception as e:
-                print("error_pg_connect:", e, f"\n{traceback.format_exc()}")
-                time.sleep(nums * 20)
-                continue
-    def pg_21_db(self):
-        nums = 0
-        while True:
-            try:
-                if self.site_name == 'us':
-                    db = 'selection'
-                else:
-                    db = f'selection_{self.site_name}'
-                engine_pg6 = create_engine(
-                    f"postgresql+psycopg2://{PG_CONN_DICT_21['pg_user']}:{PG_CONN_DICT_21['pg_pwd']}@{PG_CONN_DICT_21['pg_host']}:{PG_CONN_DICT_21['pg_port']}/{db}",
-                    encoding='utf-8', connect_args={"connect_timeout": 10}, poolclass=NullPool)
-                return engine_pg6
-            except Exception as e:
-                print("error_mysql_connect11111111111111111111111:", e, f"\n{traceback.format_exc()}")
-                continue
     def mysql_engine(self):
         nums = 0
@@ -443,7 +443,7 @@ class ParseAsinUs(object):
         if href_asin_list:
             bundle_component_asin_list = []
             for href_asin in href_asin_list:
-                i_asin_list = re.findall(r'(?:[A-Z0-9]{10}|[0-9]{10})', href_asin)
+                i_asin_list = re.findall(r'/dp/(.*)', href_asin)
                 bundle_component_asin_list.append(i_asin_list[0])
             if bundle_component_asin_list:
                 bundle_component_asin_list = list(set(bundle_component_asin_list))
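The new `/dp/(.*)` pattern is much looser than the 10-character class it replaces: it captures everything after `/dp/`, including any trailing path or query string. A comparison on an illustrative href:

```python
import re

href = "/gp/dp/B0ABCD1234/ref=bundle_comp?th=1"  # illustrative bundle-component href

re.findall(r'(?:[A-Z0-9]{10}|[0-9]{10})', href)  # ['B0ABCD1234']  (old pattern)
re.findall(r'/dp/(.*)', href)                    # ['B0ABCD1234/ref=bundle_comp?th=1']  (new pattern)

# If hrefs can carry suffixes like the one above, anchoring the capture to a
# 10-character ASIN keeps only the ID (an assumption, not part of this commit):
re.findall(r'/dp/([A-Z0-9]{10})', href)          # ['B0ABCD1234']
```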
@@ -460,9 +460,13 @@ class ParseAsinUs(object):
                 bundle_asin_review = bundle_review_list[0] if bundle_review_list else None
                 bundle_starslist = self.response_s.xpath(
                     rf"//a[contains(@href,'{bundle_component_asin}')]/i[contains(@class,'component-details-component-review-stars')]/@class")
+                print('bundle_starslist::', bundle_starslist)
                 bundle_stars = bundle_starslist[0] if bundle_starslist else None
+                if bundle_stars:
                     bundle_stars_list = re.findall(r'a-star-(.*?) ', bundle_stars)
                     bundle_asin_star = bundle_stars_list[0].replace('-', '.') if bundle_stars_list else None
+                else:
+                    bundle_asin_star = None
                 bundle_asin_price_list = self.response_s.xpath(
                     f"//a[contains(@href,'{bundle_component_asin}')]/parent::div/following-sibling::div[contains(@class,'component-details-component-prices')]/span/text()")
                 bundle_asin_price = bundle_asin_price_list[0] if bundle_asin_price_list else None
@@ -112,8 +112,9 @@ class Requests_param_val(BaseUtils):
         with self.engine.begin() as conn:
             sql_read = f'SELECT cookies,id FROM {self.db_cookies} limit 350;'
             print("获取cookie:", sql_read)
-            a = conn.execute(sql_read)
-            df_read = pd.DataFrame(a, columns=['cookies', 'id'])
+            # a = conn.execute(sql_read)
+            # df_read = pd.DataFrame(a, columns=['cookies', 'id'])
+            df_read = self.engine.read_sql(sql_read)
             clientPriceList = list(df_read.cookies + "|-|" + df_read.id.astype("U"))
             for ck in clientPriceList:
                 cookie_dict[ck.split("|-|")[1]] = ck.split("|-|")[0]
@@ -129,11 +130,12 @@ class Requests_param_val(BaseUtils):
     def db_column(self, site):
         if site in ('us', 'de', 'uk'):
-            asin_detail_table = f'select * from {site}_asin_detail_month_2025 limit 0'
+            asin_detail_table = f'select * from {site}_asin_detail_month_2025 limit 1'
         else:
-            asin_detail_table = f'select * from {site}_asin_detail_2025 limit 0'
+            asin_detail_table = f'select * from {site}_asin_detail_2025 limit 1'
         print(asin_detail_table)
-        df = pd.read_sql(asin_detail_table, con=self.engine_pg)
+        # df = pd.read_sql(asin_detail_table, con=self.engine_pg)
+        df = self.engine_pg.read_sql(asin_detail_table)
         # get the field names
         columns_list = list(df.columns)
         columns_list.remove('id')
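`select * ... limit 0` is a classic schema probe: it returns no rows, yet the frame still carries the table's column index. Switching to `limit 1` keeps the same column-discovery behavior while returning at most one real row, which presumably sits better with the HTTP gateway's record-based responses (that motivation is an assumption). A minimal illustration with pandas:

```python
import pandas as pd

# Either query yields the table's columns; limit 0 gives an empty frame,
# limit 1 at most one row. 'engine' is any SQLAlchemy engine (assumed).
df = pd.read_sql("select * from us_asin_detail_month_2025 limit 1", con=engine)
columns_list = [c for c in df.columns if c != "id"]
```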
@@ -347,4 +349,4 @@ class Requests_param_val(BaseUtils):
         md5_hex_digest = md5_hash.hexdigest()
         return md5_hex_digest
 if __name__ == '__main__':
-    Requests_param_val(site_name='de').get_minid_maxid()
+    Requests_param_val(site_name='us').get_minid_maxid()
+import json
 import pandas as pd
-import requests, time
+import numpy as np
+import orjson, requests, time
 from typing import List
 # -------- mapping dicts --------
@@ -20,15 +23,70 @@ db_type_alias_map = {
"postgresql_15_outer": "postgresql_15_outer", # pg15正式库-外网 "postgresql_15_outer": "postgresql_15_outer", # pg15正式库-外网
"postgresql_cluster": "postgresql_cluster", # pg集群-内网 "postgresql_cluster": "postgresql_cluster", # pg集群-内网
"postgresql_cluster_outer": "postgresql_cluster_outer", # pg集群-外网 "postgresql_cluster_outer": "postgresql_cluster_outer", # pg集群-外网
"doris": "doris", # pg集群-内网 "doris": "doris", # doris集群-内网
} }
DEFAULT_SERVERS = [ DEFAULT_SERVERS = [
"http://192.168.200.210:7777", # "http://192.168.10.217:7777", # 内网
"http://192.168.1.102:7777", "http://61.145.136.61:7779", # 外网
] ]
# --------------------------- # ---------------------------
+def df_to_json_records(df: pd.DataFrame) -> list:
+    """Make sure the DataFrame can be safely serialized to JSON records (handles NaN / ±Inf)."""
+    df_clean = df.copy()
+    # 1️⃣ replace ±Inf -> NaN
+    num_cols = df_clean.select_dtypes(include=[np.number]).columns
+    if len(num_cols):
+        df_clean[num_cols] = df_clean[num_cols].replace([np.inf, -np.inf], np.nan)
+    # 2️⃣ replace NaN -> None (note: astype(object) is sometimes incomplete, so use applymap)
+    df_clean = df_clean.applymap(lambda x: None if pd.isna(x) else x)
+    # 3️⃣ convert to dict records
+    return df_clean.to_dict("records")
+def clean_json_field_for_orjson(v):
+    """Clean a single JSON field value so it satisfies orjson and empty dicts never reach the DB."""
+    if v is None or pd.isna(v):
+        return None
+    # 1️⃣ empty dict object -> None
+    if isinstance(v, dict) and not v:
+        return None
+    # 2️⃣ empty string, or a string that is only "{}" -> None
+    if isinstance(v, str):
+        stripped = v.strip()
+        if not stripped or stripped == "{}":
+            return None
+        try:
+            parsed = json.loads(stripped)
+            if isinstance(parsed, dict) and not parsed:
+                return None
+            return json.dumps(parsed, ensure_ascii=False)
+        except Exception:
+            return v  # keep non-JSON strings unchanged
+    return v
+def fully_clean_for_orjson(df: pd.DataFrame) -> pd.DataFrame:
+    """Fully clean a DataFrame so it satisfies orjson."""
+    df = df.replace([np.inf, -np.inf], np.nan)
+    df = df.applymap(lambda x: None if pd.isna(x) else x)
+    # find every column that may hold a JSON string
+    json_like_cols = [col for col in df.columns if col.endswith('_json')]
+    # apply the cleaning function to each JSON-like column
+    for col in json_like_cols:
+        df[col] = df[col].apply(clean_json_field_for_orjson)
+    return df
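Behavior of the combined cleaner on a hypothetical frame with a `_json` column: non-finite numbers become `None`, empty or `"{}"` payloads are nulled, and valid JSON strings are re-serialized canonically:

```python
df = pd.DataFrame({"asin": ["B0TEST0001", "B0TEST0002", "B0TEST0003"],
                   "price": [9.5, np.inf, np.nan],
                   "attrs_json": ['{"color": "red"}', "{}", None]})
fully_clean_for_orjson(df).to_dict("records")
# [{'asin': 'B0TEST0001', 'price': 9.5,  'attrs_json': '{"color": "red"}'},
#  {'asin': 'B0TEST0002', 'price': None, 'attrs_json': None},
#  {'asin': 'B0TEST0003', 'price': None, 'attrs_json': None}]
```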
 class RemoteTransaction:
@@ -40,8 +98,18 @@ class RemoteTransaction:
         self.urls = urls
         self.sql_queue = []
-    def execute(self, sql: str):
-        self.sql_queue.append(sql)
+    # def execute(self, sql: str):
+    #     self.sql_queue.append(sql)
+    def execute(self, sql: str, params=None):
+        """
+        params may be:
+          • None        → plain-text SQL
+          • dict        → a single parameterized statement, e.g. {"id": 1, "name": "a"}
+          • list/tuple  → batch executemany
+              - list[dict]  ↔ INSERT .. VALUES (:id, :name)
+              - list[tuple] ↔ INSERT .. VALUES (%s, %s)
+        """
+        self.sql_queue.append({"sql": sql, "params": params})
     def __enter__(self): return self
@@ -74,14 +142,43 @@ class RemoteEngine:
         for url in self.urls:
             for _ in range(self.retries):
                 try:
-                    r = self.session.post(f"{url}/{endpoint}",
-                                          json=payload, timeout=10)
+                    json_bytes = orjson.dumps(payload)
+                    r = self.session.post(f"{url}/{endpoint}",
+                                          data=json_bytes,
+                                          headers={"Content-Type": "application/json"},
+                                          timeout=60)
+                    # r = self.session.post(f"{url}/{endpoint}",
+                    #                       json=payload, timeout=10)
                     r.raise_for_status()
                     return r.json()
                 except Exception as e:
                     print(f"[WARN] {endpoint} fail @ {url}: {e}")
                     time.sleep(1)
         raise RuntimeError(f"All servers failed for {endpoint}")
+    # def _request(self, endpoint: str, payload):
+    #     # with orjson, "allow_nan" writes NaN/Inf as null
+    #     # json_bytes = orjson.dumps(payload,
+    #     #                           option=orjson.OPT_NON_STR_KEYS | orjson.OPT_NAIVE_UTC | orjson.OPT_OMIT_MICROSECOND | orjson.OPT_ALLOW_INF_AND_NAN)
+    #     json_bytes = orjson.dumps(
+    #         payload,
+    #         option=orjson.OPT_NON_STR_KEYS | orjson.OPT_NAIVE_UTC | orjson.OPT_ALLOW_INF_AND_NAN
+    #     )
+    #
+    #     headers = {"Content-Type": "application/json"}
+    #
+    #     for url in self.urls:
+    #         for _ in range(self.retries):
+    #             try:
+    #                 r = self.session.post(f"{url}/{endpoint}",
+    #                                       data=json_bytes, headers=headers,
+    #                                       timeout=15)
+    #                 r.raise_for_status()
+    #                 return r.json()
+    #             except Exception as e:
+    #                 print(f"[WARN] {endpoint} fail @ {url}: {e}")
+    #                 time.sleep(1)
+    #     raise RuntimeError(f"All servers failed for {endpoint}")
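The live change above switches from requests' `json=` kwarg to pre-serializing with `orjson`: the stdlib encoder behind `json=` emits literal `NaN`/`Infinity` tokens for non-finite floats (which is not valid JSON), while `orjson` writes them as `null` and is considerably faster on large record batches. A standalone sketch of the same call; the host appears in this diff, but treat the endpoint path and payload shape as placeholders:

```python
import orjson, requests

payload = {"db": "postgresql_cluster", "table": "demo",
           "if_exists": "append", "data": [{"id": 1, "price": float("nan")}]}

r = requests.post("http://61.145.136.61:7779/insert",
                  data=orjson.dumps(payload),  # NaN serialized as null: b'...,"price":null}]}'
                  headers={"Content-Type": "application/json"},
                  timeout=60)
r.raise_for_status()
print(r.json())
```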
     # ---------- public API ----------
     def read_sql(self, sql: str) -> pd.DataFrame:
@@ -92,11 +189,13 @@ class RemoteEngine:
         return pd.DataFrame(data["result"])
     def to_sql(self, df: pd.DataFrame, table: str, if_exists="append"):
         return self._request("insert",
                              {"db": self.db,
                               "table": table,
                               "if_exists": if_exists,
-                              "data": df.to_dict("records"),
+                              "data": fully_clean_for_orjson(df=df).to_dict("records"),
+                              # "data": df_to_json_records(df),  # ← the cleaned records
                               "site_name": self.database})
     def begin(self):