Commit 409ca255 by Peng

no message

parent 4825673f
...@@ -3,24 +3,20 @@ import os ...@@ -3,24 +3,20 @@ import os
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.db_connect import BaseUtils from utils.db_connect import BaseUtils
from amazon_params import py_ja3
from amazon_params.params import DB_REQUESTS_ASIN_PARAMS from amazon_params.params import DB_REQUESTS_ASIN_PARAMS
from utils.requests_param import Requests_param_val from utils.requests_param import Requests_param_val
from queue import Queue from queue import Queue, Empty
import time import time
import random import random
from lxml import etree from lxml import etree
import json import json
from curl_cffi import requests from curl_cffi import requests
import requests as requests2
sess = requests2.Session()
import traceback import traceback
import pandas as pd import pandas as pd
import threading import threading
import urllib3 import urllib3
import re import re
import uuid
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
urllib3.disable_warnings() urllib3.disable_warnings()
...@@ -78,9 +74,12 @@ class async_account_name_products(BaseUtils): ...@@ -78,9 +74,12 @@ class async_account_name_products(BaseUtils):
def get_product(self, t_num): def get_product(self, t_num):
while True: while True:
time.sleep(0.3) try:
if self.queries_asin_queue.empty() == False: querys = self.queries_asin_queue.get_nowait()
querys = self.queries_asin_queue.get() except Empty:
print(f"当前线程-{t_num} 已完成-爬取-跳出循环")
break
else:
thread_num = len(threading.enumerate()) thread_num = len(threading.enumerate())
if self.cookies_queue.empty(): if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie() cookies_dict = self.reuests_para_val.get_cookie()
...@@ -96,10 +95,8 @@ class async_account_name_products(BaseUtils): ...@@ -96,10 +95,8 @@ class async_account_name_products(BaseUtils):
headers["cookie"] = cookie_str headers["cookie"] = cookie_str
try: try:
print(self.headers_num_int, '请求url: ', scraper_url) print(self.headers_num_int, '请求url: ', scraper_url)
sess.mount(self.site_url, py_ja3.DESAdapter()) resp = requests.get(scraper_url, headers=headers,
resp = sess.get(scraper_url, headers=headers, timeout=10, verify=False, impersonate="chrome")
timeout=10, verify=False)
resp.close()
if self.reuests_para_val.check_amazon_yzm(resp): if self.reuests_para_val.check_amazon_yzm(resp):
print(f"{self.site_name} 站点 + 使用代理ip出现验证码:{scraper_url}") print(f"{self.site_name} 站点 + 使用代理ip出现验证码:{scraper_url}")
time.sleep(random.uniform(1.5, 5.5)) time.sleep(random.uniform(1.5, 5.5))
...@@ -138,7 +135,7 @@ class async_account_name_products(BaseUtils): ...@@ -138,7 +135,7 @@ class async_account_name_products(BaseUtils):
continue continue
try: try:
ingress = ingress[0].strip() ingress = ingress[0].strip()
except: except Exception:
ingress = None ingress = None
print("获取邮编錯誤:") print("获取邮编錯誤:")
print(ingress, '邮编 ') print(ingress, '邮编 ')
...@@ -147,7 +144,7 @@ class async_account_name_products(BaseUtils): ...@@ -147,7 +144,7 @@ class async_account_name_products(BaseUtils):
"Cina" in ingress): "Cina" in ingress):
try: try:
cookie_ubid_main_id = re.findall(r'ubid-main=(.*?);', cookie_str)[0] cookie_ubid_main_id = re.findall(r'ubid-main=(.*?);', cookie_str)[0]
except: except Exception:
cookie_ubid_main_id = re.findall(r'session-id=(.*?);', cookie_str)[0] cookie_ubid_main_id = re.findall(r'session-id=(.*?);', cookie_str)[0]
for cookie_key_value in self.cookie_dict_delete_id.items(): for cookie_key_value in self.cookie_dict_delete_id.items():
if cookie_ubid_main_id in cookie_key_value[1]: if cookie_ubid_main_id in cookie_key_value[1]:
...@@ -162,10 +159,11 @@ class async_account_name_products(BaseUtils): ...@@ -162,10 +159,11 @@ class async_account_name_products(BaseUtils):
continue continue
# 获取产品总数 # 获取产品总数
results_span_list = response_s.xpath( results_span_list = response_s.xpath(
'//span[contains(text(),"results")]/text()|//div[@class="a-section a-spacing-small a-spacing-top-small"]//span/text()') '//span[contains(text(),"result")]/text()|//div[@class="a-section a-spacing-small a-spacing-top-small"]//span/text()')
results_list = [] results_list = []
if len(results_span_list) > 0: if len(results_span_list) > 0:
ele_text = results_span_list[0].replace(".", "").replace(",", "").replace("\xa0", "") ele_text = results_span_list[0].replace(".", "").replace(",", "").replace("\xa0", "")
print('results_span_list:::', results_span_list)
ele_a = re.findall("\d+-\d+", ele_text) ele_a = re.findall("\d+-\d+", ele_text)
if len(ele_a) == 0: if len(ele_a) == 0:
ele_a = re.findall("\d+–\d+", ele_text) ele_a = re.findall("\d+–\d+", ele_text)
...@@ -181,7 +179,7 @@ class async_account_name_products(BaseUtils): ...@@ -181,7 +179,7 @@ class async_account_name_products(BaseUtils):
results_int = 1 results_int = 1
num_int = int(results_int) num_int = int(results_int)
self.seller_account_num_list.append((seller_id, num_int)) self.seller_account_num_list.append((seller_id, num_int))
except: except Exception:
num_int = 0 num_int = 0
print("店铺 产品总数:", seller_id, account_name, num_int) print("店铺 产品总数:", seller_id, account_name, num_int)
products_asin_link_list = response_s.xpath( products_asin_link_list = response_s.xpath(
...@@ -194,13 +192,12 @@ class async_account_name_products(BaseUtils): ...@@ -194,13 +192,12 @@ class async_account_name_products(BaseUtils):
# 获取 asin 位置 # 获取 asin 位置
asin_href_list = response_s.xpath(f"//div[@data-asin='{products_asin}']//a/@href") asin_href_list = response_s.xpath(f"//div[@data-asin='{products_asin}']//a/@href")
if len(asin_href_list) > 0: if len(asin_href_list) > 0:
asin_href_list = response_s.xpath(f"//div[@data-asin='{products_asin}']//a/@href")
asin_href_join = ''.join(asin_href_list) asin_href_join = ''.join(asin_href_list)
row_num_lsit = re.findall(fr"{products_asin}/ref=sr_1_(\d+)\?", asin_href_join) row_num_lsit = re.findall(fr"{products_asin}/ref=sr_1_(\d+)\?", asin_href_join)
try: try:
row_num = row_num_lsit[0] if row_num_lsit else 0 row_num = row_num_lsit[0] if row_num_lsit else 0
row_num_int = int(row_num) row_num_int = int(row_num)
except: except Exception:
row_num_int = 0 row_num_int = 0
else: else:
row_num_int = 0 row_num_int = 0
...@@ -228,7 +225,9 @@ class async_account_name_products(BaseUtils): ...@@ -228,7 +225,9 @@ class async_account_name_products(BaseUtils):
f'//div[@data-asin="{products_asin}"]//span[contains(@class,"a-size-base s-")]//text()') f'//div[@data-asin="{products_asin}"]//span[contains(@class,"a-size-base s-")]//text()')
if len(asin_review) == 0: if len(asin_review) == 0:
asin_review = response_s.xpath( asin_review = response_s.xpath(
f'//div[@data-asin="{products_asin}"]//span[@class="a-color-link"]//text()') f'//div[@data-asin="{products_asin}"]//span[@class="a-color-link"]//text()|//div[@data-asin="{products_asin}"]//span[contains(@class,"-normal-weight-text")]/text()')
print('asin_priceasin_review::::',asin_review)
total_comments = None total_comments = None
if asin_review: if asin_review:
number_of_reviews = asin_review[0].strip().replace(')', '').replace('(', '') number_of_reviews = asin_review[0].strip().replace(')', '').replace('(', '')
...@@ -259,9 +258,9 @@ class async_account_name_products(BaseUtils): ...@@ -259,9 +258,9 @@ class async_account_name_products(BaseUtils):
elif self.site_name == 'de': elif self.site_name == 'de':
if "Sternebewertungen" in number_of_reviews: if "Sternebewertungen" in number_of_reviews:
total_comments = \ total_comments = \
re.findall(r"(.*) Sternebewertungen", number_of_reviews[0])[0] re.findall(r"(.*) Sternebewertungen", number_of_reviews)[0]
elif "Sternebewertung" in number_of_reviews[0]: elif "Sternebewertung" in number_of_reviews:
total_comments = re.findall(r"(.*) Sternebewertung", number_of_reviews[0])[ total_comments = re.findall(r"(.*) Sternebewertung", number_of_reviews)[
0] 0]
else: else:
total_comments = number_of_reviews.replace('\xa0', '').strip() total_comments = number_of_reviews.replace('\xa0', '').strip()
...@@ -271,6 +270,9 @@ class async_account_name_products(BaseUtils): ...@@ -271,6 +270,9 @@ class async_account_name_products(BaseUtils):
else: else:
total_comments = number_of_reviews total_comments = number_of_reviews
if total_comments: if total_comments:
if 'K' in total_comments or 'k' in total_comments:
num = float(total_comments.replace('K', '').replace('k', '').replace(',', '').replace('\xa0', '').strip())
total_comments = str(int(num * 1000))
reviews = total_comments.replace('.', '').replace(',', '').replace('\xa0', reviews = total_comments.replace('.', '').replace(',', '').replace('\xa0',
'').strip() '').strip()
try: try:
...@@ -278,7 +280,7 @@ class async_account_name_products(BaseUtils): ...@@ -278,7 +280,7 @@ class async_account_name_products(BaseUtils):
pass pass
else: else:
reviews = 0 reviews = 0
except: except Exception:
reviews = 0 reviews = 0
else: else:
reviews = None reviews = None
...@@ -316,6 +318,7 @@ class async_account_name_products(BaseUtils): ...@@ -316,6 +318,7 @@ class async_account_name_products(BaseUtils):
try: try:
asin_price = response_s.xpath( asin_price = response_s.xpath(
f'//div[@data-asin="{products_asin}"]//span[@class="a-offscreen"]//text()') f'//div[@data-asin="{products_asin}"]//span[@class="a-offscreen"]//text()')
print('asin_price::::',asin_price)
if asin_price: if asin_price:
if self.site_name == 'us': if self.site_name == 'us':
prices = asin_price[0].replace("$", "").replace("£", "").replace("€", prices = asin_price[0].replace("$", "").replace("£", "").replace("€",
...@@ -335,7 +338,7 @@ class async_account_name_products(BaseUtils): ...@@ -335,7 +338,7 @@ class async_account_name_products(BaseUtils):
try: try:
rating = rating.replace(',', '.') rating = rating.replace(',', '.')
rating = round(float(rating), 2) rating = round(float(rating), 2)
except: except Exception:
rating = 0 rating = 0
if price: if price:
try: try:
...@@ -344,7 +347,7 @@ class async_account_name_products(BaseUtils): ...@@ -344,7 +347,7 @@ class async_account_name_products(BaseUtils):
price = None price = None
if price: if price:
price = round(float(price), 2) price = round(float(price), 2)
except: except Exception:
price = None price = None
buy_data_list = response_s.xpath( buy_data_list = response_s.xpath(
f"//div[@data-asin='{products_asin}']//span[contains(text(),'bought')]/text()") # 月销 f"//div[@data-asin='{products_asin}']//span[contains(text(),'bought')]/text()") # 月销
...@@ -360,9 +363,6 @@ class async_account_name_products(BaseUtils): ...@@ -360,9 +363,6 @@ class async_account_name_products(BaseUtils):
else: else:
if len(products_asin_link_list) == 0: if len(products_asin_link_list) == 0:
self.asin_not_sure_list.append(seller_id) self.asin_not_sure_list.append(seller_id)
else:
print(f"当前线程-{t_num} 已完成-爬取-跳出循环")
break
def init_list(self): def init_list(self):
print("=======清空变量==========") print("=======清空变量==========")
...@@ -422,7 +422,7 @@ class async_account_name_products(BaseUtils): ...@@ -422,7 +422,7 @@ class async_account_name_products(BaseUtils):
into_workflow_progress = f"INSERT INTO workflow_progress (page, date_info, site_name, date_type, is_end, status_val, status, table_name) VALUES ('店铺产品', '{self.year_month.replace('_', '-')}', '{self.site_name}', 'month', '否', 3, '抓取结束','{self.site_name}_asin_detail_product');" into_workflow_progress = f"INSERT INTO workflow_progress (page, date_info, site_name, date_type, is_end, status_val, status, table_name) VALUES ('店铺产品', '{self.year_month.replace('_', '-')}', '{self.site_name}', 'month', '否', 3, '抓取结束','{self.site_name}_asin_detail_product');"
cursor_us.execute(into_workflow_progress) cursor_us.execute(into_workflow_progress)
connect_us.commit() connect_us.commit()
except: except Exception:
into_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='店铺产品' and date_info='{self.year_month.replace('_', '-')}' and site_name='{self.site_name}' and date_type='month'" into_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='店铺产品' and date_info='{self.year_month.replace('_', '-')}' and site_name='{self.site_name}' and date_type='month'"
print(into_workflow_progress) print(into_workflow_progress)
cursor_us.execute(into_workflow_progress) cursor_us.execute(into_workflow_progress)
...@@ -436,7 +436,7 @@ class async_account_name_products(BaseUtils): ...@@ -436,7 +436,7 @@ class async_account_name_products(BaseUtils):
} }
try: try:
requests.post(url=url, data=data, timeout=15) requests.post(url=url, data=data, timeout=15)
except: except Exception:
pass pass
cursor_us.close() cursor_us.close()
connect_us.close() connect_us.close()
...@@ -475,21 +475,21 @@ class async_account_name_products(BaseUtils): ...@@ -475,21 +475,21 @@ class async_account_name_products(BaseUtils):
self.engine = self.mysql_connect() self.engine = self.mysql_connect()
sql_read = f'SELECT account_name, id, seller_id FROM {self.db_seller_account_syn} WHERE product_state=1 LIMIT {self.read_size} for update;' sql_read = f'SELECT account_name, id, seller_id FROM {self.db_seller_account_syn} WHERE product_state=1 LIMIT {self.read_size} for update;'
print(sql_read) print(sql_read)
self.df_read = self.engine.read_sql(sql_read) self.df_read = self.engine.read_then_update(
if self.df_read.shape[0] == 0: select_sql=sql_read,
update_table=f"{self.db_seller_account_syn}",
set_values={"product_state": 2},
where_keys=["id"],
)
if self.df_read.shape[0] > 0:
asin_list = list(self.df_read.account_name + '|-|' + self.df_read.seller_id)
return asin_list
elif self.df_read.shape[0] == 0:
self.stop_item_queue = False self.stop_item_queue = False
return [] return []
with self.engine.begin() as conn:
self.index_tuple = tuple(self.df_read['id'])
if len(self.index_tuple) == 1:
sql_update = f"""UPDATE {self.db_seller_account_syn} a set product_state=2 where a.id in ({self.index_tuple[0]})"""
else:
sql_update = f"""UPDATE {self.db_seller_account_syn} a set product_state=2 where a.id in {self.index_tuple}"""
conn.execute(sql_update)
asin_list = list(self.df_read.account_name + '|-|' + self.df_read.seller_id)
return asin_list
except Exception as e: except Exception as e:
print("读取数据出bug并等待5s继续", e, f"\n{traceback.format_exc()}") print("读取数据出bug并等待5s继续", e, f"\n{traceback.format_exc()}")
time.sleep(5)
continue continue
def save_data(self): def save_data(self):
...@@ -557,10 +557,12 @@ class async_account_name_products(BaseUtils): ...@@ -557,10 +557,12 @@ class async_account_name_products(BaseUtils):
def db_change_state(self, state=2): def db_change_state(self, state=2):
if state == 1: if state == 1:
self.db_change_state_common(state=state, account_name_list=self.requests_error_asin_list) self.db_change_state_common(state=state, account_name_list=self.requests_error_asin_list)
if state == 3: elif state == 3:
self.db_change_state_common(state=state, account_name_list=self.account_name_list_update) self.db_change_state_common(state=state, account_name_list=self.account_name_list_update)
elif state == 4: elif state == 4:
self.db_change_state_common(state=state, account_name_list=self.asin_not_found_list) self.db_change_state_common(state=state, account_name_list=self.asin_not_found_list)
elif state == 5:
self.db_change_state_common(state=state, account_name_list=self.asin_not_seller_id)
elif state == 6: elif state == 6:
self.db_change_state_common(state=state, account_name_list=self.asin_not_sure_list) self.db_change_state_common(state=state, account_name_list=self.asin_not_sure_list)
...@@ -583,4 +585,7 @@ class async_account_name_products(BaseUtils): ...@@ -583,4 +585,7 @@ class async_account_name_products(BaseUtils):
break break
except Exception as e: except Exception as e:
print(f"更改{self.db_seller_account_syn}表的state={state}出错", e, f"\n{traceback.format_exc()}") print(f"更改{self.db_seller_account_syn}表的state={state}出错", e, f"\n{traceback.format_exc()}")
continue continue
\ No newline at end of file
if __name__ == '__main__':
async_account_name_products('us', read_size=20, proxy_name=None, week=5).run()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment