Commit 694247a4 by Peng

Updated the request headers used when fetching cookies.

Synced 'me' (merchantwords) search terms to pg6; updated the database connection method.
Merged the AI-type task code into ai_analyze_spider.
Optimized the 1688 payment flow; added automatic payment of the order numbers tied to each dedicated balance.
parent 7597ea9f
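For reference, a minimal self-contained sketch of the header strategy this change moves to (a Chrome 120-130 user agent plus matching Sec-Ch-Ua client hints). The `build_headers` helper and the exact brand list are illustrative assumptions, not code from this repository; the actual `get_cookie()` below also sends Device-Memory, Viewport-Width and the other hints shown in the hunk.

```python
import random

def build_headers():
    # Pick a current Chrome major version, matching the 120-130 window used in the diff
    major = random.randint(120, 130)
    ua = (f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
          f'(KHTML, like Gecko) Chrome/{major}.0.0.0 Safari/537.36')
    return {
        "User-Agent": ua,
        # Client-hint brands carry only the major version, never the full UA string
        "Sec-Ch-Ua": f'"Not_A Brand";v="8", "Chromium";v="{major}", "Google Chrome";v="{major}"',
        "Sec-Ch-Ua-Mobile": "?0",
        "Sec-Ch-Ua-Platform": '"Windows"',
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Upgrade-Insecure-Requests": "1",
    }

print(build_headers()["User-Agent"])
```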
......@@ -47,25 +47,44 @@ def get_cookie(site='us', zipCode='10010'):
engine_us = get_remote_engine(site, 'mysql')
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
n = random.randint(120, 130)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.0.0 Safari/537.36'
print(ua)
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.4929.149 Safari/537.36'
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36'
headers = {
'connection': 'close',
'authority': host,
'accept': 'text/html,*/*',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'origin': url_,
'referer': url_,
'sec-ch-ua-mobile': '?0',
'user-agent': ua
}
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Device-Memory": "8",
"Downlink": "1.25",
"Dpr": "0.75",
"Ect": "3g",
"Pragma": "no-cache",
"Rtt": "300",
"Sec-Ch-Device-Memory": "8",
"Sec-Ch-Dpr": "0.75",
"Sec-Ch-Ua": f'"Not_A Brand";v="8", "Chromium";v="{ua}", "Google Chrome";v="{ua}"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Ch-Ua-Platform-Version": '"10.0.0"',
"Sec-Ch-Viewport-Width": "2560",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": ua,
"Viewport-Width": "2560",
}
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n']
k = ""
for i in range(0, random.randint(0, 5)):
k += random.choice(alphabet)
headers[k] = str(uuid.uuid4())
# headers[k] = str(uuid.uuid4())
sess = requests.Session()
sess.mount(url_, py_ja3.DESAdapter())
resp_ = sess.get(url_, headers=headers, timeout=15, verify=False)
......@@ -193,12 +212,10 @@ def get_cookie(site='us', zipCode='10010'):
if zipCode in ingress[0].strip() or "W1S 3" in ingress[0].strip():
print(f"*************** 当前获取 {site} 站点 cookie 邮编 {zipCode} ********************")
cookies = json.dumps(index_resp_cookies, ensure_ascii=False)
cookies_list=[[cookies,'DB']]
item = {"site": site, 'zipCode': ingress[0].strip(), 'cookie': cookies}
print(item)
# Build the DataFrame
df = pd.DataFrame([{"cookies": cookies, "type": "DB"}])
# df_data_list = df.values.tolist()
# Store to the database
engine_us.to_sql(df, f"{site}_cookies", if_exists="append")
......@@ -210,13 +227,13 @@ def get_cookie(site='us', zipCode='10010'):
if __name__ == '__main__':
while True:
# get_cookie(site='us', zipCode='10010')
# get_cookie(site='de', zipCode='10115')
# get_cookie(site='uk', zipCode='W1S 3PR')
get_cookie(site='us', zipCode='10010')
get_cookie(site='de', zipCode='10115')
get_cookie(site='uk', zipCode='W1S 3PR')
# get_cookie(site='it', zipCode='85')
# get_cookie(site='es', zipCode='28001')
# get_cookie(site='fr', zipCode='75019')
# get_cookie(site='us', zipCode='10010')
get_cookie(site='es', zipCode='28001')
get_cookie(site='fr', zipCode='75019')
get_cookie(site='us', zipCode='10010')
get_cookie(site='de', zipCode='10115')
get_cookie(site='uk', zipCode='W1S 3PR')
time.sleep(random.uniform(10.5, 55.5))
time.sleep(random.uniform(30.5, 70.5))
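The hunk above now persists each cookie snapshot as a one-row DataFrame. A rough equivalent with a plain SQLAlchemy engine is sketched below; `store_cookies` and the `dsn` argument are assumptions, since the project actually routes the write through `get_remote_engine` / `engine_us.to_sql`.

```python
import json

import pandas as pd
from sqlalchemy import create_engine


def store_cookies(cookie_jar: dict, site: str, dsn: str) -> None:
    # One-row DataFrame mirroring the layout written by get_cookie(): cookies JSON + type flag
    df = pd.DataFrame([{"cookies": json.dumps(cookie_jar, ensure_ascii=False), "type": "DB"}])
    engine = create_engine(dsn)  # plain engine; the project goes through get_remote_engine instead
    df.to_sql(f"{site}_cookies", con=engine, if_exists="append", index=False)


# store_cookies({"session-id": "140-1234567-1234567"}, "us", "mysql+pymysql://user:pwd@host/db")
```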
......@@ -123,7 +123,6 @@ class dow_category_Product():
try:
num += 1
Category_name = Category
# _Category = Category.replace('&', '\\\&')
print("Category_name 名称 11111", Category)
driver.execute_script(f"""document.querySelector("kat-radiobutton[label='{Category}']").click()""")
time.sleep(1)
......
......@@ -74,7 +74,7 @@ class bsr_catgory(BaseUtils):
except:
print(site, 'db_cursor_connect 报错:', sql)
def db_cursor_connect_msyql_read(self, site=None,select_state1_sql=None):
def db_cursor_connect_msyql_read(self, site=None, select_state1_sql=None):
for i in range(3):
try:
if site:
......@@ -115,7 +115,7 @@ class bsr_catgory(BaseUtils):
self.year_month = f'{self.year}_{self.month}'
sele_sql = f"SELECT `week` FROM week_20_to_30 WHERE `year_month`='{self.year}_{self.month}'"
print(sele_sql)
df_year_week = self.db_cursor_connect_msyql_read(site='us',select_state1_sql=sele_sql)
df_year_week = self.db_cursor_connect_msyql_read(site='us', select_state1_sql=sele_sql)
self.year_week = list(df_year_week['week'])[-1]
print(self.year_week, '====当前周===1232333')
......@@ -188,19 +188,19 @@ class bsr_catgory(BaseUtils):
"category_first_id": category_first_id,
"category_parent_id": category_parent_id
}
keys_to_check = ['category_id', 'category_first_id', 'category_parent_id']
# 使用列表推导式检查多个键的值是否为空字符串或None
empty_or_none_keys = [key for key in keys_to_check if items.get(key) in ('', None)]
if empty_or_none_keys:
print('解析失败')
try:
account = 'pengyanbing'
title = self.site_name + ' bsr 榜单'
content = f' bsr 榜单解析 url 失败 节点数:{nodes_num} \n 解析url:{url}'
db_class = connect_db(self.site_name)
db_class.send_mg(account, title, content)
except:
pass
# keys_to_check = ['category_id', 'category_first_id', 'category_parent_id']
# # 使用列表推导式检查多个键的值是否为空字符串或None
# empty_or_none_keys = [key for key in keys_to_check if items.get(key) in ('', None)]
# if empty_or_none_keys:
# print('解析失败')
# try:
# account = 'pengyanbing'
# title = self.site_name + ' bsr 榜单'
# content = f' bsr 榜单解析 url 失败 节点数:{nodes_num} \n 解析url:{url}'
# db_class = connect_db(self.site_name)
# db_class.send_mg(account, title, content)
# except:
# pass
return items
def html_4(self, bum):
......@@ -604,7 +604,7 @@ class bsr_catgory(BaseUtils):
order by {self.site_name}_bs_category.category_id, category_parent_id;
"""
print('path_sql:', path_sql)
df_exist_rows = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=path_sql)
df_exist_rows = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=path_sql)
exist_rows = df_exist_rows.values.tolist()
group1_id = []
group2_id = []
......@@ -668,12 +668,12 @@ class bsr_catgory(BaseUtils):
# Insert if the path does not exist yet
try:
select_sql_id = f'''SELECT id FROM {self.site_name}_bs_category WHERE `path`="{name_num_path[3]}"'''
df_id = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_sql_id)
df_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_id)
if not df_id.empty:
save_name_num_list.append(name_num_path)
else:
select_sql_name = f'''SELECT en_name FROM {self.site_name}_bs_category WHERE `path`="{name_num_path[3]}" order by id desc '''
df_en_name = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_sql_name)
df_en_name = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_name)
print('en_name::', df_en_name.values)
if df_en_name['en_name'][0] == name_num_path[1]:
pass
......@@ -725,7 +725,7 @@ class bsr_catgory(BaseUtils):
select id, path,nodes_num from {self.site_name}_bs_category where category_first_id is null and category_parent_id != '0'
and delete_time is null;
"""
df_nodes_num = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=sql)
df_nodes_num = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=sql)
if not df_nodes_num.empty:
id_path_list = df_nodes_num.values.tolist()
......@@ -815,13 +815,13 @@ class bsr_catgory(BaseUtils):
def select_id_1(self):
# Look up the top-level parent-category id for child nodes
select_sql_1 = f'select id from {self.site_name}_bs_category where nodes_num=2'
df_id = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_sql_1)
df_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_1)
df_id_lsit = df_id.values.tolist()
for id in df_id_lsit:
en_name_id_list = []
select_p_id = f"select t3.id,t4.en_name from (select t1.id,t1.parent_id,if(find_in_set(parent_id, @pids) > 0, @pids := concat(@pids, ',',id), 0) as ischild from (select id,p_id as parent_id from {self.site_name}_bs_category t order by p_id,id) t1,(select @pids := {id[0]}) t2) t3 LEFT JOIN {self.site_name}_bs_category t4 on t3.id = t4.id where ischild != 0;"
print('select_p_id::',select_p_id)
df_all_id = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_p_id)
print('select_p_id::', select_p_id)
df_all_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_p_id)
if not df_all_id.empty:
all_id_lsit = df_all_id.values.tolist()
for en_name_id in all_id_lsit:
......@@ -894,7 +894,7 @@ class bsr_catgory(BaseUtils):
and delete_time is null
order by category_id,category_first_id
"""
df_id_tuple = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=id_sql)
df_id_tuple = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=id_sql)
id_tuple = df_id_tuple.values.tolist()
id_list = []
for id in id_tuple:
......@@ -960,9 +960,9 @@ class bsr_catgory(BaseUtils):
_0_days = ((datetime.datetime.now()) + datetime.timedelta(days=0)).strftime("%Y-%m-%d")
select_sql = f"select count(id) FROM {self.site_name}_bs_category_top100_asin WHERE date_info = '{_0_days}';"
print(select_sql)
df_count_data_num = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_sql)
df_count_data_num = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql)
count_data_num = df_count_data_num['count(id)'][0]
print('count_data_num::',count_data_num)
print('count_data_num::', count_data_num)
self.send_ms_count_data_num(self.site_name, count_data_num, _0_days)
break
except Exception as e:
......@@ -971,6 +971,22 @@ class bsr_catgory(BaseUtils):
time.sleep(20)
continue
def sele_msyql_category(self, site):
engine_mysql = self.db_engine_us(site, 'mysql')
sql = f'select path, nodes_num,id from {site}_bs_category where nodes_num>1'
df = engine_mysql.read_sql(sql)
values_list = df.values.tolist()
with engine_mysql.begin() as conn_6:
for value in values_list:
print(value)
items = self.parse_url(value[1], value[0])
items['id'] = value[2]
print(items)
# {'category_id': '1722264031', 'category_first_id': 'baby', 'category_parent_id': '60244031', 'id': 67478}
ai_sql1 = f"update {site}_bs_category set category_id = '{items['category_id']}',category_parent_id='{items['category_parent_id']}',category_first_id='{items['category_first_id']}' where id={items['id']}"
print(ai_sql1)
conn_6.execute(ai_sql1)
def dele_self_real_spider(self):
print('每天晚上定时删除贺哲的抓取表。用户已经取消收藏店铺')
select_sql = 'select data_id from user_collection_syn where data_type =2'
......@@ -1013,6 +1029,7 @@ if __name__ == '__main__':
spider_us.run_update_redirect_flag()
spider_us.updata_category_first_id()
spider_us.send_ms()
for site in ['us','de', 'uk']:
for site in ['us', 'de', 'uk']:
spider_us = bsr_catgory(site_name=site)
spider_us.updata_category_state()
spider_us.sele_msyql_category(site)
import pandas as pd
from urllib.parse import quote
from datetime import datetime
import sys
import os
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.db_connect import BaseUtils
# Helper that fetches data from the database
def get_data_from_database(connection, query):
return pd.read_sql(query, connection)
def get_data_from_database(engine_pg, query):
result_df = engine_pg.read_sql(query)
return result_df
def db_read_data(engine_pg):
......@@ -22,17 +27,19 @@ def db_read_data(engine_pg):
# Create the initial DataFrame
df_search_term = pd.DataFrame(data=result_list, columns=['search_term', 'url'])
print(df_search_term.shape)
df_search_term['date_info'] = str(datetime.now().strftime("%Y-%m-%d"))
print('date_info::',df_search_term['date_info'])
# Boolean mask for rows whose URL is no longer than 450 characters
short_url_rows = df_search_term['url'].str.len() <= 450
# Keep only the rows whose URL does not exceed 450 characters
data_df = df_search_term[short_url_rows]
print('pg6 写入数据 merchantwords')
with engine_pg.begin() as conn:
data_df.to_sql(f'us_search_term_month_syn_merchantwords', con=engine_pg, if_exists="append", index=False)
engine_pg.to_sql(data_df,'us_merchantwords_search_term_month_syn_2025', if_exists="append")
update_sql = f"update us_search_term_month_merchantwords set state =3 where state=1"
print(update_sql)
conn.execute(update_sql)
deletesql = f"DELETE from us_search_term_month_syn_merchantwords where state =3"
deletesql = f"DELETE from us_merchantwords_search_term_month_syn_2025 where state =3"
print(deletesql)
conn.execute(deletesql)
......@@ -58,6 +65,7 @@ def build_urls(search_term):
]
return [[search_term, url] for url in urls]
# if __name__ == '__main__':
# # Pass in a database connection
# db_read_data(engine_pg)
\ No newline at end of file
if __name__ == '__main__':
# Pass in a database connection
engine_pg = BaseUtils(site_name='us').pg_connect_6()
db_read_data(engine_pg)
\ No newline at end of file
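For the pg6 sync above, a small stand-alone sketch of the same stamping-and-filtering step, assuming plain pandas; `prepare_search_terms` and the example row are hypothetical, while the 450-character URL cutoff is taken from `db_read_data`.

```python
from datetime import datetime

import pandas as pd


def prepare_search_terms(rows):
    # rows: list of [search_term, url] pairs, as produced by build_urls() above
    df = pd.DataFrame(rows, columns=["search_term", "url"])
    df["date_info"] = datetime.now().strftime("%Y-%m-%d")
    # Drop rows whose URL exceeds 450 characters, the same cutoff used in db_read_data()
    return df[df["url"].str.len() <= 450]


print(prepare_search_terms([["water bottle", "https://example.com/search?q=water+bottle"]]))
```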
......@@ -258,26 +258,7 @@ class Save_asin_detail(BaseUtils):
if asin_not_div_id_dp_list:
self.db_change_state(state=13, asin_list=asin_not_div_id_dp_list)
@func_set_timeout(240)
def save_asin_not_buysales(self, asin_buySales_list):
while True:
try:
if is_internet_available():
pass
else:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
print('错误月销的asin:', asin_buySales_list)
print('错误月销的asin:', len(asin_buySales_list))
df_asin_ = pd.DataFrame(data=asin_buySales_list, columns=['asin', 'buysales', 'date_info'])
self.engine_pg.to_sql(df_asin_, f'{self.site_name}_asin_detail_2025_not_buysales', if_exists='append')
break
except Exception as e:
print("存储 _asin_detail_2025_not_buysales 文本 数据错误", e)
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
time.sleep(random.uniform(10, 20.5))
continue
@func_set_timeout(240)
def save_bs_category_asin_detail(self, bs_category_asin_list_pg):
......
......@@ -21,7 +21,6 @@ import time
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
urllib3.disable_warnings()
import ast
class ai_async_asin_pg():
......@@ -120,8 +119,6 @@ class ai_async_asin_pg():
sess.mount(self.site_url, py_ja3.DESAdapter())
resp = sess.get(scraper_url, headers=headers,
timeout=10, verify=False)
# with open(rf'{self.site_name}_22_{asin}.html', 'w', encoding='utf-8')as f:
# f.write(resp.text)
if self.reuests_para_val.check_amazon_yzm(resp):
print('出现验证码,。asin---> ', asin)
if self.spider_state == '竞品asin':
......@@ -331,41 +328,31 @@ class ai_async_asin_pg():
def read_ai_asin(self):
self.pg_connect()
self.spider_type=True
self.spider_type = True
for module in ['Amazon:asin', 'Amazon:asinList']:
if module == 'Amazon:asin':
# pass
sql = f"SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module='{module}' and spider_status='未开始' for update;"
else:
sql = f"""SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module = '{module}' and spider_status='未开始' for update;"""
# sql = f"""SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module = '{module}' and task_id=39 for update;"""
print(sql)
df_read = self.engine_pg.read_then_update(
select_sql=sql,
update_table='ai_asin_analyze_log',
set_values={"spider_status": '爬取中'}, # 把库存清零
where_keys=["task_id"], # WHERE sku = :sku
)
while True:
try:
if module == 'Amazon:asin':
sql = f"SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module='{module}' and spider_status='未开始' for update;"
else:
sql = f"""SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module = '{module}' and spider_status='未开始' for update;"""
# sql = f"""SELECT elem->>'asin' AS asin,task_id,site_name FROM ai_asin_analyze_log,LATERAL json_array_elements(input_params) elem WHERE module = '{module}' and task_id=39 for update;"""
print(sql)
df_read = self.engine_pg.read_then_update(
select_sql=sql,
update_table='ai_asin_analyze_log',
set_values={"spider_status": '爬取中'}, # 把库存清零
where_keys=["task_id"], # WHERE sku = :sku
)
break
except:
time.sleep(10)
print(f'开始 {module} 任务:', sql)
if not df_read.empty:
# if module == 'Amazon:asin':
# _asin_list = ast.literal_eval(df_read['asin'][0])
# asin_id_list = []
# for _aisn in _asin_list:
# asin_data_list = list(
# _aisn + '|-|' + df_read.task_id.astype(
# "U") + '|-|' + df_read.site_name + '|-|' + module)
# asin_id_list.extend(asin_data_list)
asin_id_list = list(
df_read['asin'] + '|-|' + df_read.task_id.astype(
"U") + '|-|' + df_read.site_name + '|-|' + module)
# else:
# asin_id_list = list(
# df_read['asin'] + '|-|' + df_read.task_id.astype(
# "U") + '|-|' + df_read.site_name + '|-|' + module)
print(asin_id_list)
for asin_id in asin_id_list:
print(asin_id)
self.queries_asin_queue.put(asin_id)
......@@ -385,7 +372,7 @@ class ai_async_asin_pg():
print(asin_id)
self.queries_asin_queue.put(asin_id)
html_thread = []
for i in range(5):
for i in range(8):
thread2 = threading.Thread(target=self.get_asin)
thread2.start()
html_thread.append(thread2)
......@@ -396,32 +383,37 @@ class ai_async_asin_pg():
def select_asin():
for site in ['us', 'de', 'uk']:
select_sql = f"""select id, site_name, task_id, unique_key as asin,sub_step from ai_asin_analyze_spider where sub_step = 'AsinInfoRepository:详情' and status = '未开始' and site_name='{site}' order by task_id"""
print('select_sql::', select_sql)
engine_pg15 = ai_async_asin_pg(site_name='us').pg_connect()
df_read = engine_pg15.read_then_update(
select_sql=select_sql,
update_table='ai_asin_analyze_spider',
set_values={"status": '爬取中'}, # 把库存清零
where_keys=["id", "site_name"], # WHERE sku = :sku
)
if not df_read.empty:
asin_id_list = list(
df_read['asin'] + '|-|' + df_read.task_id.astype(
"U") + '|-|' + df_read.site_name + '|-|' + df_read.id.astype(
"U") + '|-|' + df_read.sub_step)
print(asin_id_list)
while True:
try:
select_sql = f"""select id, site_name, task_id, unique_key as asin,sub_step from ai_asin_analyze_spider where sub_step = 'AsinInfoRepository:详情' and status = '未开始' and site_name='{site}' order by task_id"""
print('select_sql::', select_sql)
engine_pg15 = ai_async_asin_pg(site_name='us').pg_connect()
df_read = engine_pg15.read_then_update(
select_sql=select_sql,
update_table='ai_asin_analyze_spider',
set_values={"status": '爬取中'}, # 把库存清零
where_keys=["id", "site_name"], # WHERE sku = :sku
)
if not df_read.empty:
asin_id_list = list(
df_read['asin'] + '|-|' + df_read.task_id.astype(
"U") + '|-|' + df_read.site_name + '|-|' + df_read.id.astype(
"U") + '|-|' + df_read.sub_step)
print(asin_id_list)
ai_async_asin_pg(site_name=site).run_analzye_asin(asin_id_list)
ai_async_asin_pg(site_name=site).run_analzye_asin(asin_id_list)
break
except:
time.sleep(10)
def run_spider():
time_ip_num = 0
while True:
time_ip_num += 1
select_asin()
ai_async_asin_pg().read_ai_asin()
time.sleep(5)
select_asin()  # task type: AsinInfoRepository:详情
ai_async_asin_pg().read_ai_asin()  # task types: 'Amazon:asin', 'Amazon:asinList'
time.sleep(20)
print('-----------------------------------------------------------------------------------------')
print()
if 10 <= datetime.now().hour < 22:
......
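Both read_ai_asin and select_asin now wrap the project's `read_then_update` helper in a retry loop so that pending rows are claimed (`SELECT ... FOR UPDATE`), marked '爬取中', and re-tried on transient errors. A generic sketch of that claim-and-mark pattern with plain SQLAlchemy is shown below; `claim_pending_tasks`, the `dsn` argument, and the simplified WHERE clause are assumptions, and a PostgreSQL driver (psycopg2) is assumed for the `ANY(array)` binding.

```python
import time

import pandas as pd
from sqlalchemy import create_engine, text


def claim_pending_tasks(dsn: str, retries: int = 3) -> pd.DataFrame:
    # Claim rows that are still '未开始' and mark them '爬取中' inside one transaction,
    # retrying on transient connection errors, similar to the loops added above.
    engine = create_engine(dsn)
    for _ in range(retries):
        try:
            with engine.begin() as conn:
                df = pd.read_sql(
                    text("SELECT id, task_id, unique_key AS asin "
                         "FROM ai_asin_analyze_spider "
                         "WHERE status = '未开始' FOR UPDATE"),
                    conn,
                )
                if not df.empty:
                    conn.execute(
                        text("UPDATE ai_asin_analyze_spider SET status = '爬取中' WHERE id = ANY(:ids)"),
                        {"ids": [int(i) for i in df["id"]]},
                    )
                return df
        except Exception:
            time.sleep(10)
    return pd.DataFrame()
```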
import os
import sys
import gzip
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.secure_db_client import get_remote_engine
from amazon_spider.VPS_IP import pppoe_ip
......@@ -19,7 +19,6 @@ import time
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
urllib3.disable_warnings()
import ast
......@@ -206,46 +205,23 @@ class ai_async_asin_pg():
'bundles_this_asins_json': items['bundles_this_asins_data_json'],
'video_m3u8_url': items["video_m3u8"], 'result_list_json': items['result_list_json'],
'bundle_asin_component_json': items['bundle_asin_component_json'],
'bsr_category_asin_list': items['bs_category_asin_list_pg'],'review_json_list':items['review_json_list']
'bsr_category_asin_list': items['bs_category_asin_list_pg'],'review_json_list':items['review_json_list'],
'fbm_delivery_price': items['fbm_delivery_price']
}
print(item)
# a = None
# if result_list_json and module == 'Amazon:asin' and self.spider_state is None:
# is_sp_asin_state = None
# result_list_dict = json.loads(result_list_json)
# print(asin, '判断是否有竞品asin')
# for result_dict in result_list_dict:
# # Based on your recent shopping trends # Frequently purchased items with fast delivery
# # Customers who viewed this item also viewed # Brand in this category on Amazon
# sp_type = 'Based on your recent shopping trends'
# if result_dict.get(sp_type):
# print(asin, '找到有竞品asin。 数量:', len(result_dict[sp_type]))
# for i in result_dict[sp_type]:
# sp_asin = i + '|-|' + task_id + '|-|' + site_name + '|-|' + module
# self.sp_asin_queue.put(sp_asin)
# is_sp_asin_state = 111
# a = 1
# if is_sp_asin_state is None:
# print('没有找到竞品asin')
# self.item_queue.put(item)
# # self.save_data()
# # self.update_ai_asin_analyze_log([int(task_id)], '成功')
# a = 1
self.item_queue.put(item)
Requests_param_val().send_kafka(html_data=response, topic=self.topic_asin_html)
response_gzip = self.compress_string(response)
Requests_param_val().send_kafka(html_data=response_gzip, topic=self.topic_asin_html)
Requests_param_val().kafuka_producer_str.flush(timeout=30)
# if self.spider_state == '竞品asin':
# self.item_queue.put(item)
# a = 1
#
# if module == 'Amazon:asinList':
# self.item_queue.put(item)
# a = 1
# if a is None:
# self.item_queue.put(item)
else:
print('asin 商品 异常')
# Compress a string with gzip before it is pushed to Kafka
def compress_string(self, input_string):
return gzip.compress(input_string.encode())
def update_ai_asin_analyze_log(self, task_id_list, status):
if task_id_list:
task_id_list = list(set(task_id_list))
......
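The responses pushed to the asin_html topic are now gzip-compressed first. A minimal round-trip sketch follows; the `decompress_string` consumer helper is an assumption about how the payload would be read back.

```python
import gzip


def compress_string(input_string: str) -> bytes:
    # Same idea as the compress_string method added above: gzip the page HTML before sending to Kafka
    return gzip.compress(input_string.encode("utf-8"))


def decompress_string(payload: bytes) -> str:
    # Consumer side: undo the compression to recover the original HTML
    return gzip.decompress(payload).decode("utf-8")


html = "<html><body>asin detail page</body></html>"
assert decompress_string(compress_string(html)) == html
```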
......@@ -16,6 +16,7 @@ from datetime import datetime
import json
import threading
import time
import gzip
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
urllib3.disable_warnings()
......@@ -200,15 +201,20 @@ class ai_async_asin_pg():
'video_m3u8_url': items["video_m3u8"], 'result_list_json': items['result_list_json'],
'bundle_asin_component_json': items['bundle_asin_component_json'],
'bsr_category_asin_list': items['bs_category_asin_list_pg'],
'review_json_list': items['review_json_list']
'review_json_list': items['review_json_list'], 'fbm_delivery_price': items['fbm_delivery_price']
}
print(item)
self.item_queue.put(item)
Requests_param_val().send_kafka(html_data=response, topic=self.topic_asin_html)
response_gzip = self.compress_string(response)
Requests_param_val().send_kafka(html_data=response_gzip, topic=self.topic_asin_html)
Requests_param_val().kafuka_producer_str.flush(timeout=30)
else:
print('asin 商品 异常')
# Compress a string with gzip before it is pushed to Kafka
def compress_string(self, input_string):
return gzip.compress(input_string.encode())
def save_data(self):
self.pg_connect()
items_data_list = []
......@@ -245,6 +251,7 @@ class ai_async_asin_pg():
print('存储报错::', e)
self.pg_connect()
time.sleep(10)
def init_list(self):
print("=======清空变量==========")
self.asin_not_found_list = [] # 4
......
......@@ -75,7 +75,7 @@ class async_asin_pg():
self.topic_detail_month = f'{self.site_name}_asin_detail_month_2025_{self.month_}'
self.topic_asin_html = f'asin_html_2025_{self.month_}'
self.asin_video_list = []
self.asin_buySales_list = []
def get_asin(self):
while True:
if self.queries_asin_queue.empty() == False:
......@@ -112,7 +112,7 @@ class async_asin_pg():
sess.mount(self.site_url, py_ja3.DESAdapter())
resp = sess.get(scraper_url, headers=headers,
timeout=10, verify=False)
# with open(rf'{self.site_name}_22_{asin}.html', 'w', encoding='utf-8')as f:
# with open(rf'D:\新建文件夹\html_selenium_files\{self.site_name}_211123333_{asin}.html', 'w', encoding='utf-8')as f:
# f.write(resp.text)
if self.reuests_para_val.check_amazon_yzm(resp):
self.yzm_err_total_list.append(1)
......@@ -216,7 +216,7 @@ class async_asin_pg():
'bundles_this_asins_json': items['bundles_this_asins_data_json'],
'video_m3u8_url': items["video_m3u8"], 'result_list_json': items['result_list_json'],
'bundle_asin_component_json':items['bundle_asin_component_json'],
'review_json_list':items['review_json_list'],'asin_buySales_list':items['asin_buySales_list']
'review_json_list':items['review_json_list'],'fbm_delivery_price':items['fbm_delivery_price']
}
if self.site_name in ['uk', 'de', 'fr', 'es', 'it']:
item['five_six_val'] = items['five_six_val']
......@@ -286,6 +286,8 @@ class async_asin_pg():
item['node_id'] = _items["node_id"]
if item['review_json_list'] is None:
item['review_json_list'] = _items["review_json_list"]
if item['fbm_delivery_price'] is None:
item['fbm_delivery_price'] = _items["fbm_delivery_price"]
except:
pass
_response_text_var = None
......@@ -366,8 +368,6 @@ class async_asin_pg():
item['img_list'] = json.dumps(items["all_img_video_list"])
else:
item['img_list'] = None
if item['asin_buySales_list']:
self.asin_buySales_list.extend(item['asin_buySales_list'])
self.item_queue.put(item)
if item['img_list'] is None:
......@@ -425,7 +425,6 @@ class async_asin_pg():
def init_list(self):
print("=======清空变量==========")
self.asin_buySales_list = []
self.asin_not_found_list = [] # 4
self.asin_not_sure_list = [] # 6
self.asin_not_foot_list = [] # 7
......@@ -478,7 +477,7 @@ class async_asin_pg():
def run(self):
asin_list = self.save_asin_detail.read_db_data()
# asin_list = ['B0D663T3W8|2025-01|1|1|null|null']
# asin_list = ['B0CW1ZM991|2025-01|1|1|null|null']
if asin_list:
for asin in asin_list:
self.queries_asin_queue.put(asin)
......@@ -488,7 +487,7 @@ class async_asin_pg():
for ck in cookies_dict.values():
self.cookies_queue.put(ck)
html_thread = []
for i in range(27):
for i in range(26):
thread2 = threading.Thread(target=self.get_asin)
thread2.start()
html_thread.append(thread2)
......@@ -500,14 +499,6 @@ class async_asin_pg():
self.reuests_para_val.kafuka_producer_str.flush(timeout=35)
except KafkaTimeoutError as e:
print("flush 超时,跳过这次等待:", e)
while True:
try:
if self.asin_buySales_list:
self.save_asin_detail.save_asin_not_buysales(self.asin_buySales_list)
break
except FunctionTimedOut as e:
print('断网', e)
while True:
try:
print('存储 asin bsr 文本 存储pg')
......@@ -569,4 +560,4 @@ class async_asin_pg():
pass
# if __name__ == '__main__':
# async_asin_pg(month=9, spider_int=1, week=14,site_name='us').run()
# async_asin_pg(month=12, spider_int=1, week=14,site_name='de').run()