Commit e94e2cdd by Peng

Merged the two AI spider scripts into one. Both fetch ASIN detail pages according to the task type and store the results, and the two codebases overlapped to a large extent.

ai_asin_spider
asin_analyze_spider
merged into a single spider. Testing is complete.
parent 58d7dda5
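Both spiders feed `get_asin` through queues of `|-|`-delimited task strings; below is a minimal sketch of the two layouts the merged code parses (example values borrowed from the inline comment and queries in the file, shown here only for illustration):

```python
# asin_analyze_spider tasks (spider_type is None): asin|-|task_id|-|site|-|row id|-|sub_step
entry = 'B09658Q5RP|-|82|-|us|-|6248|-|AsinInfoRepository:详情'
asin, task_id, site_name, id_str, sub_step = entry.split('|-|')

# ai_asin_spider tasks (spider_type is truthy): asin|-|task_id|-|site|-|module
entry_ai = 'B09658Q5RP|-|82|-|us|-|Amazon:asinList'
asin, task_id, site_name, module = entry_ai.split('|-|')
```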
import gzip
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.secure_db_client import get_remote_engine
from amazon_spider.VPS_IP import pppoe_ip
from amazon_params import py_ja3
from utils.asin_parse import ParseAsinUs
from utils.requests_param import Requests_param_val
from queue import Queue
from lxml import etree
import requests
import urllib3
from datetime import datetime
import json
import pandas as pd
import threading
import time
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
urllib3.disable_warnings()
import ast


class ai_async_asin_pg():
    """Merged spider: fetches Amazon ASIN detail pages for both the ai_asin
    (module-driven) and asin_analyze (row-driven) task types and stores the
    parsed results into ai_asin_analyze_spider."""

    def __init__(self, site_name='us', spider_type=None):
        self.spider_type = spider_type
        self.site_name = site_name
        self.queries_asin_queue = Queue()  # queue of asins waiting to be scraped
        self.item_queue = Queue()  # queue of parsed item detail dicts
        self.pg_connect()
        self.sp_asin_queue = Queue()
        self.spider_state = None
        self.update_ai_asin_analyze_log_list = []
        month = time.strftime("%m")
        day = time.strftime("%d")
        if int(day) > 10:
            _month = int(month)
        else:
            if int(month) > 1:
                _month = int(month) - 1  # previous month
            else:
                _month = int(month)
        if _month < 10:
            _month = str(f'0{_month}')
        self.topic_asin_html = f'asin_html_2025_{str(_month)}'
    def get_params(self, site_name='us'):
        self.site_name = site_name  # site code, e.g. 'us'
        self.reuests_para_val = Requests_param_val(site_name=self.site_name)
        self.cookies_queue = Queue()  # cookie queue
        self.cookie_dict_delete_id = {}
        # host and home-page url for the given site
        self.site_url, self.host = self.reuests_para_val.get_site_url(self.site_name)
        if self.cookies_queue.empty():
            cookies_dict = self.reuests_para_val.get_cookie(num=168)
            self.cookie_dict_delete_id = cookies_dict
            for ck in cookies_dict.values():
                self.cookies_queue.put(ck)
    def pg_connect(self):
        self.engine_pg = get_remote_engine(
            site_name=self.site_name,  # -> database "selection"
            db_type='postgresql_15_outer',  # -> server-side alias "mysql"
        )
        return self.engine_pg
    def get_asin(self):
        while True:
            if self.queries_asin_queue.empty() == False and self.spider_state is None:
                asin_queu = self.queries_asin_queue.get()
            elif self.sp_asin_queue.empty() == False:
                self.spider_state = '竞品asin'
                print('scraping competitor asins')
                asin_queu = self.sp_asin_queue.get()
            else:
                break
            # e.g. ['B09658Q5RP|-|82|-|us|-|6248', 'B0CSPVS7JL|-|82|-|us|-|6249']
            print('::asin_queu::: ', asin_queu)
            queu_list = asin_queu.split('|-|')
            print('queu_list:::', queu_list)
            if self.spider_type:
                asin = queu_list[0]
                task_id = queu_list[1]
                site_name = queu_list[2]
                module = queu_list[3]
                id_str = ''
                if module == 'Amazon:asin':
                    sub_step = 'Amazon:asin:竞品'
                elif module == 'Amazon:asinList':
                    sub_step = 'Amazon:asinList:详情'
                else:
                    sub_step = None
                self.get_params(site_name=site_name)
            else:
                asin = queu_list[0]
                task_id = queu_list[1]
                site_name = queu_list[2]
                id_str = queu_list[3]
                sub_step = queu_list[4]
            if self.cookies_queue.empty():
                cookies_dict = self.reuests_para_val.get_cookie()
                self.cookie_dict_delete_id = cookies_dict
                for ck in cookies_dict.values():
                    self.cookies_queue.put(ck)
            # assemble the cookie header
            cookie_str = self.reuests_para_val.get_cookie_str(self.cookies_queue)
            headers = self.reuests_para_val.requests_amazon_headers(host=self.host, site_url=self.site_url,
                                                                    asin=asin, scraper_url=None)
            headers["cookie"] = cookie_str
            scraper_url = self.site_url + 'dp/' + asin + "?th=1&psc=1"
            print('scraper_url::', scraper_url)
            try:
                sess.mount(self.site_url, py_ja3.DESAdapter())
                resp = sess.get(scraper_url, headers=headers, timeout=10, verify=False)
                # with open(rf'{self.site_name}_22_{asin}.html', 'w', encoding='utf-8') as f:
                #     f.write(resp.text)
                if self.reuests_para_val.check_amazon_yzm(resp):
                    print('captcha page returned. asin---> ', asin)
                    # re-queue the original task string so no fields are lost on retry
                    if self.spider_state == '竞品asin':
                        self.sp_asin_queue.put(asin_queu)
                    else:
                        self.queries_asin_queue.put(asin_queu)
                    continue
            except Exception as e:
                print('request failed. asin---> ', asin, 'error:', e)
                if self.spider_state == '竞品asin':
                    self.sp_asin_queue.put(asin_queu)
                else:
                    self.queries_asin_queue.put(asin_queu)
                continue
            response_url = resp.url
            response = resp.text
            response_s = etree.HTML(response)
            if self.reuests_para_val.check_amazon_not_page(response):  # asin delisted (state 4, "Listen Now")
                continue
            if self.reuests_para_val.check_amazon_page(response, response_url):  # not a normal product page
                continue
            if self.reuests_para_val.check_amazon_allow_redirects(response_url, asin):  # request was redirected
                continue
            # read the delivery postal code shown on the page
            try:
                ingress = response_s.xpath("//span[@id='glow-ingress-line2']/text()")
                print(ingress, ' postal code ', resp.url)
            except Exception as e:
                print('asin is not a normal page', asin)
                continue
            try:
                ingress = ingress[0].strip()
            except:
                ingress = None
            if ingress:
                if self.reuests_para_val.check_amazon_ingress(ingress):
                    if self.spider_state == '竞品asin':
                        self.sp_asin_queue.put(asin_queu)
                    else:
                        self.queries_asin_queue.put(asin_queu)
                    continue
            div_dp = response_s.xpath('//div[@id="dp"]')
            if div_dp:
                items = ParseAsinUs(resp=response, asin=asin, site_name=self.site_name).xpath_html()
                new_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                item = {'asin': items["asin"], 'task_id': task_id, 'id': id_str, 'sub_step': sub_step,
                        'url': scraper_url,
                        'title': items["title"], 'img_url': items["img_url"],
                        'rating': items["rating"], 'total_comments': items["total_comments"],
                        'price': items["price"], "rank": items["rank"], 'category': items["category"],
                        'launch_time': items["launch_time"], 'volume': items["volume"],
                        'weight': items["weight"], "page_inventory": items["page_inventory"],
                        "buy_box_seller_type": items["buy_box_seller_type"],
                        "asin_vartion_list": items["asin_vartion_list"], 'title_len': items["title_len"],
                        'img_num': items["img_num"], 'img_type': items["img_type"],
                        'activity_type': items["activity_type"],
                        'one_two_val': items["one_two_val"], 'three_four_val': items["three_four_val"],
                        'eight_val': items["eight_val"],
                        'qa_num': items["qa_num"], 'five_star': items["five_star"], 'four_star': items["four_star"],
                        'three_star': items["three_star"],
                        'two_star': items["two_star"], 'one_star': items["one_star"], 'low_star': items["low_star"],
                        'together_asin': items["together_asin"],
                        'brand': items["brand"], 'ac_name': items["ac_name"], 'material': items["material"],
                        'node_id': items["node_id"],
                        'sp_num': items["sp_num"], 'describe': items["describe"],
                        'weight_str': items["weight_str"], 'package_quantity': items['package_quantity'],
                        'pattern_name': items['pattern_name'], 'seller_id': items["seller_id"],
                        'variat_num': items['variat_num'],
                        'site_name': self.site_name, 'best_sellers_rank': items["best_sellers_rank"],
                        'best_sellers_herf': items["best_sellers_herf"], 'account_url': items["account_url"],
                        'account_name': items["account_name"], 'parentAsin': items["parentAsin"],
                        'asinUpdateTime': new_date, 'follow_sellers': items['sellers_num'],
                        'all_best_sellers_herf': items['all_best_sellers_herf'],
                        'product_description': items['product_description'], 'buy_sales': items['buySales'],
                        'image_view': items['image_view'], 'product_json': items['product_json'],
                        'product_detail_json': items['productdetail_json'],
                        'review_ai_text': items['review_ai_text'], 'review_label_json': items['review_label_json'],
                        'lob_asin_json': items['lob_asin_json'],
                        'sp_initial_seen_asins_json': items['sp_initial_seen_asins_json'],
                        'sp_4stars_initial_seen_asins_json': items['sp_4stars_initial_seen_asins_json'],
                        'sp_delivery_initial_seen_asins_json': items['sp_delivery_initial_seen_asins_json'],
                        'compare_similar_asin_json': items['compare_similar_asin_json'],
                        'customer_reviews_json': items['customer_reviews_json'],
                        'together_asin_json': items['together_asin_json'],
                        'min_match_asin_json': items['min_match_asin_json'], 'seller_json': items['seller_json'],
                        'created_time': new_date, 'current_asin': items['current_asin'],
                        'parent_asin': items["parentAsin"],
                        'bundles_this_asins_json': items['bundles_this_asins_data_json'],
                        'video_m3u8_url': items["video_m3u8"], 'result_list_json': items['result_list_json'],
                        'bundle_asin_component_json': items['bundle_asin_component_json'],
                        'bsr_category_asin_list': items['bs_category_asin_list_pg'],
                        'review_json_list': items['review_json_list'], 'fbm_delivery_price': items['fbm_delivery_price']
                        }
                print(item)
                self.item_queue.put(item)
                # gzip the raw html and push it to kafka for archiving
                response_gzip = self.compress_string(response)
                Requests_param_val().send_kafka(html_data=response_gzip, topic=self.topic_asin_html)
                Requests_param_val().kafuka_producer_str.flush(timeout=30)
            else:
                print('asin product page abnormal')
    def compress_string(self, input_string):
        # gzip-compress a string
        return gzip.compress(input_string.encode())
    def update_ai_asin_analyze_log(self, task_id_list, status):
        if task_id_list:
            task_id_list = list(set(task_id_list))
            while True:
                try:
                    with self.engine_pg.begin() as conn:
                        for task_id in task_id_list:
                            sql_update = f"""UPDATE ai_asin_analyze_log a set spider_status='{status}' where a.task_id = {task_id}"""
                            print('UPDATE_sql:', sql_update)
                            conn.execute(sql_update)
                    break
                except Exception as e:
                    print('failed to update ai_asin_analyze_log', e)
                    time.sleep(5)
    def save_data_asin_analyze(self):
        self.pg_connect()
        items_data_list = []
        id_list = []
        while True:
            if self.item_queue.empty() == False:
                items = self.item_queue.get()
                items_data_list.append(
                    [int(items['task_id']), items['asin'], items['site_name'], '成功', items, int(items['id']),
                     items['sub_step']])
                id_list.append(int(items['id']))
            else:
                break
        if items_data_list:
            while True:
                try:
                    self.pg_connect()
                    print('saving data, count', len(items_data_list))
                    with self.engine_pg.begin() as conn:
                        if len(set(id_list)) == 1:
                            sql_delete = f"delete from ai_asin_analyze_spider where id in ({tuple(id_list)[0]});"
                        else:
                            sql_delete = f"delete from ai_asin_analyze_spider where id in {tuple(set(id_list))};"
                        print('delete:', sql_delete)
                        conn.execute(sql_delete)
                    print(len(items_data_list))
                    df_asin_detail = pd.DataFrame(data=items_data_list,
                                                  columns=['task_id', 'unique_key', 'site_name', 'status', 'html_json',
                                                           'id', 'sub_step'])
                    df_asin_detail['html_json'] = df_asin_detail['html_json'].apply(
                        lambda x: json.dumps(x, ensure_ascii=False) if isinstance(x, (dict, list)) else x
                    )
                    self.engine_pg.to_sql(df_asin_detail, 'ai_asin_analyze_spider', if_exists='append')
                    break
                except Exception as e:
                    print('save failed::', e)
                    self.pg_connect()
                    time.sleep(10)
    def save_data_ai_asin(self):
        self.pg_connect()
        items_data_list = []
        update_time = int(time.time())
        task_id_list = []
        while True:
            if self.item_queue.empty() == False:
                items = self.item_queue.get()
                unique_key = self.site_name + ':' + items['asin']
                items_data_list.append(
                    [int(items['task_id']), items['url'], items['sub_step'], '爬取成功', items, update_time, unique_key])
                task_id_list.append(int(items['task_id']))
            else:
                break
        if task_id_list:
            self.update_ai_asin_analyze_log(task_id_list, '成功')
            while True:
                try:
                    print('saving data, count', len(items_data_list))
                    if items_data_list:
                        print(len(items_data_list))
                        df_asin_detail = pd.DataFrame(data=items_data_list,
                                                      columns=['task_id', 'url', 'sub_step', 'status', 'html_json',
                                                               'create_time',
                                                               'unique_key'])
                        df_asin_detail['html_json'] = df_asin_detail['html_json'].apply(
                            lambda x: json.dumps(x, ensure_ascii=False) if isinstance(x, (dict, list)) else x
                        )
                        self.engine_pg.to_sql(df_asin_detail, 'ai_asin_analyze_spider', if_exists='append')
                    break
                except Exception as e:
                    print('save failed::', e)
                    time.sleep(10)
        else:
            print('save_data saving data, count', len(items_data_list))
    def task(self):
        result = 1 + 1
        print("result:", result)
    def read_ai_asin(self):
        self.pg_connect()
        self.spider_type = True
        for module in ['Amazon:asin', 'Amazon:asinList']:
            if module == 'Amazon:asin':
                # pass
                sql = f"SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module='{module}' and spider_status='未开始' for update;"
            else:
                sql = f"""SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module = '{module}' and spider_status='未开始' for update;"""
                # sql = f"""SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module = '{module}' and task_id=39 for update;"""
            print(sql)
            df_read = self.engine_pg.read_then_update(
                select_sql=sql,
                update_table='ai_asin_analyze_log',
                set_values={"spider_status": '爬取中'},  # mark the matched tasks as being scraped
                where_keys=["task_id"],  # WHERE task_id = :task_id
            )
            print(f'starting {module} task:', sql)
            if not df_read.empty:
                # if module == 'Amazon:asin':
                #     _asin_list = ast.literal_eval(df_read['asin'][0])
                #     asin_id_list = []
                #     for _aisn in _asin_list:
                #         asin_data_list = list(
                #             _aisn + '|-|' + df_read.task_id.astype(
                #                 "U") + '|-|' + df_read.site_name + '|-|' + module)
                #         asin_id_list.extend(asin_data_list)
                asin_id_list = list(
                    df_read['asin'] + '|-|' + df_read.task_id.astype(
                        "U") + '|-|' + df_read.site_name + '|-|' + module)
                # else:
                #     asin_id_list = list(
                #         df_read['asin'] + '|-|' + df_read.task_id.astype(
                #             "U") + '|-|' + df_read.site_name + '|-|' + module)
                print(asin_id_list)
                for asin_id in asin_id_list:
                    print(asin_id)
                    self.queries_asin_queue.put(asin_id)
                html_thread = []
                for i in range(5):
                    thread2 = threading.Thread(target=self.get_asin)
                    thread2.start()
                    html_thread.append(thread2)
                for t2 in html_thread:
                    t2.join()
                self.save_data_ai_asin()
    def run_analzye_asin(self, asin_id_list):
        self.get_params(site_name=self.site_name)
        print(asin_id_list)
        for asin_id in asin_id_list:
            print(asin_id)
            self.queries_asin_queue.put(asin_id)
        html_thread = []
        for i in range(5):
            thread2 = threading.Thread(target=self.get_asin)
            thread2.start()
            html_thread.append(thread2)
        for t2 in html_thread:
            t2.join()
        self.save_data_asin_analyze()


def select_asin():
    for site in ['us', 'de', 'uk']:
        select_sql = f"""select id, site_name, task_id, unique_key as asin,sub_step from ai_asin_analyze_spider where sub_step = 'AsinInfoRepository:详情' and status = '未开始' and site_name='{site}' order by task_id"""
        print('select_sql::', select_sql)
        engine_pg15 = ai_async_asin_pg(site_name='us').pg_connect()
        df_read = engine_pg15.read_then_update(
            select_sql=select_sql,
            update_table='ai_asin_analyze_spider',
            set_values={"status": '爬取中'},  # mark the matched rows as being scraped
            where_keys=["id", "site_name"],  # WHERE id = :id AND site_name = :site_name
        )
        if not df_read.empty:
            asin_id_list = list(
                df_read['asin'] + '|-|' + df_read.task_id.astype(
                    "U") + '|-|' + df_read.site_name + '|-|' + df_read.id.astype(
                    "U") + '|-|' + df_read.sub_step)
            print(asin_id_list)
            ai_async_asin_pg(site_name=site).run_analzye_asin(asin_id_list)


def run_spider():
    time_ip_num = 0
    while True:
        time_ip_num += 1
        select_asin()
        ai_async_asin_pg().read_ai_asin()
        time.sleep(5)
        print('-----------------------------------------------------------------------------------------')
        print()
        # between 10:00 and 22:00, re-dial the PPPoE connection after every 180 loops
        if 10 <= datetime.now().hour < 22:
            if time_ip_num > 180:
                pppoe_ip()
                time_ip_num = 0
                time.sleep(5)


if __name__ == '__main__':
    run_spider()