Commit 6ecb8bff by Peng

Refactor how the Seller Central category-insights data is downloaded. The main fix is the crash page that appears after the browser has been running for a long time. Also restart the browser after roughly every ten sub-category fetches to release memory, record the last position, and resume scraping from that checkpoint.

parent 6dd760f3
import datetime
import json
import os
import random
import re
import sys
import time
import traceback
import pandas as pd
from lxml import etree
from secure_db_client import get_remote_engine
from selenium.webdriver.common.by import By
from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.edge.options import Options
from selenium.common.exceptions import TimeoutException, WebDriverException
syn_state = False
class NeedRestart(Exception):
"""用于触发外层重启 driver,。"""
pass
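# Restart protocol (as implemented below): NeedRestart is raised deep inside the
# scraping loops whenever refresh-based recovery fails, get_category_data re-raises
# it, and run() catches it, quits the old driver, builds a fresh one, and resumes
# from the rows still marked state=1 in seller_category_insights_syn.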
class dow_category_Product:
# Stability parameters
PAGELOAD_TIMEOUT = 80
SCRIPT_TIMEOUT = 60
IMPLICIT_WAIT = 0
MAX_ACTION_RETRY = 3
MAX_DRIVER_RESTARTS = 999999  # long-running task: effectively unlimited
def __init__(self, site):
self.site_name = site
self.click_product_name_list = []
self.update_category_state = False
self.engine_mysql = None
self.engine_us_mysql = None
self.engine_pg = None
self.num = 0
week = time.strftime("%W")
year = time.strftime('%Y', time.localtime(time.time()))
self.y_w = f"{year}-{week}"
# ---------------------- DB ----------------------
def mysql_connect(self, site='us'):
self.engine_mysql = get_remote_engine(site_name=site, db_type='mysql')
self.engine_us_mysql = get_remote_engine(site_name='us', db_type='mysql')
self.engine_pg = get_remote_engine(site_name=site, db_type='postgresql_15_outer')
self.num = 0
week = time.strftime("%W")
year = time.strftime('%Y', time.localtime(time.time()))
self.y_w = f"{year}-{week}"
# ---------------------- Driver management & stability layer ----------------------
def _kill_edge_process(self):
# Keep the original behavior: force-kill msedge to avoid odd crashes caused by a locked profile
try:
os.system("taskkill /F /IM msedge.exe")
except Exception as e:
print("强制关闭msedge.exe失败:", e)
def _build_driver(self):
print('Initializing driver')
try:
pr_name = "msedge.exe"
os.system(f"taskkill /F /IM {pr_name}")
except Exception as e:
print("强制关闭chrome.exe失败:", e)
time.sleep(2)
edge_options = Options()
# On Windows, drop the Linux-only flags to reduce the chance of a startup crash
edge_options.add_argument("--disable-gpu")
# Keep the logged-in profile (path unchanged)
edge_options.add_argument(r'--user-data-dir=C:\Users\FLA主账号客服维权使用\AppData\Local\Microsoft\Edge\User Data')
edge_options.add_argument('--profile-directory=Default')
# Reduce startup interference from first-run prompts, restore popups, and extensions (does not affect the login state)
edge_options.add_argument("--no-first-run")
edge_options.add_argument("--no-default-browser-check")
edge_options.add_argument("--disable-extensions")
edge_options.add_argument("--disable-notifications")
# More stable on machines that hit the DevToolsActivePort issue
edge_options.add_argument("--remote-debugging-port=0")
# Keep the background-throttling flags (generally harmless)
edge_options.add_argument("--disable-backgrounding-occluded-windows")
edge_options.add_argument("--disable-renderer-backgrounding")
edge_options.add_argument("--disable-background-timer-throttling")
edge_options.add_argument("--disable-features=CalculateNativeWinOcclusion")
edge_options.add_argument("--remote-allow-origins=*")
edge_options.page_load_strategy = "eager"
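# Note: "eager" makes driver.get return at DOMContentLoaded rather than full load,
# so the explicit sleeps used elsewhere give late-rendering widgets time to appear.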
service = Service(r"D:\python\msedgedriver.exe")
driver = webdriver.Edge(service=service, options=edge_options)
driver.set_page_load_timeout(self.PAGELOAD_TIMEOUT)
driver.set_script_timeout(self.SCRIPT_TIMEOUT)
driver.implicitly_wait(self.IMPLICIT_WAIT)
return driver
def _is_page_crashed(self, driver):
"""尽量快速判断是否 Edge/Chromium 的崩溃页。"""
try:
title = (driver.title or "").lower()
if "aw, snap" in title or "崩溃" in title or "crashed" in title:
return True
src = driver.page_source or ""
# Common crash-page keywords (may appear on both Edge and Chrome); the Chinese entries match localized crash pages
crash_keywords = [
"Aw, Snap", "STATUS_ACCESS_VIOLATION", "RESULT_CODE_HUNG",
"This page has crashed", "页面已崩溃", "Renderer process","此页存在问题",
"刷新此页面","错误代码"
]
return any(k in src for k in crash_keywords)
except Exception:
return True  # if even title/page_source is unreachable, the session has almost certainly crashed or disconnected
def _jitter(self, a=0.6, b=1.6):
time.sleep(random.uniform(a, b))
def _safe_action(self, action_name, fn, driver, site=None):
"""
Unified stable executor:
- catches Timeout/WebDriver crash-style exceptions
- first tries driver.refresh() to recover
- failing that, raises NeedRestart so the outer loop rebuilds the driver and continues
"""
last_err = None
for attempt in range(1, self.MAX_ACTION_RETRY + 1):
try:
if self._is_page_crashed(driver):
raise WebDriverException("Detected crashed page")
return fn()
except (TimeoutException, WebDriverException) as e:
last_err = e
msg = str(e).lower()
print(f"[{action_name}] 第{attempt}次失败:{e}")
# Common "page crashed / disconnected / renderer hung" keywords
crash_like = any(x in msg for x in [
"page crash", "crash", "renderer", "disconnected",
"not connected to devtools", "session deleted",
"cannot determine loading status", "target window already closed"
])
# Try refresh first (the run can usually continue after a refresh)
if attempt < self.MAX_ACTION_RETRY and crash_like:
try:
print(f"[{action_name}] 尝试 refresh 恢复...")
driver.refresh()
self._jitter(3, 6)
# After recovery, navigate back to category-insights
if site:
self._ensure_category_insights(driver, site)
continue
except Exception as e2:
print(f"[{action_name}] refresh 也失败:{e2}")
# Reaching this point means the driver needs a restart
break
except Exception as e:
# Business/parsing exceptions: originally mostly print + continue, so no forced restart here
last_err = e
print(f"[{action_name}] 非 webdriver 异常:{e}\n{traceback.format_exc()}")
raise
raise NeedRestart(f"[{action_name}] 需要重启 driver:{last_err}")
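# Usage sketch: wrap any Selenium interaction so crashes surface as NeedRestart, e.g.
#   self._safe_action("click_foo", lambda: driver.find_element(By.ID, "foo").click(), driver, site=site)
# (By.ID and "foo" are placeholders; the real call sites below pass small closures.)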
def _select_site_radio(self, driver, site):
# Keep the original shadowRoot click logic
if site == 'us':
js = ('document.querySelector("#ATVPDKIKX0DER > kat-radiobutton")'
'.shadowRoot.querySelector("div > div.text > slot > kat-label:nth-child(1)").click()')
elif site == 'uk':
js = ('document.querySelector("#A1F83G8C2ARO7P > kat-radiobutton")'
'.shadowRoot.querySelector("div > div.text > slot > kat-label:nth-child(1)").click()')
elif site == 'de':
js = ('document.querySelector("#A1PA6795UKMFR9 > kat-radiobutton")'
'.shadowRoot.querySelector("div > div.text > slot > kat-label:nth-child(1)").click()')
else:
return
driver.execute_script(js)
def _ensure_category_insights(self, driver, site):
def _open():
print('Opening the home page')
# Original flow: load the home page first, then category-insights
driver.get('https://sellercentral.amazon.com/gp/homepage.html/ref=xx_home_logo_xx?')
self._jitter(6, 10)
driver.get('https://sellercentral.amazon.com/selection/category-insights')
self._jitter(6, 10)
return True
self._safe_action("open_category_insights", _open, driver, site=site)
def _click_site():
self._select_site_radio(driver, site)
self._jitter(2, 4)
return True
self._safe_action("select_site", _click_site, driver, site=site)
def get_category(self, site, driver):
# Original flow: load twice, click the site radio, save the categories, then continue
self._ensure_category_insights(driver, site)
time.sleep(1)
html = etree.HTML(driver.page_source)
self.save_category(html)
print('Categories saved; marking sync state')
global syn_state
syn_state = True
Category_list = self.read_category()
if Category_list:
self.get_category_data(Category_list, driver, site)
def click_site(self, driver):
# Fixes the original bug: do not reference the undefined `site` variable here
self._select_site_radio(driver, self.site_name)
time.sleep(3)
def get_category_data(self, Category_list, driver, site):
print('Category_list:', Category_list)
num = 0
for Category in Category_list:
try:
# Key point: run a crash check (with recovery if needed) at the top of every outer-loop iteration
self._safe_action("loop_healthcheck", lambda: True, driver, site=site)
self.click_site(driver)
print('Processing category:', Category)
if self.update_category_state:
self.click_product_name_list = []
num += 1
Category_name = Category
print("Category_name 名称 11111", Category)
# Click the category radiobutton (wrapped in the stability layer)
def _click_category():
driver.execute_script(f"""document.querySelector("kat-radiobutton[label='{Category}']").click()""")
return True
self._safe_action("click_category", _click_category, driver, site=site)
time.sleep(1)
html = etree.HTML(driver.page_source)
Product_Type_list = html.xpath('//h2[contains(text(),"Product Type")]/following-sibling::div/div')
product_nums = 0
for Product_Type in Product_Type_list:
try:
save_Category_list = []
Product_name = Product_Type.xpath('./@id')
print(product_nums, "Product_name3222222222::", Product_name[0].upper())
if Product_name[0] in self.click_product_name_list:
print(product_nums, "已经抓取::", Product_name[0].upper())
continue
self.click_product_name_list.append(Product_name[0])
self.update_category_state = False
def _click_product_type():
driver.execute_script(f"document.querySelector('#{Product_name[0]} > kat-radiobutton').click()")
return True
self._safe_action("click_product_type", _click_product_type, driver, site=site)
time.sleep(1.5)
html = etree.HTML(driver.page_source)
Item_Type_Keyword_id_list = html.xpath(
'//h2[contains(text(),"Item Type Keyword")]/following-sibling::div/div'
)
print('Item_Type_Keyword_id_list::', len(Item_Type_Keyword_id_list))
for Item_Type_Keyword_id in Item_Type_Keyword_id_list:
print('Current keyword element:', Item_Type_Keyword_id)
try:
most_popular_keyword_list = []
reasons_returns_json = None
most_popular_json_dict = None
Keyword_id = Item_Type_Keyword_id.xpath('./@id')
print("Keyword_id:", Keyword_id)
Keyword = html.xpath(f"//div[@id='{Keyword_id[0]}']/kat-radiobutton/@label")
print('Keyword', Keyword)
def _click_keyword():
driver.find_element(By.XPATH, f'//kat-radiobutton[@value="{Keyword_id[0]}"]').click()
return True
self._safe_action("click_keyword", _click_keyword, driver, site=site)
time.sleep(1.5)
html_1 = etree.HTML(driver.page_source)
most_popular_list = html_1.xpath(
"//div[@class='most-popular-keywords-container']/kat-list//li"
)
if most_popular_list:
for most_popular in most_popular_list:
most_keyword_list = most_popular.xpath('.//div[2]/text()')
most_popular_keyword = most_keyword_list[0] if most_keyword_list else None
most_popular_b_nums_list = most_popular.xpath('.//div/b/text()')
most_popular_b_nums = most_popular_b_nums_list[0] if most_popular_b_nums_list else None
most_popular_dict = {
"most_popular_keywords": most_popular_keyword,
'most_popular_search_nums': most_popular_b_nums
}
most_popular_keyword_list.append(most_popular_dict)
most_popular_json_dict = json.dumps(most_popular_keyword_list)
div_list = html_1.xpath("//div[@class='percentage-list-item-container']/div")
if div_list:
reasons_returns_list = []
for div in div_list:
values = div.xpath("./div[@class='value']/text()")
value = values[0] if values else None
strings = div.xpath("./div[@class='string']/text()")
string = strings[0] if strings else None
reasons_returns_list.append({"value": value, 'string': string})
reasons_returns_json = json.dumps(reasons_returns_list)
ratio_list = html_1.xpath(
'//div[@class="big-text-section-name"][1]/div[@class="big-text"]/text()'
)
if ratio_list:
search_ratio = re.findall(r'(.*?)‰', ratio_list[0])[0]
return_ratio = re.findall(r'(.*?)%', ratio_list[1])[0]
else:
search_ratio = None
return_ratio = None
product_ratio_list = html_1.xpath(
'//div[@class="big-text-section-name"][1]/div[@class="sub-text"]/text()'
)
if product_ratio_list:
product_average = re.findall(r'(.*?)‰', product_ratio_list[0])[0]
return_product_average = re.findall(r'(.*?)%', product_ratio_list[1])[0]
else:
product_average = None
return_product_average = None
big_text_sllers = html_1.xpath(
'//h4[contains(text(),"Number of sellers")]/parent::div/following-sibling::div/text()'
)
big_text_sller = big_text_sllers[0] if big_text_sllers else None
big_text_brands = html_1.xpath(
'//h4[contains(text(),"Number of new brands")]/parent::div/following-sibling::div/text()'
)
big_text_brand = big_text_brands[0] if big_text_brands else None
big_text_asins = html_1.xpath(
'//h4[contains(text(),"Number of ASINs")]/parent::div/following-sibling::div/text()'
)
big_text_asin = big_text_asins[0] if big_text_asins else None
big_text_new_asins = html_1.xpath(
'//h4[contains(text(),"Number of new ASINs")]/parent::div/following-sibling::div/text()'
)
big_text_new_asin = big_text_new_asins[0] if big_text_new_asins else None
big_text_per_asins = html_1.xpath(
'//h4[contains(text(),"Offers per ASIN")]/parent::div/following-sibling::div/text()'
)
big_text_per_asin = big_text_per_asins[0] if big_text_per_asins else None
big_text_Advertisement_list = html_1.xpath(
'//h4[contains(text(),"Advertisement Spend")]/parent::div/following-sibling::div//text()'
)
big_text_Advertisement = '|-|'.join(big_text_Advertisement_list) if big_text_Advertisement_list else None
big_text_star_list = html_1.xpath(
'//h4[contains(text(),"Star Ratings")]/parent::div/parent::div/parent::div/div//@width'
)
if big_text_star_list:
del big_text_star_list[0]
big_text_star = '|-|'.join(big_text_star_list)
else:
big_text_star = None
if big_text_star:
sta_list = big_text_star.split('|-|')
total = 0.0
for num_str in sta_list:
total += float(num_str)
results = [float(num) / total if float(num) != 0 else 0 for num in sta_list]
five_star = round(results[0], 2)
three_star = round(results[1], 2)
two_star = round(results[2], 2)
one_star = round(results[3], 2)
else:
five_star = 0
three_star = 0
two_star = 0
one_star = 0
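# Each width above is divided by the sum of all widths, so every bucket becomes a
# share of the total ratings (the zero check only avoids a needless division).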
if big_text_Advertisement:
if self.site_name == 'us':
pattern = r'\$([\d.]+)'
elif self.site_name == 'uk':
pattern = r'\£([\d.]+)'
elif self.site_name == 'de':
pattern = r'\€([\d.]+)'
else:
pattern = ''
matches_list = re.findall(pattern, big_text_Advertisement)
ad_spend = matches_list[0] if matches_list else None
majority_spend = matches_list[1] if matches_list else None
else:
ad_spend = 0
majority_spend = 0
# Convert to int (original logic kept; the M suffix is treated as one million)
if big_text_brand:
if 'K' in big_text_brand:
big_brand_int = int(float(big_text_brand.replace('K', '')) * 1000)
elif 'M' in big_text_brand:
big_brand_int = int(float(big_text_brand.replace('M', '')) * 1000000)
else:
big_brand_int = int(big_text_brand)
else:
big_brand_int = None
if big_text_asin:
if 'K' in big_text_asin:
big_asin_int = int(float(big_text_asin.replace('K', '')) * 1000)
elif 'M' in big_text_asin:
big_asin_int = int(float(big_text_asin.replace('M', '')) * 1000000)
else:
big_asin_int = int(big_text_asin)
else:
big_asin_int = None
if big_text_new_asin:
if 'K' in big_text_new_asin:
big_new_asin_int = int(float(big_text_new_asin.replace('K', '')) * 1000)
elif 'M' in big_text_new_asin:
big_new_asin_int = int(float(big_text_new_asin.replace('M', '')) * 1000000)
else:
big_new_asin_int = int(big_text_new_asin)
else:
big_new_asin_int = None
if big_text_per_asin:
if 'K' in big_text_per_asin:
big_per_asin_int = int(float(big_text_per_asin.replace('K', '')) * 1000)
elif 'M' in big_text_per_asin:
big_per_asin_int = int(float(big_text_per_asin.replace('M', '')) * 1000000)
else:
big_per_asin_int = int(big_text_per_asin)
else:
big_per_asin_int = None
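# The four K/M branches above could be collapsed into one helper (a sketch with a
# hypothetical name, not part of the original code):
# def _suffix_to_int(text):
#     if not text:
#         return None
#     for suffix, mult in (('K', 1000), ('M', 1000000)):
#         if suffix in text:
#             return int(float(text.replace(suffix, '')) * mult)
#     return int(text)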
top_data_json = self.new_top_grossing(driver, 'Top')
top_data_dict = json.loads(top_data_json)
if top_data_dict.get('products_aggregate_sales'):
_top_data_dict = self.parse_input('top', top_data_dict)
else:
_top_data_dict = self.parse_input('top', None)
news_data_json = self.new_top_grossing(driver, 'News')
news_data_dict = json.loads(news_data_json)
if news_data_dict.get('products_aggregate_sales'):
_news_data_dict = self.parse_input('news', news_data_dict)
else:
_news_data_dict = self.parse_input('news', None)
save_Category_list.append([
Category_name, Product_name[0], Keyword[0],
float(search_ratio), float(product_average), float(return_ratio), float(return_product_average),
self.y_w, big_text_sller, big_text_brand, big_text_asin, big_text_new_asin,
big_text_per_asin, big_text_Advertisement, big_text_star, big_brand_int,
big_asin_int, big_new_asin_int, big_per_asin_int, five_star, three_star, two_star,
one_star, ad_spend, majority_spend, most_popular_json_dict, reasons_returns_json,
top_data_json, news_data_json,
_top_data_dict['top_sales_amount'], _top_data_dict['top_sales_volume'],
_top_data_dict['top_search_ratio'], _top_data_dict['top_return_ratio'],
_top_data_dict['top_adv_spend'], _top_data_dict['top_majority_spend'],
_news_data_dict['news_sales_amount'], _news_data_dict['news_sales_volume'],
_news_data_dict['news_search_ratio'], _news_data_dict['news_return_ratio'],
_news_data_dict['news_adv_spend'], _news_data_dict['news_majority_spend']
])
except Exception as e:
print('============ index out of range ==========', e)
# Persist to DB (original logic kept; just no recursive restart from here)
print('Number of rows to store:', len(save_Category_list))
while True:
try:
if save_Category_list:
with self.engine_pg.begin() as conn_pg:
for i in save_Category_list:
dele_sql = (
f"DELETE from {site}_aba_profit_category_insights "
f"where category='{i[0]}' and product_type='{i[1]}' "
f"and item_type_keyword='{i[2]}' and year_week='{self.y_w}'"
)
print('Deleting from pg:', dele_sql)
conn_pg.execute(dele_sql)
df = pd.DataFrame(data=save_Category_list, columns=[
'category', "product_type", "item_type_keyword",
"search_ratio", "product_average", "return_ratio", "return_product_average",
"year_week", 'sellers', 'new_brands', 'asin', 'new_asin', 'per_asin',
'advertisement_spend', 'star_ratings', 'new_brands_int', 'asin_int',
'new_asin_int', 'per_asin_int', 'five_star', 'three_star', 'two_star',
'one_star', 'ad_spend', 'majority_spend', 'most_popular_keywords_item',
'reasons_returns_json', 'top_data_json', 'news_data_json',
'top_sales_amount', 'top_sales_volume', 'top_search_ratio', 'top_return_ratio',
'top_adv_spend', 'top_majority_spend',
'news_sales_amount', 'news_sales_volume', 'news_search_ratio', 'news_return_ratio',
'news_adv_spend', 'news_majority_spend'
])
self.engine_pg.to_sql(df, f'{site}_aba_profit_category_insights', if_exists="append")
print('Stored to pg successfully')
break
except Exception as e:
print('Store failed:', e)
time.sleep(30)
continue
except NeedRestart as e:
print(e)
raise  # hand off to the outer loop to restart the driver
except Exception as e:
print(e, '=== product type: index out of range / no data ===', f"\n{traceback.format_exc()}")
time.sleep(2)
product_nums += 1
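# This implements the periodic restart from the commit message: after a dozen or so
# product types, restart the driver to release browser memory.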
if product_nums > 12:
print('product_nums exceeded the threshold; restarting the driver to free resources')
raise NeedRestart("product_nums>12 trigger restart")
# Update state to 3 (kept from the original)
while True:
try:
with self.engine_pg.begin() as conn:
update_sql = f"update seller_category_insights_syn set state =3 where category='{Category}'"
print('Running update_sql:', update_sql)
conn.execute(update_sql)
self.update_category_state = True
break
except Exception as e:
print(e, 'failed to update state to 3')
time.sleep(20)
# Original logic: restart the browser when num > 1 (kept). Raising NeedRestart
# lets the outer loop rebuild the driver instead of recursing into run().
if num > 1:
driver.close()
driver.quit()
print('Restarting the browser')
raise NeedRestart("num>1 trigger restart")
except NeedRestart as e:
print(f"[NeedRestart] {e}")
# The outer loop will rebuild the driver and continue (resuming from state=1 rows)
raise
except Exception as e:
print(e, 'execution error')
time.sleep(random.uniform(10, 20))
# This used to call reboot_driver(driver, site);
# it now raises NeedRestart so the outer loop rebuilds the driver, avoiding recursion
raise NeedRestart(f"execution error triggered restart: {e}")
# All done: write workflow_progress (kept; the Chinese status strings are data values consumed downstream, so they stay as-is)
workflow_everyday_list = [
[self.site_name, self.y_w, '类目分析抓取完成', 3, f'{self.site_name}_aba_profit_category_insights', 'week', '类目分析', '是']
]
df_seller_asin_account = pd.DataFrame(data=workflow_everyday_list, columns=[
'site_name', 'date_info', 'status', 'status_val', 'table_name', 'date_type', 'page', 'is_end'
])
self.engine_us_mysql.to_sql(df_seller_asin_account, 'workflow_progress', if_exists='append')
def safe_get(self, lst, idx, default=None):
return lst[idx] if 0 <= idx < len(lst) else default
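# e.g. safe_get(['a', 'b'], 5, default='x') returns 'x' instead of raising IndexError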
def parse_input(self, prefix, data):
sales_amount = 0
sales_volume = 0
search_ratio = 0
return_ratio = 0
adv_spend = 0
majority_spend = 0
if data:
products_aggregate_sales = self.safe_get(data.get('products_aggregate_sales', []), 0)
if products_aggregate_sales:
split = products_aggregate_sales.split("|")
sales_amount_str = self.safe_get(split, 1, '').partition("$")[-1]
sales_volume_str = self.safe_get(re.findall(r'\d+', self.safe_get(split, 2, '')), 0, "0")
if len(sales_amount_str) > 0:
sales_amount = float(sales_amount_str.strip().replace(",", ""))
sales_volume = float(sales_volume_str)
search_ratio = float(data.get('search_ratio') or -1)
return_ratio = float(data.get('return_ratio') or -1)
big_text_Advertisement = data.get('big_text_Advertisement')
if big_text_Advertisement:
split = big_text_Advertisement.split("|-|")
adv_spend_str = self.safe_get(split, 0, '').partition("$")[-1]
majority_spend_str = self.safe_get(split, 1, '').partition("$")[-1]
adv_spend = (float(adv_spend_str.strip()) if adv_spend_str != '' else 0)
majority_spend = (float(majority_spend_str.strip()) if majority_spend_str != '' else 0)
return {
f"{type}_sales_amount": sales_amount,
f"{type}_sales_volume": sales_volume,
f"{type}_search_ratio": search_ratio,
f"{type}_return_ratio": return_ratio,
f"{type}_adv_spend": adv_spend,
f"{type}_majority_spend": majority_spend
}
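# Shape of the result, e.g. parse_input('top', None) ->
# {'top_sales_amount': 0, 'top_sales_volume': 0, 'top_search_ratio': 0,
#  'top_return_ratio': 0, 'top_adv_spend': 0, 'top_majority_spend': 0}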
def analysis_top_Newly_html(self, driver):
html_top = etree.HTML(driver.page_source)
products_aggregate_sales = html_top.xpath("//div[@class='sa-aggregation-label']/text()")
div_list = html_top.xpath("//div[@aria-label='Press SPACE to select this row.']")
items_list = []
seen = set()
for div in div_list:
items = {}
img_src_list = div.xpath('./div//img/@src')
img_src = img_src_list[0] if img_src_list else None
a_href_list = div.xpath(".//div//a/@href")
a_href = a_href_list[0] if a_href_list else None
title_list = div.xpath("./div//span[@class='popover-content']/text()")
title = title_list[0] if title_list else None
brand_list = div.xpath(".//span[contains(@id,'cell-brandName')]//div/text()")
brand = brand_list[0] if brand_list else None
bsr_rank_list = div.xpath(".//span[contains(@id,'cell-bsrBnRank')]//div/text()")
bsr_rank = bsr_rank_list[0] if bsr_rank_list else None
buy_price_list = div.xpath(".//span[contains(@id,'cell-buyBoxPrice')]//div/text()")
buy_price = buy_price_list[0] if buy_price_list else None
rating_list = div.xpath(".//span[contains(@id,'avgRating')]//kat-star-rating/@value")
rating = rating_list[0] if rating_list else None
review_list = div.xpath(".//span[contains(@id,'avgRating')]//kat-star-rating/@review")
review = review_list[0] if review_list else None
offers_list = div.xpath(".//span[contains(@id,'cell-offerCount')]/text()")
offers = offers_list[0] if offers_list else None
key = (img_src, a_href, title, brand, bsr_rank, buy_price, rating, review, offers)
if key in seen:
continue
seen.add(key)
if img_src and a_href and title and brand:
items['img_src'] = img_src
items['a_href'] = a_href
items['title'] = title
items['brand'] = brand
items['bsr_rank'] = bsr_rank
items['buy_price'] = buy_price
items['rating'] = rating
items['review'] = review
items['offers'] = offers
items_list.append(items)
most_popular_list = html_top.xpath("//div[@class='most-popular-keywords-container']/kat-list//li")
most_popular_keyword_list = []
if most_popular_list:
for most_popular in most_popular_list:
most_keyword_list = most_popular.xpath('.//div[2]/text()')
most_popular_keyword = most_keyword_list[0] if most_keyword_list else None
most_popular_b_nums_list = most_popular.xpath('.//div/b/text()')
most_popular_b_nums = most_popular_b_nums_list[0] if most_popular_b_nums_list else None
most_popular_keyword_list.append({
"most_popular_keywords": most_popular_keyword,
'most_popular_search_nums': most_popular_b_nums
})
top_ratio_list = html_top.xpath('//div[@class="big-text-section-name"][1]/div[@class="big-text"]/text()')
if top_ratio_list:
search_ratio = re.findall(r'(.*?)‰', top_ratio_list[0])[0]
return_ratio = re.findall(r'(.*?)%', top_ratio_list[1])[0]
else:
search_ratio = None
return_ratio = None
big_text_Advertisement_list = html_top.xpath(
'//h4[contains(text(),"Advertisement Spend")]/parent::div/following-sibling::div//text()'
)
big_text_Advertisement = '|-|'.join(big_text_Advertisement_list) if big_text_Advertisement_list else None
big_text_star_list = html_top.xpath(
'//h4[contains(text(),"Star Ratings")]/parent::div/parent::div/parent::div/div//@width'
)
if big_text_star_list:
del big_text_star_list[0]
big_text_star = '|-|'.join(big_text_star_list)
else:
big_text_star = None
data_dict = {
'products_aggregate_sales': products_aggregate_sales,
'asin_json': items_list,
'most_popular_json_dict': most_popular_keyword_list,
'search_ratio': search_ratio,
'return_ratio': return_ratio,
'big_text_Advertisement': big_text_Advertisement,
'big_text_star': big_text_star
}
return json.dumps(data_dict)
def new_top_grossing(self, driver, click_type):
try:
if click_type == 'Top':
def _do():
time.sleep(1.5)
driver.execute_script('document.querySelector("#section_id > kat-radiobutton:nth-child(2) > kat-label").click()')
time.sleep(1.5)
return self.analysis_top_Newly_html(driver)
return self._safe_action("click_top_grossing", _do, driver, site=self.site_name)
else:
def _do():
time.sleep(1.5)
driver.execute_script('document.querySelector("#section_id > kat-radiobutton:nth-child(3) > kat-label").click()')
time.sleep(1.5)
return self.analysis_top_Newly_html(driver)
return self._safe_action("click_newly_launched", _do, driver, site=self.site_name)
except Exception:
return "{}"
def read_category(self):
print('Resuming from the last interruption point')
self.mysql_connect(site=self.site_name)
select_sql = 'select category from seller_category_insights_syn where state =1'
df = self.engine_pg.read_sql(select_sql)
category_list = list(df.category)
print(category_list)
if category_list:
return category_list
else:
self.mysql_connect()
workflow_everyday_list = [
[self.site_name, self.y_w, '类目分析抓取完成', 3, f'{self.site_name}_aba_profit_category_insights', 'week', '类目分析', '是']
]
df_seller_asin_account = pd.DataFrame(data=workflow_everyday_list, columns=[
'site_name', 'date_info', 'status', 'status_val', 'table_name', 'date_type', 'page', 'is_end'
])
self.engine_us_mysql.to_sql(df_seller_asin_account, 'workflow_progress', if_exists='append')
def save_category(self, html):
Category_list = html.xpath('//h2[contains(text(),"Category")]/following-sibling::div/div')
Categorys_list = []
self.category_item = {}
Categorys_list_syn = []
for Category in Category_list:
Category_name = Category.xpath('./@id')
Category_label = Category.xpath('.//@label')
self.category_item[Category_label[0]] = Category_name[0]
Categorys_list.append(Category_name[0])
Categorys_list_syn.append([Category_label[0]])
if Categorys_list:
with self.engine_pg.begin() as conn:
global syn_state
if syn_state is False:
now = datetime.datetime.now()
is_monday = (now.weekday() == 0)
is_9_am = (now.hour == 11)  # name kept from the original, but the check is actually for 11 o'clock
if is_monday and is_9_am:
TRUNCATE_SQL = 'TRUNCATE seller_category_insights_syn'
conn.execute(TRUNCATE_SQL)
conn.execute(
'INSERT INTO seller_category_insights_syn (category) VALUES (%s) '
'ON CONFLICT (category) DO UPDATE SET category = EXCLUDED.category',
Categorys_list_syn
)
print('Initial data stored successfully')
return Categorys_list
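# Checkpoint seeding: on Monday at the configured hour the sync table is truncated
# and re-filled, then every category is upserted. read_category later pulls the rows
# still at state=1, which is what makes resume-after-restart work.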
def run(self):
self.mysql_connect(site=self.site_name)
restarts = 0
while restarts < self.MAX_DRIVER_RESTARTS:
driver = None
try:
driver = self._build_driver()
# Enter the main flow (logic unchanged)
self.get_category(self.site_name, driver)
# If get_category completes normally, the whole flow is finished
return
except NeedRestart as e:
restarts += 1
print(f"[run] 捕获 NeedRestart,准备第 {restarts} 次重启:{e}")
except Exception as e:
restarts += 1
print(f"[run] 未知异常,准备第 {restarts} 次重启:{e}\n{traceback.format_exc()}")
finally:
if driver:
try:
driver.quit()
except Exception:
pass
time.sleep(3)
if __name__ == '__main__':
site = sys.argv[1].lower()
dow_category_Product(site).run()
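# Usage sketch (the script filename is whatever this file is saved as):
#   python category_insights_scraper.py us
# where the argument is the site code (us / uk / de).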