Commit 4418209b by Peng

no message

parent 859e586e
import json
import random
import time
import pandas as pd
import redis
from lxml import html
from playwright.sync_api import sync_playwright
from secure_db_client import get_remote_engine
def mysql_connect():
engine_us_mysql = get_remote_engine(
site_name='us', # -> database "selection"
        db_type='mysql',  # -> server-side alias "mysql"
)
return engine_us_mysql
def run(asin_list):
    print('asin_list:::', asin_list)
    print('asin_list len:::', len(asin_list))
if asin_list:
        # Initialize Playwright
with sync_playwright() as _playwright:
            # _playwright.chromium.launch_persistent_context
            browser = _playwright.chromium.launch_persistent_context(
                # Local Chrome user-data directory
                user_data_dir=r"C:\Users\Administrator\AppData\Local\Google\Chrome\User Data",
                # Path to the local Chrome executable
                executable_path=r"C:\Program Files\Google\Chrome\Application\chrome.exe",
                # Must be True to allow file downloads (defaults to False)
                accept_downloads=True,
                headless=False,  # False = visible browser window, True = headless
                bypass_csp=True,
                locale='en-GB',
                ignore_https_errors=True,
                no_viewport=True,
                slow_mo=10,
                # Evade automation detection
                args=['--disable-blink-features=AutomationControlled', '--remote-debugging-port=9222']
)
            page = browser.new_page()
            # Hide the webdriver flag before any page script runs
            js = """
            Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});
            """
            page.add_init_script(js)
            page.evaluate_handle('''() =>{ window.chrome = { runtime: {}, }; }''')
            page.evaluate_handle(
                '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5, 6], }); }''')
            # Simulate a user interaction
            page.locator("body").click()
            print('Opening the browser to request ASINs:')
try:
page.goto('https://sellercentral.amazon.com')
time.sleep(random.uniform(2, 5))
            except Exception:
                save_asin_var_data(asin_list[0], json.dumps({"content": "Network error: account login failed. Check the remote account machine."}), 'failed')
for asin in asin_list:
time.sleep(random.uniform(1, 3))
try:
                print('Requesting ASIN', asin)
url = f"https://sellercentral.amazon.com/listing/varwiz/search?searchText={asin}"
print('url:', url)
page.goto(url)
time.sleep(random.uniform(3, 8))
                html_string = page.content()
                print(html_string)
                time.sleep(0.5)
if 'The ASIN you searched for is not part of any variation' not in html_string:
                    doc = html.fromstring(html_string)
                    # Take the text of the first <pre> (inner HTML tags are stripped automatically)
                    pre_nodes = doc.xpath('//pre')
                    if not pre_nodes:
                        raise ValueError("No <pre> node found")
                    pre_text = pre_nodes[0].text_content().strip()
                    # Parse directly (works when the <pre> holds the whole JSON payload)
                    data_json = json.loads(pre_text)
                    print(data_json)  # dict / list
                    print('Fetch complete for', asin)
                    save_asin_var_data(asin, json.dumps(data_json), 'success')
                else:
                    print('No variation family for ASIN', asin)
                    save_asin_var_data(asin, json.dumps(
                        {"content": "The ASIN you searched for is not part of any variation family"}), 'success')
            except Exception as e:
                print('Request error:', e)
                save_asin_var_data(asin, json.dumps({"content": "Download failed. Check the remote account machine."}), 'failed')
                continue
def redis_get_asin():
asin_list = []
random_key_list = []
redis_client = redis.Redis(host='113.100.143.162', port=6379, db=10, password='fG7#vT6kQ1pX')
while True:
        try:
            print('Polling Redis...')
            for i in range(10):
                # Grab a random key
                random_key = redis_client.randomkey()
                if random_key:
                    random_key_list.append(random_key)
                    # Fetch the value stored under that key
                    value = redis_client.get(random_key)
                    value = value.decode('utf-8')
                    print('ASIN pulled from Redis:', value)
                    if value not in asin_list:
                        asin_list.append(value)
else:
break
            if asin_list:
                _asin_lis = list(set(asin_list))
                print("_asin_lis:::", _asin_lis)
                print("_asin_lis len:::", len(_asin_lis))
                run(_asin_lis)  # hand the ASIN list to the scraper
                asin_list = []
                for _key in random_key_list:
                    print('Deleting ASIN key from Redis:', _key)
                    redis_client.delete(_key)  # remove the processed key
                random_key_list = []
            else:
                time.sleep(3)
                continue
            # redis_client.close()  # close the Redis connection
        except Exception as e:
            print('Redis query error:', e)
            redis_client.close()
            # Note: reconnects to a different Redis host than the initial connection
            redis_client = redis.Redis(host='192.168.10.224', port=6379, db=10, password='fG7#vT6kQ1pX')
            time.sleep(5)
            continue
def save_asin_var_data(asin, data_json, spider_value):
engine_us_mysql = mysql_connect()
workflow_everyday_list = [[asin, data_json, spider_value]]
    print('Saving rows:', len(workflow_everyday_list))
df_seller_asin_account = pd.DataFrame(data=workflow_everyday_list,
columns=['asin', 'asin_var_data', 'spider_value'])
engine_us_mysql.to_sql(df_seller_asin_account, 'us_asin_var_info')
if __name__ == '__main__':
redis_get_asin()
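# redis_get_asin() above samples randomkey(), which can pick up unrelated keys
# in db 10 and races with concurrent producers. A minimal queue-based sketch,
# assuming producers LPUSH ASINs onto a hypothetical "asin_queue" list:
def consume_asins(batch_size=10, timeout=3):
    client = redis.Redis(host='113.100.143.162', port=6379, db=10, password='fG7#vT6kQ1pX')
    asins = []
    while len(asins) < batch_size:
        # brpop blocks for up to `timeout` seconds, so no busy-wait sleep is needed
        item = client.brpop('asin_queue', timeout=timeout)
        if item is None:  # queue drained; process what we have so far
            break
        asins.append(item[1].decode('utf-8'))
    return list(set(asins))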
import requests
data = {
"username":"pengyanbing",
'password':"15112376559"
}
# url = 'http://192.168.2.28:5000/login'
# resp = requests.post(url,json=data)
url = 'http://192.168.2.28:5000/user/members/index'
resp = requests.get(url,headers={'inventory-token':'fFZ7P4XpSA6nxaH7Xw7aHQ'})
print(resp.content.decode('utf-8'))
DB_CONFIG = {
'host': '120.77.232.73',
'port': 3306,
'user': 'yswg_it_cangchu',
'password': 'Yswg@inv-cangchu241011420',
'db': 'inventory'
}
# REDIS_CONN = {
# "redis_host": "120.77.232.73",
# "redis_port": 6379,
# "redis_pwd": "yswgInventoryTest@202307#",
# "redis_db": 1
#
# }
REDIS_CONN = {
"redis_host": "113.100.143.162",
"redis_port": 6379,
"redis_pwd": "fG7#vT6kQ1pX",
"redis_db": 14
}
\ No newline at end of file
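# A minimal sketch of consuming DB_CONFIG, assuming its keys map directly onto
# pymysql.connect arguments ('db' is accepted there as an alias for 'database'):
import pymysql

conn = pymysql.connect(charset='utf8mb4', **DB_CONFIG)
try:
    with conn.cursor() as cur:
        cur.execute('SELECT 1')
        print(cur.fetchone())
finally:
    conn.close()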
import pymysql
from params import DB_CONN_DICT,PG_CONN_DICT_14
import pandas as pd
import traceback
from sqlalchemy import create_engine
import time
"""
每周三定时修改 feedback , product, 同步表修改状态 为 1 六个站点
"""
def run(site):
    db = "selection" if site == 'us' else f"selection_{site}"
    connect = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
                              user=DB_CONN_DICT['mysql_user'], password=DB_CONN_DICT['mysql_pwd'],
                              database=db, charset="utf8mb4")
    engine_pg = create_engine(
        f"postgresql+psycopg2://{PG_CONN_DICT_14['pg_user']}:{PG_CONN_DICT_14['pg_pwd']}@{PG_CONN_DICT_14['pg_host']}:{PG_CONN_DICT_14['pg_port']}/{db}",
        encoding='utf-8')
cursor = connect.cursor()
    # Reset the feedback syn distinct table: state and product_state back to 1
    update_feedback_distinct_sql = f"update {site}_seller_account_syn_distinct set state = 1, product_state = 1 where state != 12"
    print(update_feedback_distinct_sql)
    cursor.execute(update_feedback_distinct_sql)
    connect.commit()
    # Reset the seller account product syn table state to 1
    update_product_sql = f"update {site}_seller_account_product_syn set state = 1"
    print(update_product_sql)
    cursor.execute(update_product_sql)
    connect.commit()
    update_feedback_sql = f"update {site}_seller_account_syn set state = 1, product_state = 1"
    print(update_feedback_sql)
    cursor.execute(update_feedback_sql)
    connect.commit()
    cursor.close()
    connect.close()
    if site == 'us':
        with engine_pg.begin() as conn:
            conn.execute(update_feedback_distinct_sql)
            conn.execute(update_product_sql)
            conn.execute(update_feedback_sql)
if __name__ == '__main__':
run('us')
run('de')
run('uk')
run('fr')
run('es')
run('it')
\ No newline at end of file
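# The docstring says this reset runs every Wednesday, but the script has no
# scheduler of its own, so it is presumably driven externally (cron or similar).
# A minimal in-process sketch using the third-party `schedule` package; the
# 03:00 trigger time is an assumption:
import schedule

def weekly_reset():
    for site in ('us', 'de', 'uk', 'fr', 'es', 'it'):
        run(site)

schedule.every().wednesday.at("03:00").do(weekly_reset)
while True:
    schedule.run_pending()
    time.sleep(60)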
from secure_db_client import get_remote_engine
import platform
from sqlalchemy import create_engine
import pandas as pd
import datetime
from amazon_params.params import DB_CONN_DICT, REDIS_CONN, PG_CONN_DICT
from params import DB_CONN_DICT, PG_CONN_DICT_21, REDIS_CONN, PG_CONN_DICT
import redis
import time
from collections import Counter
import json
import codecs
def update_state(site):
if site == 'us':
......@@ -17,7 +20,7 @@ def update_state(site):
engine = create_engine(
f'mysql+pymysql://{DB_CONN_DICT["mysql_user"]}:' + f'{DB_CONN_DICT["mysql_pwd"]}@{DB_CONN_DICT["mysql_host"]}:{DB_CONN_DICT["mysql_port"]}/{db}?charset=utf8mb4') # , pool_recycle=3600
engine_spider = create_engine(
f"postgresql+psycopg2://postgres:fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS@61.145.136.61:54328/{db}",
f"postgresql+psycopg2://postgres:F9kL2sXe81rZq@61.145.136.61:54328/{db}",
encoding='utf-8')
engine_pg6 = create_engine(
f"postgresql+psycopg2://{PG_CONN_DICT['pg_user']}:{PG_CONN_DICT['pg_pwd']}@{PG_CONN_DICT['pg_host']}:{PG_CONN_DICT['pg_port']}/{db}",
......@@ -62,12 +65,12 @@ def update_state(site):
sql_update_month = f"update {site}_search_term_month_syn set state = 1 where state = 2 and updated_time <'{up_time}'"
print(sql_update_month)
conn_14.execute(sql_update_month)
seller_sql = f"update {site}_seller_account_syn_distinct set state = 1 where state = 2 and updated_at <'{up_time}';"
print('修改店铺信息抓取表状态:',seller_sql)
conn_14.execute(seller_sql)
seller_sql_product = f"update {site}_seller_account_syn_distinct set product_state = 1 where product_state = 2 and updated_at <'{up_time}';"
print('修改店铺asin抓取表状态',seller_sql_product)
conn_14.execute(seller_sql_product)
# seller_sql = f"update {site}_seller_account_syn_distinct set state = 1 where state = 2 and updated_at <'{up_time}';"
# print('修改店铺信息抓取表状态:',seller_sql)
# conn_14.execute(seller_sql)
# seller_sql_product = f"update {site}_seller_account_syn_distinct set product_state = 1 where product_state = 2 and updated_at <'{up_time}';"
# print('修改店铺asin抓取表状态',seller_sql_product)
# conn_14.execute(seller_sql_product)
select_sql = f"select count(id) from {site}_all_syn_st_month_2025_{min_month} where state=1"
df = pd.read_sql(select_sql, con=engine_spider)
id_count_min = df.iloc[0, 0]
......@@ -150,7 +153,7 @@ def get_redis_data(site_name, engine_pg6, engine_spider):
list_data.append(count)
print(list_data)
new_date_hour = f'{site_name}_' + str(new_date) + ':0-23'
print(new_date_hour)
print(new_date_hour,'13223')
list_hour_data = redis14.lrange(new_date_hour, start_index, end_index)
# 使用 Counter 统计元素出现次数
element_counts_hour = Counter(list_hour_data)
......@@ -204,19 +207,40 @@ def get_redis_data(site_name, engine_pg6, engine_spider):
print(asin_column_json)
    # captcha = 1, errors = 2, successes = 3, total requests = 4
with engine_pg6.begin() as conn:
        # If there is only one URL, wrap it in single quotes
sql_update = f"""UPDATE {site_name}_count_request_data
SET asin_request_err_total={list_data[1]},
code_err_total={list_data[0]},
success_asin_total={list_data[2]},
request_total_count={list_data[3]},
hour_asin_total='{hour_data_json}',
asin_column_json='{asin_column_json}',
remain_asin_total = {remain_asin_total}
WHERE date_info='{new_date}'"""
# sql_update = f"UPDATE {site_name}_count_request_data SET asin_request_err_total={list_data[1]},code_err_total={list_data[0]},success_asin_total={list_data[2]},request_total_count={list_data[3]},hour_asin_total='{hour_data_json}',asin_column_json='{asin_column_json}' where date_info='{new_date}'"
print(sql_update)
conn.execute(sql_update)
sql_upsert = f"""
INSERT INTO {site_name}_count_request_data (
date_info,
asin_request_err_total,
code_err_total,
success_asin_total,
request_total_count,
hour_asin_total,
asin_column_json,
remain_asin_total
) VALUES (
'{new_date}',
{list_data[1]},
{list_data[0]},
{list_data[2]},
{list_data[3]},
'{hour_data_json}',
'{asin_column_json}',
{remain_asin_total}
)
ON CONFLICT (date_info) DO UPDATE SET
asin_request_err_total = EXCLUDED.asin_request_err_total,
code_err_total = EXCLUDED.code_err_total,
success_asin_total = EXCLUDED.success_asin_total,
request_total_count = EXCLUDED.request_total_count,
hour_asin_total = EXCLUDED.hour_asin_total,
asin_column_json = EXCLUDED.asin_column_json,
remain_asin_total = EXCLUDED.remain_asin_total;
"""
print(sql_upsert)
conn.execute(sql_upsert)
if __name__ == '__main__':
......
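# The upsert above interpolates every value, including the JSON blobs, straight
# into the SQL string, so a single quote inside hour_data_json or
# asin_column_json would break the statement. A parameterized sketch of the
# same upsert using SQLAlchemy's text() construct (only the table name still
# needs interpolation):
from sqlalchemy import text

upsert_stmt = text(f"""
    INSERT INTO {site_name}_count_request_data (
        date_info, asin_request_err_total, code_err_total, success_asin_total,
        request_total_count, hour_asin_total, asin_column_json, remain_asin_total
    ) VALUES (
        :date_info, :err_total, :code_err, :success_total,
        :req_total, :hour_json, :asin_json, :remain_total
    )
    ON CONFLICT (date_info) DO UPDATE SET
        asin_request_err_total = EXCLUDED.asin_request_err_total,
        code_err_total = EXCLUDED.code_err_total,
        success_asin_total = EXCLUDED.success_asin_total,
        request_total_count = EXCLUDED.request_total_count,
        hour_asin_total = EXCLUDED.hour_asin_total,
        asin_column_json = EXCLUDED.asin_column_json,
        remain_asin_total = EXCLUDED.remain_asin_total
""")
with engine_pg6.begin() as conn:
    # Bound parameters handle quoting/escaping of the JSON payloads
    conn.execute(upsert_stmt, {
        'date_info': new_date,
        'err_total': list_data[1], 'code_err': list_data[0],
        'success_total': list_data[2], 'req_total': list_data[3],
        'hour_json': hour_data_json, 'asin_json': asin_column_json,
        'remain_total': remain_asin_total,
    })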
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from secure_db_client import get_remote_engine
from curl_cffi import requests
from utils.db_connect import BaseUtils
import re
from lxml import etree
os.environ['NO_PROXY'] = 'amazon.com'
import json
from urllib.parse import urlparse
class Amazon_reviewer():
def __init__(self, site_name='us'):
if site_name == "us":
self.site_url = 'https://www.amazon.com'
self.host = 'www.amazon.com'
elif site_name == 'uk':
            self.site_url = 'https://www.amazon.co.uk'  # site URL
self.host = 'www.amazon.co.uk'
elif site_name == 'de':
self.site_url = 'https://www.amazon.de'
self.host = 'www.amazon.de'
elif site_name == 'fr':
self.site_url = 'https://www.amazon.fr'
self.host = 'www.amazon.fr'
elif site_name == 'es':
self.site_url = 'https://www.amazon.es'
self.host = 'www.amazon.es'
elif site_name == 'it':
self.site_url = 'https://www.amazon.it'
self.host = 'www.amazon.it'
def pg_connect(self):
engine_pg15 = get_remote_engine(
site_name='us', # -> database "selection"
            db_type='postgresql_15_outer',  # -> server-side alias "postgresql_15_outer"
)
return engine_pg15
def redis_db(self):
redis14_ = BaseUtils().redis_db()
headers_json = redis14_.get('amaozn_login_dict')
        self.cookies_dict = json.loads(headers_json)
redis14_.close()
def get_asin_reviewer(self, asin='0740303090'):
headers = {
'authority': urlparse(self.site_url).hostname,
'host': self.host,
"x-requested-with": "XMLHttpRequest",
"accept": "text/html,*/*",
"content-type": "application/x-www-form-urlencoded;charset=UTF-8",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
"origin": self.site_url,
"accept-language": "zh-CN,zh;q=0.9",
}
        url = f'{self.site_url}/product-reviews/{asin}/ref=cm_cr_getr_d_paging_btm_next_1?sortBy=recent&pageNumber=1'
        response = requests.get(url, headers=headers, cookies=self.cookies_dict)
        resp = etree.HTML(response.text)
        # Dump the raw page for offline debugging
        with open(r'C:\Users\ASUS\Desktop\text.html', 'w', encoding='utf-8') as f:
            f.write(response.text)
div_list = resp.xpath("//div[@id='cm_cr-review_list']/ul/li")
for div in div_list:
user_href_list = div.xpath(".//div[@class='a-row a-spacing-mini']/a/@href")
user_href = self.site_url + user_href_list[0] if user_href_list else None
user_img_list = div.xpath(".//div[@class='a-row a-spacing-mini']//img/@data-src")
user_img = self.site_url + user_img_list[0] if user_img_list else None
user_name_list = div.xpath(".//div[@class='a-row a-spacing-mini']//span[@class='a-profile-name']/text()")
user_name = user_name_list[0] if user_name_list else None
review_star_rating_list = div.xpath(".//div[@class='a-row']//i[@data-hook='review-star-rating']//text()")
review_star_rating = review_star_rating_list[0] if review_star_rating_list else None
review_title_list = div.xpath(".//div[@class='a-row']//a/span/text()")
review_title = review_title_list[0] if review_title_list else None
review_date_list = div.xpath(".//span[@data-hook='review-date']/text()")
review_date = review_date_list[0] if review_date_list else None
review_href_list = div.xpath(".//div[@class='a-row']//a/@href")
review_href = self.site_url + review_href_list[0] if review_href_list else None
var_data_list = div.xpath(".//div[@class='a-row a-spacing-mini review-data review-format-strip']//a/text()")
var_data = '||'.join(var_data_list) if var_data_list else None
var_asin_list = div.xpath(".//div[@class='a-row a-spacing-mini review-data review-format-strip']//a/@href")
if var_asin_list:
varasin_list = re.findall(r'reviews/(.*)/ref', var_asin_list[0])
var_asin = varasin_list[0] if varasin_list else None
else:
var_asin = None
vp_list = div.xpath(".//a[contains(@aria-label,'Verified Purchase')]//span/text()")
verified_purchase = vp_list[0] if vp_list else None
review_data_list = div.xpath(
".//div[@class='a-row a-spacing-small review-data']/span[@data-hook='review-body']//text()")
review_data_list = ''.join(review_data_list).strip()
review_data = review_data_list if review_data_list else None
review_img_list = div.xpath(".//img[@data-hook='review-image-tile']/@src")
print('review_img_list::', review_img_list)
if review_img_list:
review_img = ','.join(review_img_list).strip()
else:
review_img = None
items = {'user_name': user_name, 'user_img': user_img, "user_href": user_href,
'review_star_rating': review_star_rating,
'review_title': review_title, "review_date": review_date, "review_href": review_href,
"var_data": var_data,
'var_asin': var_asin, "is_vp": verified_purchase, "review_data": review_data,
"review_data_img": review_img}
print(items)
def run(self):
self.redis_db()
self.get_asin_reviewer()
if __name__ == '__main__':
Amazon_reviewer().run()
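# A usage sketch for the class above, assuming the 'amaozn_login_dict' cookie
# blob already exists in Redis:
# scraper = Amazon_reviewer(site_name='uk')
# scraper.redis_db()                       # load login cookies from Redis
# scraper.get_asin_reviewer('B00CX547FE')  # fetch and print one page of reviews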
# from lxml import etree
#
# with open(r'C:\Users\ASUS\Desktop\text.html','r',encoding='utf-8')as f:
# resp = f.read()
#
# html = etree.HTML(resp)
# h2_str = html.xpath('//h2[contains(@class,"a-spacing-medium")]/text()')
# print(h2_str)
# data_asin_list = html.xpath(f"//h2[contains(text(),'{h2_str[0]}')]/parent::div/parent::div//@data-asin")
# print(data_asin_list)
import curl_cffi
headers = {
"Referer": "https://depatisnet.dpma.de/DepatisNet/depatisnet",
"Origin": "https://depatisnet.dpma.de",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8",
"Cache-Control": "no-cache",
"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
}
resp = curl_cffi.get('https://depatisnet.dpma.de/DepatisNet/depatisnet?window=1&space=main&content=treffer&action=textpdf&docid=CN000119456546B',headers=headers)
print(resp.text)
\ No newline at end of file
DEFAULT_USER = "fangxingjun"
DEFAULT_USER_TOKEN = "fxj_token_123"