Commit 58220d8c by Peng

1. Optimized the Kafka connection. 2. Added a new parameter to the ASIN requests. 3. H10: added automated handling of Chinese-language pages. 4. The bs-type parsing for search-term scraping changed; re-parse accordingly. 5. ASIN detail: added review parsing. 6. ASIN detail: star-rating parsing changed.

parent c8524b22
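For item 1 of the commit message ("optimized the Kafka connection"), the diff below routes sends through reuests_para_val.send_kafka, retries each send a few times, and bounds every producer flush with KafkaTimeoutError handling. A minimal, self-contained sketch of that pattern follows; the broker address and topic are placeholders, the real values come from the project's params and topic variables.

import json
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

# Placeholder broker; the project reads its real bootstrap servers from configuration.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: v.encode('utf-8'))

def send_with_bounded_flush(topic, item, retries=3):
    # Retry the send a few times, then flush with a timeout so a slow broker
    # cannot block the spider thread forever -- the same idea as send_kafka below.
    for _ in range(retries):
        try:
            producer.send(topic, json.dumps(item, ensure_ascii=False))
            break
        except Exception as e:
            print('kafka send failed, retrying:', e)
    try:
        producer.flush(timeout=10)
    except KafkaTimeoutError as e:
        print('flush timed out, skipping this wait:', e)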
'存储到pg'
'获取小语言cookie'
import sys
import os
import pandas as pd
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import requests
import json
from lxml import etree
import re
import random
import pymysql
import uuid
import time
import py_ja3
from params import DB_CONN_DICT
import urllib3
from secure_db_client import get_remote_engine
import py_ja3
import traceback
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
item = {}
......@@ -39,14 +43,10 @@ def get_cookie(site='us', zipCode='10010'):
elif site == 'it':
url_ = 'https://www.amazon.it'
host = 'www.amazon.it'
if site == 'us':
us_db = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'], user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection", charset="utf8mb4")
else:
us_db = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'], user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection_" + site,
charset="utf8mb4")
us_cursor = us_db.cursor()
engine_us = get_remote_engine(site, 'mysql')
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
......@@ -77,274 +77,42 @@ def get_cookie(site='us', zipCode='10010'):
print("第一次发送请求,获取邮编:", ingress)
data_a_modal = html_xpath.xpath("//span[@id='nav-global-location-data-modal-action']/@data-a-modal")
data_modal = json.loads(data_a_modal[0])
if site != 'us':
csrftoken = html_xpath.xpath("//input[@name='anti-csrftoken-a2z']/@value")[0]
url_post = url_ + '/privacyprefs/retail/v1/acceptall'
dada_post = {
"anti-csrftoken-a2z": csrftoken,
"accept": "all"
}
resp_post = sess.post(url_post, headers=headers, cookies=cookies_dict, timeout=15, data=dada_post, verify=False)
cookie_post = resp_post.headers.get('set-cookie')
cookies_dict_post = {i.split("=")[0]: i.split("=")[-1] for i in cookie_post.split("; ")}
cookies_dict_post.update(cookies_dict)
else:
cookies_dict_post = cookies_dict
if site == 'us':
get_token_headers = {
'anti-csrftoken-a2z': data_modal['ajaxHeaders']['anti-csrftoken-a2z'],
'referer': url_,
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
}
print(get_token_headers,'23232')
else:
get_token_headers = {
'accept': 'text/html,*/*',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9',
'anti-csrftoken-a2z': data_modal['ajaxHeaders']['anti-csrftoken-a2z'],
'cache-control': 'no-cache',
'referer': url_,
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
'viewport-width': '1920',
'x-requested-with': 'XMLHttpRequest',
}
data_modal_url = url_ + data_modal['url']
print('第二步 拼接url 点击更改位置:',data_modal_url)
data_modal_resp = sess.get(data_modal_url, headers=get_token_headers, cookies=cookies_dict_post,
timeout=15,verify=False)
data_modal_cookie = data_modal_resp.headers.get('set-cookie')
CSRF_TOKEN = re.findall('CSRF_TOKEN : "(.*?)",', data_modal_resp.text)[0]
print("CSRF_TOKEN:",CSRF_TOKEN)
try:
data_modal_cookie_dict = {i.split("=")[0]: i.split("=")[-1] for i in data_modal_cookie.split("; ")}
data_modal_cookie_dict.update(cookies_dict)
except:
data_modal_cookie_dict = cookies_dict_post
url_2 = url_ + '/portal-migration/hz/glow/address-change?actionSource=glow'
print('url_2:',url_2)
# {"locationType":"LOCATION_INPUT","zipCode":"10010","deviceType":"web","storeContext":"generic","pageType":"Gateway","actionSource":"glow"}
data = {"locationType":"LOCATION_INPUT","zipCode":zipCode,"storeContext":"generic","deviceType":"web","pageType":"Gateway","actionSource":"glow"}
print(data)
post_headers = {
'anti-csrftoken-a2z': CSRF_TOKEN,
# if site != 'us':
# csrftoken = html_xpath.xpath("//input[@name='anti-csrftoken-a2z']/@value")[0]
# url_post = url_ + '/privacyprefs/retail/v1/acceptall'
# dada_post = {
# "anti-csrftoken-a2z": csrftoken,
# "accept": "all"
# }
# resp_post = sess.post(url_post, headers=headers, cookies=cookies_dict, timeout=15, data=dada_post,
# verify=False)
# cookie_post = resp_post.headers.get('set-cookie')
# cookies_dict_post = {i.split("=")[0]: i.split("=")[-1] for i in cookie_post.split("; ")}
# cookies_dict_post.update(cookies_dict)
# else:
cookies_dict_post = cookies_dict
# if site == 'us':
# get_token_headers = {
# 'anti-csrftoken-a2z': data_modal['ajaxHeaders']['anti-csrftoken-a2z'],
# 'referer': url_,
# 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
# }
# print(get_token_headers, '23232')
# else:
get_token_headers = {
'accept': 'text/html,*/*',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9',
'anti-csrftoken-a2z': data_modal['ajaxHeaders']['anti-csrftoken-a2z'],
'cache-control': 'no-cache',
'content-length': '138',
'content-type': 'application/json',
'device-memory': '8',
'downlink': '10',
'dpr': '1',
'ect': '4g',
'origin': url_,
'pragma': 'no-cache',
'referer': url_,
'rtt': '250',
'sec-ch-device-memory': '8',
'sec-ch-dpr': '1',
'sec-ch-ua': '"Google Chrome";v="107", "Chromium";v="107", "Not=A?Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua-platform-version': '"10.0.0"',
'sec-ch-viewport-width': '1920',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
'viewport-width': '1920',
'TE':'trailers',
'x-requested-with': 'XMLHttpRequest',
}
print('第三步 发送post 请求 输入 邮编 点击确定')
resp_2 = sess.post(url_2, headers=post_headers, json=data, cookies=data_modal_cookie_dict,
timeout=15,verify=False)
print(resp_2.text)
post_cookies = resp_2.headers.get('set-cookie')
try:
post_cookies_dict = {i.split("=")[0]: i.split("=")[-1] for i in post_cookies.split("; ")}
post_cookies_dict.update(data_modal_cookie_dict)
except:
post_cookies_dict = data_modal_cookie_dict
done_url = url_ + "/portal-migration/hz/glow/get-location-label?storeContext=generic&pageType=Gateway&actionSource=desktop-modal"
print('第四步,点击完成,')
done_resp = sess.get(done_url, headers=headers, cookies=post_cookies_dict, timeout=15,verify=False)
print(done_resp.text,'done_respdone_respdone_respdone_resp')
done_cookies_dict = sess.cookies.get_dict()
print('done_cookies_dict::',done_cookies_dict)
print("第五步,请求首页,获取邮编,是否修改成功")
index_resp = sess.get(url_, headers=headers, timeout=15,cookies=done_cookies_dict,verify=False)
index_resp_cookies = sess.cookies.get_dict()
print(sess.cookies.get_dict(),'2222222222222222')
index_xpath = etree.HTML(index_resp.text)
ingress = index_xpath.xpath("//span[@id='glow-ingress-line2']/text()")
print("获取最新邮编:", ingress)
if zipCode in ingress[0].strip() or "W1S 3" in ingress[0].strip():
print(f"*************** 当前获取 {site} 站点 cookie 邮编 {zipCode} ********************")
cookies = json.dumps(index_resp_cookies)
item = {"site": site, 'zipCode': ingress[0].strip(), 'cookie': cookies}
print(item)
insert_sql = f'insert into {site}_cookies (cookies,type)values (%s,%s)'
print(insert_sql)
us_cursor.execute(insert_sql, (cookies,'DB'))
us_db.commit()
us_cursor.close()
us_db.close()
sess.close()
except Exception as e:
print(f"获取 {site} 站点 cookie 报错,切换下一个站点",e)
print("报错", f"\n{traceback.format_exc()}")
time.sleep(random.uniform(2.5, 5.5))
if __name__ == '__main__':
while True:
get_cookie(site='us', zipCode='10010')
get_cookie(site='de', zipCode='10115')
get_cookie(site='uk', zipCode='W1S 3PR')
get_cookie(site='it', zipCode='00185')
get_cookie(site='es', zipCode='28001')
get_cookie(site='fr', zipCode='75019')
get_cookie(site='us', zipCode='10010')
get_cookie(site='de', zipCode='10115')
get_cookie(site='uk', zipCode='W1S 3PR')
time.sleep(random.uniform(5.5, 25.5))
'存储到pg'
'获取小语言cookie'
import sys
import os
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import requests
import json
from lxml import etree
import re
import random
import pymysql
import uuid
import time
import py_ja3
from params import DB_CONN_DICT
from sqlalchemy import text
import urllib3
import traceback
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
item = {}
headers_num_int = 0
def get_cookie(site='us', zipCode='10010'):
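"""Simulate Amazon's zip-code change flow for the given site to obtain a postcode-bound cookie set: load the homepage, open the location modal, POST the new zipCode, confirm, then reload the homepage to verify the postcode before persisting the cookies."""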
try:
if site == "us":
url_ = 'https://www.amazon.com'
host = 'www.amazon.com'
elif site == 'uk':
url_ = 'https://www.amazon.co.uk' # 站点url
host = 'www.amazon.co.uk'
elif site == 'de':
url_ = 'https://www.amazon.de'
host = 'www.amazon.de'
elif site == 'fr':
url_ = 'https://www.amazon.fr'
host = 'www.amazon.fr'
elif site == 'es':
url_ = 'https://www.amazon.es'
host = 'www.amazon.es'
elif site == 'it':
url_ = 'https://www.amazon.it'
host = 'www.amazon.it'
if site == 'us':
us_db = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection", charset="utf8mb4")
else:
us_db = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection_" + site,
charset="utf8mb4")
us_cursor = us_db.cursor()
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
'connection': 'close',
'authority': host,
'accept': 'text/html,*/*',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'origin': url_,
'referer': url_,
'sec-ch-ua-mobile': '?0',
'user-agent': ua
}
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n']
k = ""
for i in (0, random.randint(0, 5)):
k += random.choice(alphabet)
headers[k] = str(uuid.uuid4())
sess = requests.Session()
sess.mount(url_, py_ja3.DESAdapter())
resp_ = sess.get(url_, headers=headers, timeout=15, verify=False)
cookie = resp_.headers.get('set-cookie')
print("第一步 请求首页", url_)
cookies_dict = {i.split("=")[0]: i.split("=")[-1] for i in cookie.split("; ")}
html_xpath = etree.HTML(resp_.text)
ingress = html_xpath.xpath("//span[@id='glow-ingress-line2']/text()")
print("第一次发送请求,获取邮编:", ingress)
data_a_modal = html_xpath.xpath("//span[@id='nav-global-location-data-modal-action']/@data-a-modal")
data_modal = json.loads(data_a_modal[0])
if site != 'us':
csrftoken = html_xpath.xpath("//input[@name='anti-csrftoken-a2z']/@value")[0]
url_post = url_ + '/privacyprefs/retail/v1/acceptall'
dada_post = {
"anti-csrftoken-a2z": csrftoken,
"accept": "all"
}
resp_post = sess.post(url_post, headers=headers, cookies=cookies_dict, timeout=15, data=dada_post,
verify=False)
cookie_post = resp_post.headers.get('set-cookie')
cookies_dict_post = {i.split("=")[0]: i.split("=")[-1] for i in cookie_post.split("; ")}
cookies_dict_post.update(cookies_dict)
else:
cookies_dict_post = cookies_dict
if site == 'us':
get_token_headers = {
'anti-csrftoken-a2z': data_modal['ajaxHeaders']['anti-csrftoken-a2z'],
'referer': url_,
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
}
print(get_token_headers, '23232')
else:
get_token_headers = {
'accept': 'text/html,*/*',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9',
'anti-csrftoken-a2z': data_modal['ajaxHeaders']['anti-csrftoken-a2z'],
'cache-control': 'no-cache',
'referer': url_,
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
'viewport-width': '1920',
'x-requested-with': 'XMLHttpRequest',
}
data_modal_url = url_ + data_modal['url']
print('第二步 拼接url 点击更改位置:', data_modal_url)
data_modal_resp = sess.get(data_modal_url, headers=get_token_headers, cookies=cookies_dict_post,
......@@ -424,24 +192,16 @@ def get_cookie(site='us', zipCode='10010'):
if zipCode in ingress[0].strip() or "W1S 3" in ingress[0].strip():
print(f"*************** 当前获取 {site} 站点 cookie 邮编 {zipCode} ********************")
cookies = json.dumps(index_resp_cookies)
cookies = json.dumps(index_resp_cookies, ensure_ascii=False)
cookies_list=[[cookies,'DB']]
item = {"site": site, 'zipCode': ingress[0].strip(), 'cookie': cookies}
print(item)
insert_sql = f'insert into {site}_cookies (cookies,type)values (%s,%s)'
print(insert_sql)
us_cursor.execute(insert_sql, (cookies,'DB'))
us_db.commit()
us_cursor.close()
us_db.close()
if site in ('us','de','uk'):
# 构造参数化的 SQL 语句
insert_sql = text("INSERT INTO {}_cookies (cookies, type) VALUES (:cookies, :type)".format(site))
print(insert_sql)
# 使用 with 上下文管理连接,自动开启事务并在结束时提交和关闭连接
# with engine_pg.begin() as conn:
# conn.execute(insert_sql, {"cookies": cookies, "type": "DB"})
# print("存储成功")
sess.close()
# 构造 DataFrame
df = pd.DataFrame([{"cookies": cookies, "type": "DB"}])
# df_data_list = df.values.tolist()
# 存储到数据库
engine_us.to_sql(df, f"{site}_cookies", if_exists="append")
except Exception as e:
print(f"获取 {site} 站点 cookie 报错,切换下一个站点",e)
print("报错", f"\n{traceback.format_exc()}")
......@@ -450,13 +210,13 @@ def get_cookie(site='us', zipCode='10010'):
if __name__ == '__main__':
while True:
get_cookie(site='us', zipCode='10010')
get_cookie(site='de', zipCode='10115')
get_cookie(site='uk', zipCode='W1S 3PR')
get_cookie(site='it', zipCode='00185')
get_cookie(site='es', zipCode='28001')
get_cookie(site='fr', zipCode='75019')
get_cookie(site='us', zipCode='10010')
# get_cookie(site='us', zipCode='10010')
# get_cookie(site='de', zipCode='10115')
# get_cookie(site='uk', zipCode='W1S 3PR')
# get_cookie(site='it', zipCode='85')
# get_cookie(site='es', zipCode='28001')
# get_cookie(site='fr', zipCode='75019')
# get_cookie(site='us', zipCode='10010')
get_cookie(site='de', zipCode='10115')
get_cookie(site='uk', zipCode='W1S 3PR')
time.sleep(random.uniform(10.5, 35.5))
time.sleep(random.uniform(10.5, 55.5))
......@@ -54,7 +54,7 @@ class CalculateMean(BaseUtils):
print(f"读取 {self.site_name}_one_category")
sql = f"select * from {self.site_name}_one_category where state!=4 and name = 'Health & Household' and `year_month`='{_year_month}';"
print('查询原始表:', sql)
self.df_sum = pd.read_sql(sql, con=self.engine)
self.df_sum = self.engine.read_sql(sql)
# # 排序
self.df_sum.sort_values(by=['name', 'rank'], inplace=True)
......@@ -66,7 +66,7 @@ class CalculateMean(BaseUtils):
self.cate_list = list(set(self.df_sum.name))
sql_select = f"SELECT `year_month` from selection.week_20_to_30 WHERE `week`={int(self.week)} and `year`={self.year}"
print(sql_select, 'sql_select:')
df = pd.read_sql(sql_select, con=self.engine)
df = self.engine.read_sql(sql_select)
self.year_month = list(df['year_month'])[0] if list(df['year_month']) else ''
print("self.year_month:", self.year_month)
......@@ -120,11 +120,11 @@ class CalculateMean(BaseUtils):
# sql = f'select en_name as name,category_id from {self.site_name}_bs_category WHERE nodes_num =2 and delete_time is NULL'
sql = f"select en_name as name, category_id from {self.site_name}_bs_category where 1 = 1 and nodes_num = 2 group by en_name, category_id"
df_en_name = pd.read_sql(sql, con=self.engine)
df_en_name = self.engine.read_sql(sql)
# 使用 merge 判断两个列的 name 是否一样
self.df_repeat = pd.merge(self.df_repeat, df_en_name, on='name', how='left')
self.df_repeat = self.df_repeat.loc[self.df_repeat.orders >= 1] # 保留大于0的 排名月销
self.df_repeat.to_sql(f"{self.site_name}_one_category_report_pyb", con=self.engine, if_exists="append", index=False)
self.engine.to_sql(self.df_repeat,f"{self.site_name}_one_category_report_pyb", if_exists="append")
def run(self):
self.send_mes(self.site_name)
......
......@@ -678,11 +678,20 @@ class bsr_catgory(BaseUtils):
if df_en_name['en_name'][0] == name_num_path[1]:
pass
else:
_strftime_ = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
update_name_sql = f'''update {self.site_name}_bs_category set delete_time = '2023-06-19 00:00:00' WHERE `path`="{name_num_path[3]}" and delete_time is null'''
print('更新 en_name:', update_name_sql)
self.db_cursor_connect_update(update_name_sql, self.site_name)
save_name_num_list.append(name_num_path)
select_sql_name_1 = f'''SELECT en_name,id FROM {self.site_name}_bs_category WHERE `path`="{name_num_path[3]}" and delete_time is null'''
df_en_name_1 = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_name_1)
print('en_name::', df_en_name_1.values)
if df_en_name_1['en_name'][0] == name_num_path[1]:
pass
else:
update_name_sql_1 = f'''update {self.site_name}_bs_category set delete_time = '2023-06-19 00:00:00' WHERE id={df_en_name_1['id'][0]} and delete_time is null'''
print('更新 en_name:', update_name_sql_1)
self.db_cursor_connect_update(update_name_sql_1, self.site_name)
break
except Exception as e:
print(e)
......@@ -820,7 +829,10 @@ class bsr_catgory(BaseUtils):
id_tuple = tuple(en_name_id_list)
print(len(id_tuple))
try:
update_sql = f'update {self.site_name}_bs_category set one_category_id={id[0]} where id in {id_tuple}'
if len(id_tuple) == 1:
update_sql = f"""UPDATE {self.site_name}_bs_category set one_category_id={id[0]} where id in ('{id_tuple[0]}')"""
else:
update_sql = f'update {self.site_name}_bs_category set one_category_id={id[0]} where id in {id_tuple}'
self.db_cursor_connect_update(update_sql, self.site_name)
except Exception as e:
print(e)
......
......@@ -34,13 +34,13 @@ class Save_asin_self(BaseUtils):
self.db_self_asin_detail = self.site_name + DB_REQUESTS_ASIN_PARAMS['db_self_asin_detail'][2:]
sql_read = "SELECT text_name FROM censored_thesaurus WHERE data_type='负面词汇'"
print(sql_read)
df = pd.read_sql(sql_read, con=self.engine)
df = self.engine.read_sql(sql_read)
self.text_list = list(df.text_name)
print('负面词汇:', self.text_list)
# asin_sql = f"SELECT asin,sku,erp_seller,{self.site_name}_upload_info,title,`describe` as describe_str ,selling from {self.site_name}_erp_asin_syn WHERE created_at>='2023-05-11' and asin_type=1;"
asin_sql = f"SELECT asin,sku,erp_seller,{self.site_name}_upload_info,title,`describe` as describe_str ,selling,is_variation,fulFillable from {self.site_name}_erp_asin_syn WHERE created_at>='2023-05-11';"
print('asin_sql::', asin_sql)
df_asin = pd.read_sql(asin_sql, con=self.engine)
df_asin = self.engine.read_sql(asin_sql)
self.asin_list = list(df_asin.asin)
print(len(self.asin_list))
df_asin[f'{self.site_name}_upload_info'].fillna('N/A', inplace=True)
......@@ -78,10 +78,15 @@ class Save_asin_self(BaseUtils):
# print(self_all_syn_sql)
self_all_syn_sql_1 = f'SELECT asin from {self.site_name}_self_real_spider WHERE asin in {asin_tuple} and state=4 and updated_at>="{self.time_strftime}"'
# print(self_all_syn_sql_1)
df_asin_error = pd.read_sql(self_all_syn_sql, con=self.engine)
df_asin_error_1 = pd.read_sql(self_all_syn_sql_1, con=self.engine)
asin_error_ = list(df_asin_error.asin)
asin_error_1 = list(df_asin_error_1.asin)
df_asin_error = self.engine.read_sql(self_all_syn_sql)
df_asin_error_1 = self.engine.read_sql(self_all_syn_sql_1)
asin_error_1 =[]
asin_error_ =[]
if not df_asin_error_1.empty:
asin_error_1 = list(df_asin_error_1.asin)
if not df_asin_error.empty:
asin_error_ = list(df_asin_error.asin)
asin_error_list = asin_error_1 + asin_error_  # extend() 返回 None,这里需要拼接后的新列表
if asin_error_list:
print("asin_error_list::", asin_error_list)
......@@ -101,12 +106,12 @@ class Save_asin_self(BaseUtils):
self.asin_list.remove(asin)
df = pd.DataFrame(data=sava_data,
columns=['asin', "sku", 'erp_seller', 'page_error'])
df.to_sql(f'{self.site_name}_erp_asin', con=self.engine, if_exists="append", index=False)
self.engine.to_sql(df,f'{self.site_name}_erp_asin', if_exists="append")
sava_data = []
asin_tuple = tuple(self.asin_list)
asin__detail_sql = f"SELECT asin,title,img_num,`describe`,category,page_inventory,search_category,product_description,img_type from {self.site_name}_self_asin_detail WHERE site='{self.site_name}' and created_at>='{self.time_strftime}' and asin in {asin_tuple};"
df_asin_detail = pd.read_sql(asin__detail_sql, con=self.engine)
df_asin_detail = self.engine.read_sql(asin__detail_sql)
fields_list = df_asin_detail.values.tolist()
for asin_data in fields_list:
data_list = []
......@@ -313,7 +318,7 @@ class Save_asin_self(BaseUtils):
df = pd.DataFrame(data=sava_data,
columns=['asin', "title_error", 'img_error', 'selling_error', 'search_ccategory_error',
'ccategory_error', 'buy_now_error', 'sku', 'erp_seller', 'describe_error'])
df.to_sql(f'{self.site_name}_erp_asin', con=self.engine, if_exists="append", index=False)
self.engine.to_sql(df,f'{self.site_name}_erp_asin', if_exists="append")
# def Compare_str(self,str1, str2):
# # 找出两个字符串中的最短长度
......
import pymysql
from params import DB_CONN_DICT,PG_CONN_DICT_14
import pandas as pd
import traceback
from sqlalchemy import create_engine
import time
import sys
import os
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.secure_db_client import get_remote_engine
"""
每周三定时修改 feedback , product, 同步表修改状态 为 1 六个站点
"""
def run(site):
if site == 'us':
connect = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'], user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection", charset="utf8mb4")
else:
connect = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'], user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection_" + site, charset="utf8mb4")
if site == 'us':
engine_pg = create_engine(
f"postgresql+psycopg2://{PG_CONN_DICT_14['pg_user']}:{PG_CONN_DICT_14['pg_pwd']}@{PG_CONN_DICT_14['pg_host']}:{PG_CONN_DICT_14['pg_port']}/selection",
encoding='utf-8')
else:
engine_pg = create_engine(
f"postgresql+psycopg2://{PG_CONN_DICT_14['pg_user']}:{PG_CONN_DICT_14['pg_pwd']}@{PG_CONN_DICT_14['pg_host']}:{PG_CONN_DICT_14['pg_port']}/selection_{site}",
encoding='utf-8')
cursor = connect.cursor()
engine_mysql = get_remote_engine(
site_name=site, # -> database "selection"
db_type="mysql", # -> 服务端 alias "mysql"
)
engine_pg = get_remote_engine(
site_name=site, # -> database "selection"
db_type="postgresql_14_outer", # -> 服务端 alias "mysql"
)
# cursor = connect.cursor()
# 更改 feedback syn 表 状态为1
update_feedback_sql = f"update {site}_seller_account_syn_distinct set state = 1, product_state=1 and state!=12"
print(update_feedback_sql)
cursor.execute(update_feedback_sql)
connect.commit()
# 更改 店铺syn 表 状态为1
update_product_sql = f"update {site}_seller_account_product_syn set state = 1"
print(update_product_sql)
cursor.execute(update_product_sql)
connect.commit()
update_feedback_sql = f"update {site}_seller_account_syn set state = 1, product_state=1"
print(update_feedback_sql)
cursor.execute(update_feedback_sql)
connect.commit()
connect.close()
cursor.close()
with engine_mysql.begin() as conn_mysql:
update_feedback_sql = f"update {site}_seller_account_syn_distinct set state = 1, product_state=1 and state!=12"
conn_mysql.execute(update_feedback_sql)
# 更改 店铺syn 表 状态为1
update_product_sql = f"update {site}_seller_account_product_syn set state = 1"
print(update_product_sql)
conn_mysql.execute(update_product_sql)
update_feedback_sql = f"update {site}_seller_account_syn set state = 1, product_state=1"
print(update_feedback_sql)
conn_mysql.execute(update_feedback_sql)
if site in ('us',):
with engine_pg.begin() as conn:
conn.execute(update_feedback_sql)
......
......@@ -405,7 +405,7 @@ class H10():
if asin not in self.err_asin_list and self.useremail_state:
print('cerebro界面', self.site_name_url)
self.driver.get(f'https://members.helium10.com/cerebro?accountId={self.account_id}')
time.sleep(8)
time.sleep(10)
if 'You are viewing a demo of Cerebro' in self.driver.page_source:
print(self.email_name, '账号过期')
self.driver.refresh()
......@@ -496,14 +496,13 @@ class H10():
print('33333333333')
self.driver.execute_script(
"""document.querySelector("button[data-testid='runnewsearch']").click()""")
sleep(randint(10, 35))
sleep(randint(3, 8))
except:
print('点击 run 报错')
# 点击下载
time.sleep(8)
self.driver.execute_script('window.scrollBy(0, 600);')
time.sleep(1)
self.driver.execute_script('window.scrollBy(0, 300);')
time.sleep(2)
html = self.driver.page_source
if 'You have reached the limit of the uses' in html:
self.useremail_state = False
......@@ -519,8 +518,22 @@ class H10():
break
elif 'errorCodes.undefined' in html:
continue
sleep(randint(13, 28))
time.sleep(5)
self.verify()
time.sleep(2.5)
if 'Wrong entered data or no results' in html:
print('没有报告可下载2222', asin)
self.err_asin_list.append(asin)
break
elif 'Incorrect asins' in html:
print('中间框下载词 没有报告')
self.err_asins_adv_list.append(asin)
break
elif 'errorCodes.undefined' in html:
continue
time.sleep(5)
html = self.driver.page_source
resp = etree.HTML(html)
try:
div_class = resp.xpath(
......@@ -528,7 +541,7 @@ class H10():
except:
print('报错22222222222222')
if asinstype:
time.sleep(1.5)
time.sleep(2)
print('点击选择亚马逊精选 勾选')
try:
script = f"""
......@@ -618,7 +631,7 @@ class H10():
while True:
try:
sql = f"""SELECT DISTINCT sku,token from all_h10_syn where site='{self.site_url}' and state =1"""
print(sql, '2323324dd')
print(sql)
df = self.engine_us.read_sql(sql)
if not df.empty:
self.sku_data_list = list(df.sku + '|-|' + df.token)
......@@ -694,6 +707,7 @@ class H10():
with open(file_path, 'r', encoding='utf-8') as f:
f.read()
f.close()
print('找到文件:路径有效:',file_path)
return True
except:
print('文件路径不存在')
......@@ -720,60 +734,128 @@ class H10():
print('重新下载文件:', asin, path)
self.webdrvier_html(asin, None)
time.sleep(5)
time_strftime = time.strftime("%Y-%m-%d", time.localtime())
file_path = fr'{path}\{self.site_name_csv.upper()}_AMAZON_cerebro_{asin}_{time_strftime}.csv'
print('读取文件333333::', file_path)
columns = pd.read_csv(file_path, nrows=0).columns.tolist()
self.if_csv_path(file_path)
state = self.if_csv_path(file_path)
if state == False:
print('重新下载文件222:', asin, path)
self.webdrvier_html(asin, None)
self.if_csv_path(file_path)
# columns = pd.read_csv(file_path, nrows=0).columns.tolist()
#
# def contains_chinese(text):
# return bool(re.search(r'[\u4e00-\u9fff]', text))
# is_chinese_header = any(contains_chinese(col) for col in columns)
# if is_chinese_header:
# print("表头是中文")
# columns_to_include_zh = ['关键词词组', 'Cerebro IQ 得分', '搜索量', '搜索量趋势',
# '广告推广ASIN 数',
# '竞品数', 'CPR', '标题密度', '亚马逊推荐', '自然',
# '亚马逊推荐排名', '广告排名', '自然排名']
# df = pd.read_csv(file_path, usecols=columns_to_include_zh)
# # 中文 -> 英文映射
# df.rename(columns={
# '关键词词组': 'keyword',
# 'Cerebro IQ 得分': 'cerebro_iq_score',
# '搜索量': 'search_volume',
# '搜索量趋势': 'search_volume_trend',
# '广告推广ASIN 数': 'sponsored_asins',
# '竞品数': 'competing_product',
# 'CPR': 'cpr',
# '标题密度': 'title_desity',
# '亚马逊推荐': 'amazon_recommended',
# '自然': 'organic',
# '亚马逊推荐排名': 'amazon_recommended_rank',
# '广告排名': 'sponsored_rank',
# '自然排名': 'organic_rank'
# }, inplace=True)
# else:
# print("表头是英文")
# columns_to_include_en = ['Keyword Phrase', 'Cerebro IQ Score', 'Search Volume', 'Search Volume Trend',
# 'Sponsored ASINs',
# 'Competing Products', 'CPR', 'Title Density', 'Amazon Recommended', 'Organic',
# 'Amazon Rec. Rank', 'Sponsored Rank', 'Organic Rank']
# df = pd.read_csv(file_path, usecols=columns_to_include_en)
# df.rename(columns={
# 'Keyword Phrase': 'keyword',
# 'Cerebro IQ Score': 'cerebro_iq_score',
# 'Search Volume': 'search_volume',
# 'Search Volume Trend': 'search_volume_trend',
# 'Sponsored ASINs': 'sponsored_asins',
# 'Competing Products': 'competing_product',
# 'CPR': 'cpr',
# 'Title Density': 'title_desity',
# 'Amazon Recommended': 'amazon_recommended',
# 'Organic': 'organic',
# 'Amazon Rec. Rank': 'amazon_recommended_rank',
# 'Sponsored Rank': 'sponsored_rank',
# 'Organic Rank': 'organic_rank'
# }, inplace=True)
header_config = {
"chinese": {
"columns": ['关键词词组', 'Cerebro IQ 得分', '搜索量', '搜索量趋势',
'广告推广ASIN 数', '竞品数', 'CPR', '标题密度',
'亚马逊推荐', '自然', '亚马逊推荐排名', '广告排名', '自然排名'],
"rename_map": {
'关键词词组': 'keyword',
'Cerebro IQ 得分': 'cerebro_iq_score',
'搜索量': 'search_volume',
'搜索量趋势': 'search_volume_trend',
'广告推广ASIN 数': 'sponsored_asins',
'竞品数': 'competing_product',
'CPR': 'cpr',
'标题密度': 'title_desity',
'亚马逊推荐': 'amazon_recommended',
'自然': 'organic',
'亚马逊推荐排名': 'amazon_recommended_rank',
'广告排名': 'sponsored_rank',
'自然排名': 'organic_rank'
}
},
"english": {
"columns": ['Keyword Phrase', 'Cerebro IQ Score', 'Search Volume', 'Search Volume Trend',
'Sponsored ASINs', 'Competing Products', 'CPR', 'Title Density',
'Amazon Recommended', 'Organic', 'Amazon Rec. Rank',
'Sponsored Rank', 'Organic Rank'],
"rename_map": {
'Keyword Phrase': 'keyword',
'Cerebro IQ Score': 'cerebro_iq_score',
'Search Volume': 'search_volume',
'Search Volume Trend': 'search_volume_trend',
'Sponsored ASINs': 'sponsored_asins',
'Competing Products': 'competing_product',
'CPR': 'cpr',
'Title Density': 'title_desity',
'Amazon Recommended': 'amazon_recommended',
'Organic': 'organic',
'Amazon Rec. Rank': 'amazon_recommended_rank',
'Sponsored Rank': 'sponsored_rank',
'Organic Rank': 'organic_rank'
}
}
}
def contains_chinese(text):
"""判断字符串中是否包含中文"""
return bool(re.search(r'[\u4e00-\u9fff]', text))
is_chinese_header = any(contains_chinese(col) for col in columns)
if is_chinese_header:
print("表头是中文")
columns_to_include_zh = ['关键词词组', 'Cerebro IQ 得分', '搜索量', '搜索量趋势',
'广告推广ASIN 数',
'竞品数', 'CPR', '标题密度', '亚马逊推荐', '自然',
'亚马逊推荐排名', '广告排名', '自然排名']
df = pd.read_csv(file_path, usecols=columns_to_include_zh)
# 中文 -> 英文映射
df.rename(columns={
'关键词词组': 'keyword',
'Cerebro IQ 得分': 'cerebro_iq_score',
'搜索量': 'search_volume',
'搜索量趋势': 'search_volume_trend',
'广告推广ASIN 数': 'sponsored_asins',
'竞品数': 'competing_product',
'CPR': 'cpr',
'标题密度': 'title_desity',
'亚马逊推荐': 'amazon_recommended',
'自然': 'organic',
'亚马逊推荐排名': 'amazon_recommended_rank',
'广告排名': 'sponsored_rank',
'自然排名': 'organic_rank'
}, inplace=True)
else:
print("表头是英文")
columns_to_include_en = ['Keyword Phrase', 'Cerebro IQ Score', 'Search Volume', 'Search Volume Trend',
'Sponsored ASINs',
'Competing Products', 'CPR', 'Title Density', 'Amazon Recommended', 'Organic',
'Amazon Rec. Rank', 'Sponsored Rank', 'Organic Rank']
df = pd.read_csv(file_path, usecols=columns_to_include_en)
df.rename(columns={
'Keyword Phrase': 'keyword',
'Cerebro IQ Score': 'cerebro_iq_score',
'Search Volume': 'search_volume',
'Search Volume Trend': 'search_volume_trend',
'Sponsored ASINs': 'sponsored_asins',
'Competing Products': 'competing_product',
'CPR': 'cpr',
'Title Density': 'title_desity',
'Amazon Recommended': 'amazon_recommended',
'Organic': 'organic',
'Amazon Rec. Rank': 'amazon_recommended_rank',
'Sponsored Rank': 'sponsored_rank',
'Organic Rank': 'organic_rank'
}, inplace=True)
def detect_header_language(columns):
"""判断表头是否为中文"""
return "chinese" if any(contains_chinese(c) for c in columns) else "english"
columns = pd.read_csv(file_path, nrows=0).columns.tolist()
lang = detect_header_language(columns)
print("表头是中文" if lang == "chinese" else "表头是英文")
cfg = header_config[lang]
try:
df = pd.read_csv(file_path, usecols=cfg["columns"])
except ValueError as e:
missing = [col for col in cfg["columns"] if col not in columns]
raise ValueError(f"文件缺少以下列:{missing}") from e
df.rename(columns=cfg["rename_map"], inplace=True)
return df
def sava_data(self, path):
......
......@@ -17,8 +17,8 @@ import traceback
from datetime import datetime
import gzip
import json
from kafka.errors import KafkaError, KafkaTimeoutError
# from curl_cffi import requests as curl
from kafka.errors import KafkaTimeoutError
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
urllib3.disable_warnings()
......@@ -31,7 +31,6 @@ class async_asin_pg():
spider_int=spider_int)
self.spider_int = spider_int
self.reuests_para_val = self.save_asin_detail.reuests_para_val
self.kafuka_producer_str = self.save_asin_detail.kafuka_producer_str
self.redis14 = self.save_asin_detail.redis_db14
self.requests_error_asin_list = [] # 1
self.asin_not_found_list = [] # 4
......@@ -76,7 +75,7 @@ class async_asin_pg():
self.topic_detail_month = f'{self.site_name}_asin_detail_month_2025_{self.month_}'
self.topic_asin_html = f'asin_html_2025_{self.month_}'
self.asin_video_list = []
self.asin_buySales_list = []
def get_asin(self):
while True:
if self.queries_asin_queue.empty() == False:
......@@ -104,14 +103,16 @@ class async_asin_pg():
if is_variat == '1':
scraper_url = self.site_url + 'dp/' + query[0] + "?th=1&psc=1"
else:
scraper_url = self.site_url + 'dp/' + query[0]
scraper_url = self.site_url + 'dp/' + query[0] + '?th=1'
self.request_total_count_list.append(4)
print('scraper_url::', scraper_url)
try:
from urllib.parse import urlparse
sess.mount(self.site_url, py_ja3.DESAdapter())
resp = sess.get(scraper_url, headers=headers,
timeout=10, verify=False)
# with open(rf'D:\新建文件夹\requests_files\{self.site_name}_{asin}.html', 'w', encoding='utf-8')as f:
# with open(rf'{self.site_name}_22_{asin}.html', 'w', encoding='utf-8')as f:
# f.write(resp.text)
if self.reuests_para_val.check_amazon_yzm(resp):
self.yzm_err_total_list.append(1)
......@@ -130,7 +131,6 @@ class async_asin_pg():
response = resp.text
response_s = etree.HTML(response)
self.success_asin_total_list.append(3)
if self.reuests_para_val.check_amazon_not_page(response):
self.asin_not_found_list.append(asin)
continue
......@@ -215,7 +215,8 @@ class async_asin_pg():
'parent_asin': items["parentAsin"], 'div_id_list': items['div_id_list'],
'bundles_this_asins_json': items['bundles_this_asins_data_json'],
'video_m3u8_url': items["video_m3u8"], 'result_list_json': items['result_list_json'],
'bundle_asin_component_json':items['bundle_asin_component_json']
'bundle_asin_component_json':items['bundle_asin_component_json'],
'review_json_list':items['review_json_list'],'asin_buySales_list':items['asin_buySales_list']
}
if self.site_name in ['uk', 'de', 'fr', 'es', 'it']:
item['five_six_val'] = items['five_six_val']
......@@ -228,7 +229,7 @@ class async_asin_pg():
if item['variat_num'] > 0:
_url = self.site_url + 'dp/' + asin + "?th=1&psc=1"
else:
_url = self.site_url + 'dp/' + asin
_url = self.site_url + 'dp/' + asin + '?th=1'
print('第二次请求:', _url)
try:
_response_text = None
......@@ -267,9 +268,26 @@ class async_asin_pg():
item["seller_id"] = _items["seller_id"]
if item['seller_json'] is None:
item["seller_json"] = _items["seller_json"]
if item['five_star'] is None:
item['five_star'] = _items["five_star"]
if item['four_star'] is None:
item['four_star'] = _items["four_star"]
if item['two_star'] is None:
item['two_star'] = _items["two_star"]
if item['one_star'] is None:
item['one_star'] = _items["one_star"]
if item['low_star'] is None:
item['low_star'] = _items["low_star"]
if item['category'] is None:
item['category'] = _items["category"]
if item['node_id'] is None:
item['node_id'] = _items["node_id"]
if item['review_json_list'] is None:
item['review_json_list'] = _items["review_json_list"]
except:
pass
print('itemitem:::', item)
_response_text_var = None
if item["buy_box_seller_type"] == 4 and item['page_inventory'] == 3 and item['variat_num'] > 0 and \
items["asin_variation_list"]:
......@@ -288,6 +306,7 @@ class async_asin_pg():
_to_items = ParseAsinUs(resp=_response_text_var, asin=asin, month=self.month_,
date_info=date_info,
site_name=self.site_name).xpath_html()
if item['buy_box_seller_type'] is None or item['buy_box_seller_type'] == 4:
item["buy_box_seller_type"] = _to_items["buy_box_seller_type"]
if item['account_name'] is None:
......@@ -320,6 +339,7 @@ class async_asin_pg():
if key in item['title']:
self.asin_not_sure_list.append(asin)
continue
print('itemitem:::', item)
# 上架时间 排名 重量 底部信息 如果都为None 重新抓取
if item["launch_time"] is None and item["rank"] is None and item['weight'] is None and item[
'product_detail_json'] is None and len(items['div_id_list']) < 1:
......@@ -346,9 +366,13 @@ class async_asin_pg():
item['img_list'] = json.dumps(items["all_img_video_list"])
else:
item['img_list'] = None
if item['asin_buySales_list']:
self.asin_buySales_list.extend(item['asin_buySales_list'])
self.item_queue.put(item)
if item['img_list'] is None:
item['img_list'] = []
# 获取字段值为None的字段名称写入redis进行统计
none_keys = [key for key, value in item.items() if
(value is None) or (value == -1 and key == 'price') or (
......@@ -365,8 +389,11 @@ class async_asin_pg():
if key in none_keys:
none_keys.remove(key)
log_time = time.strftime('%Y-%m-%d', time.localtime(time.time()))
self.redis14.rpush(f'{self.site_name}_{log_time}_asin_detail_is_none', *none_keys)
self.send_kafka(items=item, topic=self.topic_detail_month)
try:
self.redis14.rpush(f'{self.site_name}_{log_time}_asin_detail_is_none', *none_keys)
except:
pass
self.reuests_para_val.send_kafka(items=item, topic=self.topic_detail_month)
print(asin, 'rank 排名:', item['rank'])
if item['rank']:
if (item['rank'] < 9000):
......@@ -380,51 +407,25 @@ class async_asin_pg():
requests_num = 0
response_gzip = self.compress_string(response)
html_data = f'{self.site_name}|-||=|-|=||-|{asin}|-||=|-|=||-|{response_gzip}|-||=|-|=||-|{new_date}|-||=|-|=||-|{requests_num}'
self.send_kafka(html_data=html_data, topic=self.topic_asin_html)
self.reuests_para_val.send_kafka(html_data=html_data, topic=self.topic_asin_html)
else:
self.asin_not_div_id_dp_list.append(asin)
if 'Click the button below to continue shopping' in response:
self.requests_error_asin_list.append(query[0])
else:
print('状态13', asin)
self.asin_not_div_id_dp_list.append(asin)
continue
else:
print(f"当前线程-已完成-爬取-跳出循环")
break
def on_send_success(self, record_metadata):
print(f"消息发送成功: {record_metadata.topic}-{record_metadata.partition}-{record_metadata.offset}")
def on_send_error(self, excp):
print("消息发送失败", excp)
def send_kafka(self, items=None, html_data=None, topic=None, num=3):
print('向Kafka发送数据')
for i in range(3):
try:
if items:
del items['div_id_list']
future = self.kafuka_producer_str.send(topic, json.dumps(items))
future.add_callback(self.on_send_success).add_errback(self.on_send_error)
if html_data:
future = self.kafuka_producer_str.send(topic, html_data)
future.add_callback(self.on_send_success).add_errback(self.on_send_error)
print('向Kafka发送数据 发送成功')
break
except Exception as e:
print(e)
if i >= 1:
self.kafuka_producer_str = self.save_asin_detail.kafuka_producer_str
try:
self.kafuka_producer_str.flush(timeout=10)
except KafkaTimeoutError as e:
print("flush 超时,跳过这次等待:", e)
# 压缩字符串
def compress_string(self, input_string):
return gzip.compress(input_string.encode())
def init_list(self):
print("=======清空变量==========")
self.asin_buySales_list = []
self.asin_not_found_list = [] # 4
self.asin_not_sure_list = [] # 6
self.asin_not_foot_list = [] # 7
......@@ -448,7 +449,7 @@ class async_asin_pg():
self.bs_category_asin_list_pg = []
# 关闭redis
self.redis14.close()
self.kafuka_producer_str.close(timeout=10)
self.reuests_para_val.kafuka_producer_str.close(timeout=10)
self.asin_video_list = []
self.cookies_queue = Queue() # cookie队列
self.item_queue = Queue() # 存储 item 详情数据队列
......@@ -477,7 +478,7 @@ class async_asin_pg():
def run(self):
asin_list = self.save_asin_detail.read_db_data()
# asin_list = ['B0DSBTYG6W|2025-01|1|1|null|null']
# asin_list = ['B0D663T3W8|2025-01|1|1|null|null']
if asin_list:
for asin in asin_list:
self.queries_asin_queue.put(asin)
......@@ -487,7 +488,7 @@ class async_asin_pg():
for ck in cookies_dict.values():
self.cookies_queue.put(ck)
html_thread = []
for i in range(26):
for i in range(27):
thread2 = threading.Thread(target=self.get_asin)
thread2.start()
html_thread.append(thread2)
......@@ -496,9 +497,17 @@ class async_asin_pg():
# 存储数据
print('最后刷新kafka flush')
try:
self.kafuka_producer_str.flush(timeout=30)
self.reuests_para_val.kafuka_producer_str.flush(timeout=35)
except KafkaTimeoutError as e:
print("flush 超时,跳过这次等待:", e)
while True:
try:
if self.asin_buySales_list:
self.save_asin_detail.save_asin_not_buysales(self.asin_buySales_list)
break
except FunctionTimedOut as e:
print('断网', e)
while True:
try:
print('存储 asin bsr 文本 存储pg')
......@@ -539,22 +548,25 @@ class async_asin_pg():
# 清空变量,
new_date = datetime.now().strftime("%Y-%m-%d")
site_new_date = f'{self.site_name}_' + str(new_date)
if self.yzm_err_total_list:
print('验证码', len(self.yzm_err_total_list))
self.redis14.rpush(site_new_date, *self.yzm_err_total_list)
if self.asin_request_errp_total_list:
print('异常', len(self.asin_request_errp_total_list))
self.redis14.rpush(site_new_date, *self.asin_request_errp_total_list)
if self.success_asin_total_list:
print('成功', len(self.success_asin_total_list))
self.redis14.rpush(site_new_date, *self.success_asin_total_list)
if self.request_total_count_list:
print('总请求', len(self.request_total_count_list))
self.redis14.rpush(site_new_date, *self.request_total_count_list)
if self.hour_total_count_list:
new_date_hour = site_new_date + ':0-23'
self.redis14.rpush(new_date_hour, *self.hour_total_count_list)
self.init_list()
try:
if self.yzm_err_total_list:
print('验证码', len(self.yzm_err_total_list))
self.redis14.rpush(site_new_date, *self.yzm_err_total_list)
if self.asin_request_errp_total_list:
print('异常', len(self.asin_request_errp_total_list))
self.redis14.rpush(site_new_date, *self.asin_request_errp_total_list)
if self.success_asin_total_list:
print('成功', len(self.success_asin_total_list))
self.redis14.rpush(site_new_date, *self.success_asin_total_list)
if self.request_total_count_list:
print('总请求', len(self.request_total_count_list))
self.redis14.rpush(site_new_date, *self.request_total_count_list)
if self.hour_total_count_list:
new_date_hour = site_new_date + ':0-23'
self.redis14.rpush(new_date_hour, *self.hour_total_count_list)
self.init_list()
except:
pass
# if __name__ == '__main__':
# async_asin_pg(month=9, spider_int=1, week=14,site_name='us').run()
import sys
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from amazon_params import py_ja3
from amazon_save_db.save_asin_detail_pg import Save_asin_detail
from utils.asin_parse import ParseAsinUs
from queue import Queue
import time
import re
from lxml import etree
import requests
......@@ -17,7 +16,7 @@ import traceback
from datetime import datetime
import gzip
import json
from kafka.errors import KafkaError, KafkaTimeoutError
from kafka.errors import KafkaTimeoutError
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
......@@ -214,7 +213,9 @@ class async_asin_pg():
'created_time': new_date, 'current_asin': items['current_asin'],
'parent_asin': items["parentAsin"], 'div_id_list': items['div_id_list'],
'bundles_this_asins_json': items['bundles_this_asins_data_json'],
'video_m3u8_url': items["video_m3u8"], 'result_list_json': items['result_list_json']
'video_m3u8_url': items["video_m3u8"], 'result_list_json': items['result_list_json'],
'bundle_asin_component_json': None, 'review_json_list': items['review_json_list'],
'asin_buySales_list': items['asin_buySales_list']
}
if self.site_name in ['uk', 'de', 'fr', 'es', 'it']:
item['five_six_val'] = items['five_six_val']
......@@ -379,7 +380,6 @@ class async_asin_pg():
def on_send_error(self, excp):
print("消息发送失败", excp)
def send_kafka(self, items=None, html_data=None, topic=None, num=3):
print('向Kafka发送数据')
for i in range(3):
......@@ -459,13 +459,52 @@ class async_asin_pg():
# 总请求 4
self.request_total_count_list = []
def run(self):
# asin_list = self.save_asin_detail.read_db_data()
asin_list = ['B0F1TKH4C1|2025-01|1|1|null|null',
'B0DQ413NZ3|2025-01|1|1|null|null',
'B0DYPCJMDZ|2025-01|1|1|null|null',
]
asin_list = ['B0FHPBN5BD|2025-01|1|1|null|null',
'B0F98H3X25|2025-01|1|1|null|null',
'B0FC5C8LYB|2025-01|1|1|null|null',
'B0F1C95Y7Z|2025-01|1|1|null|null',
'B0F1XPL81W|2025-01|1|1|null|null',
'B0FH6HXXKG|2025-01|1|1|null|null',
'B0FCDXQNKW|2025-01|1|1|null|null',
'B0FB8D2RZX|2025-01|1|1|null|null',
'B0F4QJ2PKL|2025-01|1|1|null|null',
'B0FTTSTYBH|2025-01|1|1|null|null',
'B0F1X7Y6HG|2025-01|1|1|null|null',
'B0FK4RJ8BQ|2025-01|1|1|null|null',
'B0FB31NQ6C|2025-01|1|1|null|null',
'B0F1XBNK8N|2025-01|1|1|null|null',
'B0F4R31W9G|2025-01|1|1|null|null',
'B0F2RZ7SQY|2025-01|1|1|null|null',
'B0FJL52XZL|2025-01|1|1|null|null',
'B0F1S7FC9Z|2025-01|1|1|null|null',
'B0FB3CGNWF|2025-01|1|1|null|null',
'B0F2SLP2JM|2025-01|1|1|null|null',
'B0FJ7YWTBC|2025-01|1|1|null|null',
'B0F1C95998|2025-01|1|1|null|null',
'B0FMRKGK1B|2025-01|1|1|null|null',
'B0F1NCNGCY|2025-01|1|1|null|null',
'B0FGHHZRDB|2025-01|1|1|null|null',
'B0FH6CRWJ3|2025-01|1|1|null|null',
'B0F4CGG71T|2025-01|1|1|null|null',
'B0F93LS2X4|2025-01|1|1|null|null',
'B0F8B343WJ|2025-01|1|1|null|null',
'B0F1CCJ6T8|2025-01|1|1|null|null',
'B0FPFKLV4W|2025-01|1|1|null|null',
'B0FB82RRNJ|2025-01|1|1|null|null',
'B0FBG8BNWR|2025-01|1|1|null|null',
'B0F1XD9PP4|2025-01|1|1|null|null',
'B0F1X9GPV4|2025-01|1|1|null|null',
'B0F4R1RKG7|2025-01|1|1|null|null',
'B0CM8VHPPG|2025-01|1|1|null|null',
'B0FPKC3VXL|2025-01|1|1|null|null',
'B0F9P17QZB|2025-01|1|1|null|null',
'B0FRLL5FRD|2025-01|1|1|null|null',
'B0FPX6QGC7|2025-01|1|1|null|null',
'B0FP97HMR6|2025-01|1|1|null|null',
]
if asin_list:
for asin in asin_list:
self.queries_asin_queue.put(asin)
......@@ -495,38 +534,29 @@ class async_asin_pg():
break
except FunctionTimedOut as e:
print('断网', e)
# if __name__ == '__main__':
# async_asin_pg(month=9, spider_int=1, week=14,site_name='us').run()
#
from datetime import datetime, timedelta
# 当前日期
today = datetime(2025, 7, 8)
# if __name__ == '__main__':
# async_asin_pg(month=9, spider_int=1, week=14, site_name='us').run()
# 向前推 119 天
delta = timedelta(days=119)
result_date = today - delta
#
# # 输出结果
print("447天前的日期是:", result_date.strftime("%Y-%m-%d"))
#
# from datetime import datetime
#
# # 当前日期
# today = datetime.today()
from datetime import datetime
# 当前日期
today = datetime.today()
#
# # 起始日期
# start_date = datetime(2025, 1, 24)
start_date = datetime(2025, 5, 7)
#
# # 相差天数
# delta_days = (today - start_date).days
# print('相差天数',delta_days)
delta_days = (today - start_date).days
print('相差天数',delta_days)
# # 除以30
# result = delta_days / 30
# print('每天销售',result)
result = delta_days / 30
print('每天销售',result)
print('累计销量', result * 9) # 每天* bsr月销
# print('每天销售* 月销售数量',result*110) # 每天销售* 月销售数量
# # 输出结果
# print(f"到今天相隔 {delta_days} 天")
# # print(1426*20.99)
\ No newline at end of file
print(f"到今天相隔 {delta_days} 天")
# # print(1426*20.99)
......@@ -250,7 +250,7 @@ class search_temp_pg(BaseUtils):
for search_url in search_term_list:
self.search_term_queue.put(search_url)
html_thread = []
for i in range(16):
for i in range(17):
thread2 = threading.Thread(target=self.get_search_kw, args=(i,))
html_thread.append(thread2)
for ti in html_thread:
......@@ -307,17 +307,23 @@ class search_temp_pg(BaseUtils):
self.engine_pg = self.pg_connect()
sql_read = f"""SELECT id, search_term, url FROM {self.db_search_term} where state=1 and month={self.month} LIMIT {self.read_size} for update;"""
print(sql_read)
self.df_read = self.engine.read_sql(sql_read)
# self.df_read = self.engine_pg.read_sql(sql_read)
self.df_read = self.engine_pg.read_then_update(
select_sql=sql_read,
update_table=self.db_search_term,
set_values={"state": 2}, # 把库存清零
where_keys=["id"], # WHERE sku = :sku
)
if self.df_read.shape[0] > 0:
self.id_tuple = tuple(self.df_read.id)
# self.id_tuple = tuple(self.df_read.id)
self.date_info = f'2025-{self.month}'
print('date_info::', self.date_info, ' 月:', self.month)
with self.engine_pg.begin() as conn:
if len(self.id_tuple) == 1:
sql_update = f'UPDATE {self.db_search_term} set state=2 where id in ({self.id_tuple[0]});'
else:
sql_update = f'UPDATE {self.db_search_term} set state=2 where id in {self.id_tuple};'
conn.execute(sql_update)
# with self.engine_pg.begin() as conn:
# if len(self.id_tuple) == 1:
# sql_update = f'UPDATE {self.db_search_term} set state=2 where id in ({self.id_tuple[0]});'
# else:
# sql_update = f'UPDATE {self.db_search_term} set state=2 where id in {self.id_tuple};'
# conn.execute(sql_update)
search_term_list = list(
self.df_read.id.astype("U") + '|-|' + self.df_read.search_term + '|-|' + self.df_read.url)
return search_term_list
......
......@@ -68,6 +68,9 @@ def select_sate_mysql(site, num=None, month=None, week=None):
engine_us_mysql = db_engine('us', 'mysql')
df = engine_us_mysql.read_sql(sql_select_)
if int(df.status_val[0]) in (1, 2):
redis_client = BaseUtils().redis_db()
lock_key = "ALL站点-asin同步-pg-api_lock"
lock = redis_client.lock(lock_key, timeout=15) # 15秒超时
update_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='反查搜索词' and date_info='2025-{week}' and site_name='{site}' and date_type='week'"
print('update_workflow_progress: 修改状态3 ', update_workflow_progress)
db_cursor_connect_update(update_workflow_progress, site)
......@@ -83,9 +86,7 @@ def select_sate_mysql(site, num=None, month=None, week=None):
ii += 1
if ii > 8:
break
redis_client = BaseUtils().redis_db()
lock_key = "ALL站点-asin同步-pg-api_lock"
lock = redis_client.lock(lock_key, timeout=5) # 10秒超时
if id_tuple is None:
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
......@@ -127,7 +128,7 @@ def long_time_task(site, proxy_name, month):
if __name__ == '__main__':
pppoe_ip()
site_list = ['us', 'de', 'uk']
site_list = ['us','de','uk']
month = int(sys.argv[1])
week = int(sys.argv[2])
proxy_name = None
......
import time
import sys
import os
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import time
from redis.exceptions import LockError
from threading_spider.db_connectivity import connect_db
from threading_spider.post_to_dolphin import DolphinschedulerHelper
from utils.db_connect import BaseUtils
......@@ -84,7 +86,7 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
# 定义锁的键
redis_client = BaseUtils().redis_db()
lock_key = f"{year_week}_{site}_lock"
lock = redis_client.lock(lock_key, timeout=5) # 10秒超时
lock = redis_client.lock(lock_key, timeout=55) # 55秒超时
select_sql = f"select status_val from workflow_progress WHERE date_info='{year_week}' and date_type='week' and site_name='{site}' and page='ASIN详情'"
print(select_sql)
df_state = db_cursor_connect_msyql_read(select_sql)
......@@ -154,6 +156,7 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
update_workflow_progress = f"update workflow_progress set status_val=3,status='ASIN爬取完成',up_spider_state=3 where page='ASIN详情' and date_info='{year_week}' and site_name='{site}' and date_type='week'"
print(update_workflow_progress)
db_cursor_connect_update(update_workflow_progress, site)
db_class.send_mg('pengyanbing', '修改进度表', update_workflow_progress)
ii = 0
for i in range(10):
time.sleep(180)
......@@ -164,6 +167,7 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
update_month_asin_state = f"update workflow_progress set status_val=3,status='月ASIN抓取完成' WHERE site_name='{site}' and page='asin详情' and date_type='month' and status_val=1 and status='月ASIN导出完成' and date_info='{year_month}'"
print(update_month_asin_state)
db_cursor_connect_update(update_month_asin_state, site)
db_class.send_mg('pengyanbing', '修改 月 维度 进度表', update_month_asin_state)
update_month_spider_state = f"update workflow_progress set kafka_flow_state=1,spider_state=3,spider_int={spider_int} WHERE site_name='{site}' and date_type='month' and date_info='{year_month}' and page='ASIN详情'"
db_cursor_connect_update(update_month_spider_state, site)
DolphinschedulerHelper.start_process_instance_common(
......@@ -216,8 +220,8 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
# project_name="big_data_selection",
# process_df_name='ALL站点-启动30day/月流程',
# startParams={
# "site_name": "us",
# "site_name": "uk",
# "date_type": "month",
# "date_info": '2025-07'
# "date_info": '2025-10'
# }
# )
\ No newline at end of file
"解析asin详情数据"
import sys
import os
import html as html_module # 为标准库的 html 模块设置别名
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import re
......@@ -225,6 +225,74 @@ class ParseAsinUs(object):
def add_variation(self, asin, color, size, style, state, parentAsin, other_name):
self.asin_variation_list.append([asin, color, parentAsin, size, state, style, other_name])
def get_review(self, html, site_name):
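"""Parse the inline review widget on the detail page (separate outer xpaths for the focal-site block and the 'global-reviews-all' block), extract user, star rating, title, date, variation and body text for each review, and return the list as a JSON string, or None when no reviews are found."""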
reviews_all_dict = {site_name: "//span[@data-hook='cr-widget-FocalReviews']",
'other': "//span[@class='global-reviews-all']"}
review_json_list = []
for key_site, value_xpath in reviews_all_dict.items():
div_id_list = html.xpath(value_xpath + "//li[@data-hook='review']/@id")
for div_id in div_id_list:
user_href_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//div[@class='a-row a-spacing-mini']/a/@href")
user_href = self.site_url + user_href_list[0] if user_href_list else None
user_img_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//div[@class='a-row a-spacing-mini']//img/@data-src")
user_img = self.site_url + user_img_list[0] if user_img_list else None
user_name_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//div[@class='a-row a-spacing-mini']//span[@class='a-profile-name']/text()")
user_name = user_name_list[0] if user_name_list else None
review_star_rating_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//i[contains(@data-hook,'review-star-rating')]//text()")
review_star_rating = review_star_rating_list[0] if review_star_rating_list else None
if key_site == 'other':
review_title_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//span[@data-hook='review-title']/span/text()")
review_title = review_title_list[0] if review_title_list else None
else:
review_title_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//a[@data-hook='review-title']/span/text()")
review_title = review_title_list[0] if review_title_list else None
review_date_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//span[@data-hook='review-date']/text()")
review_date = review_date_list[0] if review_date_list else None
review_href_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//div[@class='a-row']//a/@href")
review_href = self.site_url + review_href_list[0] if review_href_list else None
var_data_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//span[@data-hook='format-strip-linkless']//text()")
var_data = '|'.join(var_data_list) if var_data_list else None
var_asin_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//div[@class='a-row a-spacing-mini review-data review-format-strip']//a/@href")
if var_asin_list:
varasin_list = re.findall(r'reviews/(.*)/ref', var_asin_list[0])
var_asin = varasin_list[0] if varasin_list else None
else:
var_asin = None
vp_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//a[contains(@aria-label,'Verified Purchase')]//span/text()")
verified_purchase = vp_list[0] if vp_list else None
review_data_list = html.xpath(
f"{value_xpath}//li[@id='{div_id}']//span[@data-hook='review-body']//div[@data-hook='review-collapsed']/span/text()")
review_data_list = ''.join(review_data_list).strip()
review_data = review_data_list if review_data_list else None
items = {
'title': review_title,
'content': review_data,
'model': var_data,
'rating': review_star_rating,
'userName': user_name,
"commentTime": review_date,
"commentId": div_id,
'country': key_site,
}
review_json_list.append(items)
if review_json_list:
review_json = json.dumps(review_json_list,ensure_ascii=False)
return review_json
else:
return None
def xpath_html(self):
if self.site_name == "us":
from utils.params_asin_xpath import US_ASIN_XPATH as ASIN_XPATH
......@@ -290,10 +358,11 @@ class ParseAsinUs(object):
result[title] = asin_list
if result:
result_list.append(result)
h2_str_list = self.response_s.xpath('//h2[contains(@class,"a-spacing-medium")]/text()|//div[@class="a-column a-span8"]/h2[contains(@class,"carousel-heading")]/text()')
h2_str_list = self.response_s.xpath(
'//h2[contains(@class,"a-spacing-medium")]/text()|//div[@class="a-column a-span8"]/h2[contains(@class,"carousel-heading")]/text()')
if h2_str_list:
for h2_str in h2_str_list:
if h2_str !='Videos':
if h2_str != 'Videos':
data_asin_list = self.response_s.xpath(
f"""//h2[contains(text(),"{h2_str}")]/parent::div/parent::div//@data-asin|//h2[contains(text(),"{h2_str}")]/parent::div/parent::div/parent::div//@data-asin""")
print('h2_str_list::', h2_str, data_asin_list)
......@@ -473,10 +542,13 @@ class ParseAsinUs(object):
bundle_asin_point_list = self.response_s.xpath(
f"//a[contains(@href,'{bundle_component_asin}')]/parent::div/following-sibling::ul/li[contains(@id,'component-details-component-bullet-point')]/span/text()")
bundle_asin_point = '|-|'.join(bundle_asin_point_list) if bundle_asin_point_list else None
bundle_component_asin_item = {"bundle_component_asin":bundle_component_asin,"bundle_asin_title":bundle_asin_title,
'bundle_asin_img':bundle_asin_img,"bundle_asin_review":bundle_asin_review,
"bundle_asin_star":bundle_asin_star,"bundle_asin_price":bundle_asin_price,
"bundle_asin_point":bundle_asin_point}
bundle_component_asin_item = {"bundle_component_asin": bundle_component_asin,
"bundle_asin_title": bundle_asin_title,
'bundle_asin_img': bundle_asin_img,
"bundle_asin_review": bundle_asin_review,
"bundle_asin_star": bundle_asin_star,
"bundle_asin_price": bundle_asin_price,
"bundle_asin_point": bundle_asin_point}
bundle_asin_component_list.append(bundle_component_asin_item)
if bundle_asin_component_list:
bundle_asin_component_json = json.dumps(bundle_asin_component_list)
......@@ -506,7 +578,7 @@ class ParseAsinUs(object):
for td in td_list:
td_key_list = td.xpath('.//text()')
td_key = ''.join(td_key_list).strip()
td_value_list = td.xpath('./following-sibling::td//text()')
td_value_list = td.xpath('./following-sibling::td//span//text()')
try:
td_value = ''.join(td_value_list).strip()
td_dict[td_key] = td_value
......@@ -888,6 +960,7 @@ class ParseAsinUs(object):
star1 = stars_1_list[0]
else:
star1 = 0
low_star = int(star3) + int(star2) + int(star1)
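# low_star sums the 1-, 2- and 3-star review counts parsed above into a single low-rating total.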
# Review analysis
......@@ -1706,6 +1779,23 @@ class ParseAsinUs(object):
else:
best_sellers_herf = None
all_best_sellers_herf = None
if self.site_name == 'de':
for i in ASIN_XPATH['best_sellers_text']:
best_sellers_text_list = self.response_s.xpath(i)
if best_sellers_text_list:
all_bsr_category = '›'.join(best_sellers_text_list)
break
else:
all_bsr_category = None
if category is None and all_bsr_category:
bsr_category_list = re.findall(r' in (.*)', all_bsr_category)
category = bsr_category_list[0] if bsr_category_list else None
if node_id is None and best_sellers_herf:
node_id_list = re.findall(r'/(\d+)/ref=', best_sellers_herf)
node_id = node_id_list[0] if node_id_list else None
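# Example (assumed href format): ".../bestsellers/kueche-haushalt/3167641/ref=pd_zg_hrsr_..." -> node_id "3167641"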
# rank (BSR ranking)
for i in ASIN_XPATH['Best_rank']:
Best_rank_list_th = self.response_s.xpath(i)
......@@ -2081,6 +2171,7 @@ class ParseAsinUs(object):
Package = None
# Launch time (listing date)
try:
amazon_launch_time = None
tiem_dict = {"June": "6", "April": "4", "January": "1", "October": "10",
"November": "11", "August": "8",
"March": "3", "December": "12", "July": "7", "September": "9",
......@@ -2096,17 +2187,21 @@ class ParseAsinUs(object):
else:
Date_time = "0"
launch_time = None
if len(Date_time) > 1:
print('Date_time::', Date_time)
time_s = re.findall(r"(.*?) ", Date_time)
time_ss = time_s[0]
amazon_launch_time = time_ss
t1 = tiem_dict.get(time_ss)
t2 = Date_time.replace(time_ss, t1)
try:
d2 = datetime.datetime.strptime(t2, '%m %d %Y') # 2007-06-28 00:00:00
except:
d2 = datetime.datetime.strptime(t2, '%d %m %Y') # 2007-06-28 00:00:00
launch_time = str(d2)
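# Example (assuming Date_time like "June 28 2007"): t2 becomes "6 28 2007" and launch_time "2007-06-28 00:00:00"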
if launch_time is None:
for i in ASIN_XPATH['Date_time2']:
data_list = self.response_s.xpath(i)
......@@ -2122,6 +2217,7 @@ class ParseAsinUs(object):
month_ = re.findall(r'[A-Za-z]', data_time)
month_str = ''.join(month_)
_month = data_time.replace(month_str, tiem_dict.get(month_str))
amazon_launch_time = _month
try:
year_moth_day = datetime.datetime.strptime(_month, '%m %d %Y')
except:
......@@ -2130,6 +2226,7 @@ class ParseAsinUs(object):
break
else:
launch_time = None
elif self.site_name in ['de', 'fr', 'it', 'uk', 'es']:
if self.site_name == 'de':
tiem_dict = {"June": "6", "April": "4", "January": "1", "Oktober": "10", "October": "10",
......@@ -2178,6 +2275,7 @@ class ParseAsinUs(object):
time_ss = time_s[1]
t1 = tiem_dict.get(time_ss)
t2 = Date_time.replace(time_ss, t1)
amazon_launch_time = t2
try:
d2 = datetime.datetime.strptime(t2, '%m %d %Y')
except:
......@@ -2198,6 +2296,7 @@ class ParseAsinUs(object):
month_ = re.findall(r'[A-Za-z]', data_time)
month_str = ''.join(month_)
_month = data_time.replace(month_str, tiem_dict.get(month_str))
amazon_launch_time = _month
try:
year_moth_day = datetime.datetime.strptime(_month, '%m %d %Y')
except:
......@@ -2209,6 +2308,7 @@ class ParseAsinUs(object):
except Exception as e:
print(e, 'launch time parsing error')
launch_time = None
amazon_launch_time = None
# QA
for i in ASIN_XPATH['QA_num']:
askATF_list = self.response_s.xpath(i)
......@@ -2627,6 +2727,11 @@ class ParseAsinUs(object):
# Monthly sales exact figure; if present, concatenate it with the sales text
# buy_sales_num_list
for i in ASIN_XPATH['buy_sales_num_list']:
buySales_num_list = self.response_s.xpath(i)
if buySales_num_list:
......@@ -2634,16 +2739,29 @@ class ParseAsinUs(object):
break
else:
buySales_num = None
# ASIN detail: monthly sales volume
for i in ASIN_XPATH['buy_sales_list']:
buySales_list = self.response_s.xpath(i)
if buySales_list:
buySales = buySales_list[0].strip().replace(' ', '')
buySales_list2 = self.response_s.xpath(i)
if buySales_list2:
buySales = buySales_list2[0].strip().replace(' ', '')
if buySales_num:
buySales = buySales_num + buySales
asin_not_Sales = buySales_num + buySales
else:
asin_not_Sales = buySales
break
else:
buySales = None
asin_not_Sales = None
print('asin_not_Sales:', asin_not_Sales)
buySales_list = self.response_s.xpath(
f'//div[@data-csa-c-asin="{self.asin}"]//span[contains(@id,"bought")]//text()|//span[contains(@id,"bought")]//text()')
print('buySales_list:::', buySales_list)
if buySales_list:
buy_Sales = ''.join(buySales_list)
buySales = buy_Sales.strip().replace(' ', '')
else:
buySales = None
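# Example (assumed raw badge text): "400+ bought in past month" -> "400+boughtinpastmonth" after the whitespace strip above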
if buySales:
if self.site_name == 'us' or self.site_name == 'uk':
if 'boughtinpast' in buySales:
......@@ -2655,6 +2773,20 @@ class ParseAsinUs(object):
pass
else:
buySales = None
if buySales:
if len(buySales) > 50:
buySales = None
asin_buySales_list = []
if asin_not_Sales and buySales is None:
asin_buy = self.asin
asin_buySales = asin_not_Sales
else:
asin_buy = None
asin_buySales = None
if asin_buy and asin_buySales:
asin_buySales_list.append([asin_buy, asin_buySales, self.date_info])
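# Each record is [asin, sales_text, date_info]; it is collected only when the page-level sales text was found but the per-asin "bought" badge was not.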
# Follow-sell sellers (other sellers on the same listing)
for i in ASIN_XPATH['box_follow_list']:
buyBox_num_list = self.response_s.xpath(i)
......@@ -2668,28 +2800,6 @@ class ParseAsinUs(object):
break
else:
sellers_num = 1
# for i in ASIN_XPATH['buyBox']:
# # 卖家店铺链接
# buyBox = self.response_s.xpath(i)
# if buyBox:
# buyBox_name = buyBox[0].strip()
# break
# else:
# buyBox_name = None
#
# if buyBox_name is None:
# for i in ASIN_XPATH['buyBox1']:
# buyBox = self.response_s.xpath(i)
# if buyBox:
# if self.site_name == 'de':
# if ('Verkauf und Versand durch Amazon.' in buyBox[0].strip()):
# buyBox_name = 'Amazon'
# break
# else:
# buyBox_name = buyBox[-1].strip()
# break
# else:
# buyBox_name = None
for i in ASIN_XPATH['buyBox_url']:
buyBox_url = self.response_s.xpath(i)
......@@ -2826,10 +2936,12 @@ class ParseAsinUs(object):
seller_json = None
else:
seller_json = json.dumps(cleaned_data, ensure_ascii=False)
review_json = self.get_review(self.response_s, self.site_name)
item = {'asin': self.asin, 'week': self.week, 'month': self.month, 'title': title, 'img_url': image,
'rating': rating,
'total_comments': total_comments,
'price': price, "rank": rank, 'category': category, 'launch_time': launch_time,
'amazon_launch_time': amazon_launch_time,
'volume': Package,
'weight': Weight, "page_inventory": page_inventory,
"buy_box_seller_type": buy_box_seller_type,
......@@ -2860,7 +2972,9 @@ class ParseAsinUs(object):
'customer_reviews_json': customer_reviews_json, 'together_asin_json': together_asin_json,
'min_match_asin_json': min_match_asin_json, 'seller_json': seller_json, 'current_asin': current_asin,
'div_id_list': div_id_list, 'bundles_this_asins_data_json': bundles_this_asins_data_json,
'video_m3u8': video_m3u8, 'result_list_json': result_list_json,'bundle_asin_component_json':bundle_asin_component_json}
'video_m3u8': video_m3u8, 'result_list_json': result_list_json,
'bundle_asin_component_json': bundle_asin_component_json,
"review_json_list": review_json, 'asin_buySales_list': asin_buySales_list}
if self.site_name == 'us':
item['three_four_val'] = Join_Prime_int
elif self.site_name in ['uk', 'fr', 'it', 'es']:
......@@ -2872,5 +2986,5 @@ class ParseAsinUs(object):
return item
if __name__ == '__main__':
ParseAsinUs().xpath_html()
# if __name__ == '__main__':
# ParseAsinUs().xpath_html()
......@@ -222,11 +222,16 @@ DE_ASIN_XPATH = {
"brand2": ["//a[@id='amznStoresBylineLogoImageContainer']/following-sibling::a/text()"],
"ac_name": ["//span[@class='ac-keyword-link']/a/text()", "//span[@class='ac-for-text']/span/text()"],
"reviews": ['//*[@id="acrCustomerReviewText"]/text()', '//div[@class="a2s-pdd-reviews"]//a/span/text()'],
"star5": ["//a[@class='a-link-normal 5star']/@title|//a[contains(@href,'ref=acr_dp_hist_5')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star4": ["//a[@class='a-link-normal 4star']/@title|//a[contains(@href,'ref=acr_dp_hist_4')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star3": ["//a[@class='a-link-normal 3star']/@title|//a[contains(@href,'ref=acr_dp_hist_3')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star2": ["//a[@class='a-link-normal 2star']/@title|//a[contains(@href,'ref=acr_dp_hist_2')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star1": ["//a[@class='a-link-normal 1star']/@title|//a[contains(@href,'ref=acr_dp_hist_1')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star5": ["//a[@class='a-link-normal 5star']/@title|//a[contains(@href,'ref=acr_dp_hist_5')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()","//a[contains(@href,'five_star')]/@aria-label",
"//a[contains(@href,'ref=acr_dp_hist_5')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star4": ["//a[@class='a-link-normal 4star']/@title|//a[contains(@href,'ref=acr_dp_hist_4')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()","//a[contains(@href,'four_star')]/@aria-label",
"//a[contains(@href,'ref=acr_dp_hist_4')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star3": ["//a[@class='a-link-normal 3star']/@title|//a[contains(@href,'ref=acr_dp_hist_3')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()","//a[contains(@href,'three_star')]/@aria-label",
"//a[contains(@href,'ref=acr_dp_hist_3')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star2": ["//a[@class='a-link-normal 2star']/@title|//a[contains(@href,'ref=acr_dp_hist_2')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()","//a[contains(@href,'two_star')]/@aria-label",
"//a[contains(@href,'ref=acr_dp_hist_2')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"star1": ["//a[@class='a-link-normal 1star']/@title|//a[contains(@href,'ref=acr_dp_hist_1')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()","//a[contains(@href,'one_star')]/@aria-label",
"//a[contains(@href,'ref=acr_dp_hist_1')]/div[@class='a-section a-spacing-none a-text-right aok-nowrap']/text()"],
"material": ["//span[text()='Material']/ancestor-or-self::td/following-sibling::td/span/text()"],
"package_quantity": ["//label[contains(text(),'Package Quantity:')]/following-sibling::span/text()"],
"pattern_name": ["//span[contains(text(),'Pattern Name:')]/following-sibling::span/text()"],
......@@ -269,6 +274,9 @@ DE_ASIN_XPATH = {
"best_sellers_herf": ['//span[contains(text(),"Amazon Bestseller-Rang")]/parent::span//a/@href',
'//th[contains(text(),"Amazon Bestseller-Rang")]/following-sibling::td//a/@href'],
"best_sellers_text": ['//span[contains(text(),"Amazon Bestseller-Rang")]/parent::span//a/text()',
'//th[contains(text(),"Amazon Bestseller-Rang")]/following-sibling::td//a/text()'],
"Best_rank": ['//th[contains(text(),"Amazon Bestseller-Rang")]/following-sibling::td//text()',
'//span[contains(text(),"Amazon Bestseller-Rang")]/parent::span//text()'],
"Best_rank2": ['//th[contains(text(),"Amazon Bestseller")]/following-sibling::td//text()',
......
......@@ -488,12 +488,29 @@ class ParseSearchTermUs(object):
def parse_bs(self):
try:
asin_list = self.etree_html.xpath(
"//span[contains(text(),'estseller')]/parent::span//parent::span[contains(@id,'best-seller')]/@id|//span[contains(text(),'Seller')]/parent::span//parent::span[contains(@id,'best-seller')]/@id")
print('############## bsr_asin::', asin_list)
if len(asin_list):
asin_list = [asin.split("-")[0] for asin in asin_list if len(asin.split("-")[0]) >= 9]
self.bs_list.extend(self.parse_type_common(asin_list=asin_list, cate_type='sb'))
bsr_asin_xpath_list = [
"//div[@data-csa-c-content-id='BEST_SELLER']/parent::div/parent::div/parent::div/parent::span/parent::div//@data-csa-c-asin",
"//div[@data-csa-c-content-id='BEST_SELLER']/parent::div/parent::div/parent::div/parent::div/parent::div//@data-csa-c-asin",
"//div[@data-csa-c-content-id='BEST_SELLER']/parent::div/parent::div/parent::div//@data-csa-c-item-id",
"//div[@data-csa-c-content-id='BEST_SELLER']/parent::div/parent::div/parent::div/parent::span/parent::div//@data-csa-c-item-id",
"//div[@data-csa-c-content-id='BEST_SELLER']/parent::div/parent::div/parent::div/parent::div/parent::div//@data-csa-c-item-id"]
for bsr_asin_xpath in bsr_asin_xpath_list:
asin_list = self.etree_html.xpath(bsr_asin_xpath)
print('############## bsr_asin::', asin_list)
bsr_asin_list = []
if len(asin_list):
asin_list = [asin.split("-")[0] for asin in asin_list if len(asin.split("-")[0]) >= 9]
for asin in asin_list:
if len(asin) > 10:
pattern = re.compile(r'(?<=amzn1\.asin\.)[A-Z0-9]{10}', re.I)
asins = pattern.findall(asin)
bsr_asin_list.extend(asins)  # findall returns a list of ASIN strings, so extend rather than append
else:
bsr_asin_list.append(asin)
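# Example (assumed data-csa-c-item-id format): "amzn1.asin.B08XYZ1234:1" -> asins ["B08XYZ1234"]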
print('############## bsr_asin::', bsr_asin_list)
self.bs_list.extend(self.parse_type_common(asin_list=bsr_asin_list, cate_type='sb'))
break
except Exception as e:
pass
......@@ -592,3 +609,12 @@ class ParseSearchTermUs(object):
"https://www.amazon.co.uk/dp/B09FLQD7VN?pd_rd_i=B09FLQD7VN&pd_rd_w=GwsFh&pf_rd_p=88aa1216-6e73-4bd1-9903-e6883ff8dae3&pd_rd_wg=2kZM8&pf_rd_r=P8P1KCGMPXS9XWH1NFQV&pd_rd_r=a7c81c84-a2aa-47ad-8bd9-055c75c99a28"
return (self.zr_list, self.sp_list, self.sb_list, self.ac_list,
self.bs_list, self.er_list, self.tr_list, self.sold_list, self.buy_text_list, self.hr_list)
# if __name__ == '__main__':
# with open(r'C:\Users\ASUS\Downloads\python2.html','r',encoding='utf-8')as f:
# response = f.read()
# parse_search_term = ParseSearchTermUs(page_source=response, driver=None, search_term='keywords',
# page=1, site_name='us')
# st_list = parse_search_term.run()
# zr_list, sp_list, sb_list, ac_list, bs_list, er_list, tr_list, sort_list, buy_text_list, hr_list = st_list
# print( zr_list, sp_list, sb_list, ac_list, bs_list, er_list, tr_list, sort_list, buy_text_list, hr_list )
\ No newline at end of file