Commit efc3b2e1 by Peng

no message

parent 6078b1ae
import hashlib
import json
import os
import random
import re
import sys
import time
import uuid
from urllib.parse import urlparse
from threading import Lock
import urllib3
from lxml import etree
# import requests
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from amazon_params.params import DB_REQUESTS_ASIN_PARAMS
...@@ -19,6 +20,8 @@ from amazon_spider.VPS_IP import is_internet_available
from datetime import datetime, timedelta
import traceback
from curl_cffi import requests
from kafka.errors import KafkaTimeoutError
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
urllib3.disable_warnings()
...@@ -31,6 +34,9 @@ class Requests_param_val(BaseUtils):
        self.proxy_name = 'Kdl_h10'
        print("site name:", self.site_name, 'scraping project', "proxy IP:", self.proxy_name)
        self.cookies_queue = Queue()  # cookie queue
        self.kafuka_producer_str = self.kafuka_connect()
        self.next_page_lock = Lock()
        self.headers_num_int_s = 0
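        # next_page_lock guards headers_num_int_s, a counter of successful Kafka
        # sends used to flush the producer every 10 messages (see send_kafka below).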

    def init_db_names(self):
        self.engine_pg = self.pg_connect()
...@@ -38,7 +44,7 @@ class Requests_param_val(BaseUtils):
        self.db_cookies = self.site_name + DB_REQUESTS_ASIN_PARAMS['db_cookies'][2:]
        self.db_ip_16yun = DB_REQUESTS_ASIN_PARAMS['db_ip_16yun']

    def get_site_url(self, site_name='us'):
        if site_name == "us":
            self.site_url = 'https://www.amazon.com/'
            self.host = 'www.amazon.com'
...@@ -97,7 +103,7 @@ class Requests_param_val(BaseUtils):
            except:
                break

    def get_cookie(self, num=None):
        print("fetch a cookie and return it")
        while True:
            if is_internet_available():
...@@ -110,10 +116,11 @@ class Requests_param_val(BaseUtils):
                self.engine = self.mysql_connect()
                self.engine_pg = self.pg_connect()
                with self.engine.begin() as conn:
                    if num:
                        sql_read = f'SELECT cookies,id FROM {self.db_cookies} limit {num};'
                    else:
                        sql_read = f'SELECT cookies,id FROM {self.db_cookies} limit 300;'
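                    # note: num is interpolated into the SQL string as-is; if it can come
                    # from untrusted input, coercing with int(num) first would be safer
                    # (reviewer suggestion, not part of this commit)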
print("获取cookie:", sql_read) print("获取cookie:", sql_read)
# a = conn.execute(sql_read)
# df_read = pd.DataFrame(a, columns=['cookies', 'id'])
                    df_read = self.engine.read_sql(sql_read)
                    clientPriceList = list(df_read.cookies + "|-|" + df_read.id.astype("U"))
                    for ck in clientPriceList:
...@@ -130,9 +137,9 @@ class Requests_param_val(BaseUtils):
    def db_column(self, site):
        if site in ('us', 'de', 'uk'):
            asin_detail_table = f'select * from {site}_asin_detail_month_2026 limit 1'
        else:
            asin_detail_table = f'select * from {site}_asin_detail_2026 limit 1'
        print(asin_detail_table)
        # df = pd.read_sql(asin_detail_table, con=self.engine_pg)
        df = self.engine_pg.read_sql(asin_detail_table)
...@@ -141,10 +148,8 @@ class Requests_param_val(BaseUtils):
        columns_list.remove('id')
        columns_list.remove('updated_time')
        columns_list.remove('category_state')
        if site in ('fr', 'es', 'it'):
            columns_list.append('week')
        return columns_list
    # Check whether the returned page source is valid
...@@ -167,7 +172,8 @@ class Requests_param_val(BaseUtils):
    def check_amazon_allow_redirects(self, response_url, asin):
        if ("keywords" in response_url) or ("dp/" not in response_url) or (
                "ref=" in response_url and "encoding=" in response_url) or (asin not in response_url) or (
                "ASIN=" in response_url and "ref_=lx_bd" in response_url) or (
                'ref=rd_fr_' in response_url and f'ref=rd_fr_{asin}' in response_url) \
                or ('&ASIN=' in response_url):
            return True
...@@ -175,7 +181,8 @@ class Requests_param_val(BaseUtils):
    # Check whether the zip code / delivery location is correct.
    def check_amazon_ingress(self, ingress):
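        # 'Vereinigte' presumably matches the German-locale location banner
        # (e.g. "Vereinigte Staaten"); treated like the other wrong-location markers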
if ("中国大陆" in ingress) or ("China" in ingress) or ("Hong" in ingress) or ("Chine" in ingress) or ( if ("中国大陆" in ingress) or ("China" in ingress) or ("Hong" in ingress) or ("Chine" in ingress) or (
"Cina" in ingress) or ("Update location" in ingress) or ('香' in ingress) or ("location" in ingress): "Cina" in ingress) or ("Update location" in ingress) or ('香' in ingress) or ("location" in ingress)\
or ('Vereinigte' in ingress):
return True return True

    # Check whether the request hit a captcha:
...@@ -190,30 +197,31 @@ class Requests_param_val(BaseUtils):
    # Assemble the request headers
    def requests_amazon_headers(self, host=None, site_url=None, asin=None, scraper_url=None):
        n = random.randint(120, 142)
        # e.g. Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36
        ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
        headers = {
            'connection': 'close',
            'authority': urlparse(self.site_url).hostname,
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'accept-language': 'zh-CN,zh;q=0.9',
            'accept-encoding': 'gzip, deflate, br, zstd',
            'cache-control': 'no-cache',
            'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
            'sec-ch-ua-mobile': '?0',
            'user-agent': ua,
            "pragma": "no-cache",
        }
        if asin:
            headers['origin'] = f'{site_url}dp/{asin}'
            headers['referer'] = f'{site_url}?th=1'
        if scraper_url:
            headers['origin'] = scraper_url
            headers['referer'] = scraper_url
        alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
                    's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
        k = ""
...@@ -245,7 +253,8 @@ class Requests_param_val(BaseUtils):
        if ingress:
            if ("Page Not Found" in resp.text) or (
                    "We are sorry! This Gift Card is not available" in resp.text) or (
                    "500 - An error occurred" in resp.text) or (
                    "Sorry! Something went wrong!" in resp.text):
                return None
            else:
                return resp.text
...@@ -276,29 +285,33 @@ class Requests_param_val(BaseUtils):
    def get_cookie_str(self, cookies_queue):
        while 1:
            cookie_str = cookies_queue.get()
            try:
                cookie_list = json.loads(cookie_str)
            except:
                cookie_list = eval(cookie_str)
            cookie_dic = {}
            try:
                for i in cookie_list:
                    if i:
                        cookie_dic[i["name"]] = i["value"]
                    else:
                        continue
                cookie_str = ''
                for k, v in cookie_dic.items():
                    cookie_str = cookie_str + str(k) + '=' + str(v) + ';'
                break
            except:
                cookie_str = ''
                for k, v in cookie_list.items():
                    cookie_str = cookie_str + str(k) + '=' + str(v) + ';'
                break
        if self.site_name == 'uk':
            cookie_str = cookie_str.replace('i18n-prefs=HKD;', 'i18n-prefs=GBP;').replace('i18n-prefs=USD;', 'i18n-prefs=GBP;')
        elif self.site_name == 'de':
            cookie_str = cookie_str.replace('i18n-prefs=HKD;', 'i18n-prefs=EUR;').replace('i18n-prefs=USD;', 'i18n-prefs=EUR;')
        elif self.site_name == 'us':
            cookie_str = cookie_str.replace('i18n-prefs=HKD;', 'i18n-prefs=USD;')
        return cookie_str
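    # For reference, a minimal standalone sketch of the cookie-flattening logic
    # above, assuming cookies are stored as a browser-exported JSON list of
    # {name, value} objects (the sample data below is made up):
    #
    #     import json
    #     sample = ('[{"name": "session-id", "value": "130-1234567-8901234"},'
    #               ' {"name": "i18n-prefs", "value": "USD"}]')
    #     cookie_dic = {c["name"]: c["value"] for c in json.loads(sample) if c}
    #     cookie_str = ''.join(f'{k}={v};' for k, v in cookie_dic.items())
    #     print(cookie_str)  # session-id=130-1234567-8901234;i18n-prefs=USD;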

    # Get the auto-increment id range: for the given site, read ids from the corresponding monthly/weekly syn table
...@@ -312,10 +325,10 @@ class Requests_param_val(BaseUtils):
        with self.engine.begin() as conn:
            if self.site_name in ('us', 'de', 'uk'):
                if state == 3 and minid_maxid:
sql_update = f"""UPDATE {self.site_name}_syn_asin_all_minid_maxid set state=3 where minid_maxid='{minid_maxid}' and yaer_month = '2025_{month}'""" sql_update = f"""UPDATE {self.site_name}_syn_asin_all_minid_maxid set state=3 where minid_maxid='{minid_maxid}' and yaer_month = '2026_{month}'"""
print(sql_update) print(sql_update)
conn.execute(sql_update) conn.execute(sql_update)
sql_read = f"""SELECT id, minid_maxid FROM {self.site_name}_syn_asin_all_minid_maxid WHERE STATE = 1 and yaer_month = '2025_{month}' LIMIT 1""" sql_read = f"""SELECT id, minid_maxid FROM {self.site_name}_syn_asin_all_minid_maxid WHERE STATE = 1 and yaer_month = '2026_{month}' LIMIT 1"""
print('sql_read:::', sql_read) print('sql_read:::', sql_read)
else: else:
if state == 2 and minid_maxid: if state == 2 and minid_maxid:
...@@ -348,5 +361,35 @@ class Requests_param_val(BaseUtils):
        # Get the hexadecimal digest of the hash
        md5_hex_digest = md5_hash.hexdigest()
        return md5_hex_digest

    def on_send_success(self, record_metadata):
        print(f"message sent successfully: {record_metadata.topic}-{record_metadata.partition}-{record_metadata.offset}")

    def on_send_error(self, excp):
        print("message send failed", excp)

    def send_kafka(self, items=None, html_data=None, topic=None, num=3):
        print('sending data to Kafka')
        for i in range(5):
            try:
                if items:
                    items.pop('div_id_list', None)  # avoid KeyError if the key is already gone (e.g. on retry)
                    future = self.kafuka_producer_str.send(topic, json.dumps(items))
                    future.add_callback(self.on_send_success).add_errback(self.on_send_error)
                    future.get(30)
                if html_data:
                    future = self.kafuka_producer_str.send(topic, html_data)
                    future.add_callback(self.on_send_success).add_errback(self.on_send_error)
                    future.get(30)
                print('data sent to Kafka successfully')
                with self.next_page_lock:
                    self.headers_num_int_s += 1
                    if self.headers_num_int_s % 10 == 0:
                        self.kafuka_producer_str.flush()
                break
            except Exception as e:
                print(f"Kafka send failed (attempt {i + 1}/5)", e)
                time.sleep(2)
                if i >= 1 and i % 2 == 1:
                    self.kafuka_producer_str = self.kafuka_connect(acks=True)
\ No newline at end of file
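
For context, a minimal sketch of how the send/retry pattern in send_kafka is
typically driven with kafka-python, assuming kafuka_connect returns a
KafkaProducer with a string value_serializer (the bootstrap servers, topic
name, and payload below are placeholders, not values from this commit):

import json
from kafka import KafkaProducer

# Producer with a str payload serializer, matching json.dumps(items) above.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: v.encode('utf-8'))
future = producer.send('asin_detail_topic', json.dumps({'asin': 'B000000000'}))
# Chainable success/error callbacks, as in on_send_success/on_send_error.
future.add_callback(lambda md: print(md.topic, md.partition, md.offset)) \
      .add_errback(lambda exc: print('send failed', exc))
future.get(30)    # block up to 30s; raises (e.g. KafkaTimeoutError) on failure
producer.flush()  # send_kafka instead flushes every 10 successful sends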