Commit 31856c55 by Peng

no message

parent 7a812382
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.7" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyInterpreterInspection" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.11" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/amazon_spider.iml" filepath="$PROJECT_DIR$/.idea/amazon_spider.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>
\ No newline at end of file
{
"dependencies": {
"crypto-js": "^4.2.0",
"execjs": "^1.0.7",
"jsdom": "^26.0.0",
"jsencrypt": "^3.3.2",
"pako": "^2.1.0"
}
}
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.7" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyInterpreterInspection" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.8" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/amazon_spider.iml" filepath="$PROJECT_DIR$/.idea/amazon_spider.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/../.." vcs="Git" />
</component>
</project>
\ No newline at end of file
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage, ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
from bs4 import BeautifulSoup
import difflib
class Amazon_dif():
def __init__(self):
self.page = ChromiumPage()
        # request headers
self.headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'pragma': 'no-cache',
'priority': 'u=0, i',
'sec-ch-ua': '"Google Chrome";v="137", "Chromium";v="137", "Not/A)Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
}
self.url = "https://sellercentral.amazon.com/help/hub/reference/external/GVACXTSVATE36M4M?locale=zh-CN&ref=as_cn_ags_policy_na_product&initialSessionID=000-9225692-1269734&ld=NSBing&pageName=CN%3AAS%3AGS-policy"
def get_html(self):
try:
self.page.get(self.url)
            self.page.set.window.max()
            sleep(randint(5, 10))
            current_html = self.page.html
return current_html
except Exception as e:
print(e)
return None
def read_file(self, file_path):
if not os.path.exists(file_path):
print(f"文件 {file_path} 不存在")
return ""
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def extract_text(self, html_content):
soup = BeautifulSoup(html_content, 'html.parser')
return soup.get_text().strip()
def compare_texts(self, text1, text2):
matcher = difflib.SequenceMatcher(None, text1, text2)
differences = []
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == 'replace':
differences.append(f"[修改] 原文: '{text1[i1:i2]}' → 新文: '{text2[j1:j2]}'")
elif tag == 'delete':
differences.append(f"[删除] 内容: '{text1[i1:i2]}'")
elif tag == 'insert':
differences.append(f"[新增] 内容: '{text2[j1:j2]}'")
return differences
def run(self):
# current_html = self.get_html()
with open('amazon_current.html', 'r', encoding='utf-8') as f:
current_html = f.read()
if not current_html:
return
old_html = self.read_file('amazon.html')
current_text = self.extract_text(current_html)
old_text = self.extract_text(old_html)
print("开始对比文本内容...")
differences = self.compare_texts(old_text, current_text)
if differences:
print("发现以下差异:")
for line in differences:
print(line)
else:
print("文本内容一致,没有变化。")
self.page.quit()
if __name__ == '__main__':
Amazon_dif().run()
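# A minimal standalone sketch of how difflib.SequenceMatcher.get_opcodes()
# labels edits, i.e. the categories that compare_texts() above turns into
# [changed]/[deleted]/[added] lines; the two strings are made-up examples,
# not data from this project.
import difflib

old = "Sellers must ship within 2 days."
new = "Sellers must ship within 3 business days."
for tag, i1, i2, j1, j2 in difflib.SequenceMatcher(None, old, new).get_opcodes():
    # tag is one of 'equal', 'replace', 'delete', 'insert'
    print(tag, repr(old[i1:i2]), "->", repr(new[j1:j2]))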
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage, ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
from bs4 import BeautifulSoup
import difflib
class Amazon_dif():
def __init__(self):
self.page = ChromiumPage()
        # request headers
self.headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'pragma': 'no-cache',
'priority': 'u=0, i',
'sec-ch-ua': '"Google Chrome";v="137", "Chromium";v="137", "Not/A)Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
# 'cookie': 'session-id=131-7473319-0965428; i18n-prefs=USD; ubid-main=133-5822818-2169950; lc-main=en_US; session-id-time=2082787201l; session-token=Smj0Ndc7BGb8LkC6cYMxfBHWaVMx1RsTt8H+UMSKIhKpBxUfCbhvsCyUQ5/h57GMrZ5cb1KqO8y9sFAeHas20J9Jeo4TIR5ZTWBqHb6ttKR5dsjo/MjQSTEikqeMaESjIx46hfGBdeNPbtMRgOfQukbH2IMbMq/qscttii8Mcbc7dCZ09BG3bAndPQCfzFruVMoovxKAInJMeabQgCny7vR2uJOl+RTmA3q92O2T3IIGDvHT7FTkCdognP5YIJaEllc+3weeWGfVQLWIMr0jR7z5JPvS2/Sr5cnqCS3dnECHgH55FJhHbGXR1XWZyjzNFkrUkDE0OLeHL0UB3XQFb+USlVXhfgM0; ph_phc_tGCcl9SINhy3N0zy98fdlWrc1ppQ67KJ8pZMzVZOECH_posthog=%7B%22distinct_id%22%3A%220197aa4c-f598-7f50-a1d4-f80d94f60ef8%22%2C%22%24sesid%22%3A%5B1750909709968%2C%220197aa4c-f59c-7e50-b1d3-d2deccf9be85%22%2C1750908859804%5D%7D',
}
self.url = "https://sellercentral.amazon.com/help/hub/reference/external/GVACXTSVATE36M4M?locale=zh-CN&ref=as_cn_ags_policy_na_product&initialSessionID=000-9225692-1269734&ld=NSBing&pageName=CN%3AAS%3AGS-policy"
def get_html(self):
try:
self.page.get(self.url)
            self.page.set.window.max()
            sleep(randint(5, 10))
            current_html = self.page.html
return current_html
except Exception as e:
print(e)
return None
def read_file(self, file_path):
if not os.path.exists(file_path):
print(f"文件 {file_path} 不存在")
return ""
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def extract_text(self, html_content):
soup = BeautifulSoup(html_content, 'html.parser')
return soup.get_text().strip()
def compare_texts(self, text1, text2):
matcher = difflib.SequenceMatcher(None, text1, text2)
differences = []
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == 'replace':
differences.append(f"[修改] 原文: '{text1[i1:i2]}' → 新文: '{text2[j1:j2]}'")
elif tag == 'delete':
differences.append(f"[删除] 内容: '{text1[i1:i2]}'")
elif tag == 'insert':
differences.append(f"[新增] 内容: '{text2[j1:j2]}'")
return differences
def run(self):
# current_html = self.get_html()
with open('amazon_current.html', 'r', encoding='utf-8') as f:
current_html = f.read()
if not current_html:
return
old_html = self.read_file('amazon.html')
current_text = self.extract_text(current_html)
old_text = self.extract_text(old_html)
print("开始对比文本内容...")
differences = self.compare_texts(old_text, current_text)
if differences:
print("发现以下差异:")
for line in differences:
print(line)
else:
print("文本内容一致,没有变化。")
self.page.quit()
if __name__ == '__main__':
Amazon_dif().run()
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyInterpreterInspection" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.11" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/amazon_details.iml" filepath="$PROJECT_DIR$/.idea/amazon_details.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/../../.." vcs="Git" />
</component>
</project>
\ No newline at end of file
import platform
# Tables used by the spiders
DB_REQUESTS_ASIN_PARAMS = {
"db_syn": "us_all_syn_st",
"db_variat": "us_variat", # 更改
"db_cookies": "us_cookies",
"db_us_erp_asin_syn": "us_erp_asin_syn",
"db_us_erp_asin": "us_erp_asin",
"db_seller_account_syn": "us_seller_account_syn",
"db_seller_asin_account": "us_seller_asin_account",
"db_asin_image": "us_asin_image",
"db_seller_account_feedback_5000_asin_syn": "us_seller_account_feedback_5000_asin_syn",
"db_amazon_seller_account": "amazon_seller_account",
"db_ip_16yun": "ip_16yun",
"db_request_param_val": "us_request_param_val",
"db_seller_account_feedback": "us_seller_account_feedback",
"db_self_asin_detail": "us_self_asin_detail",
"db_seller_account_product_syn": "us_seller_account_product_syn",
"db_seller_account_feedback_new": "us_seller_account_feedback_report",
"db_competitive_aisn_syn": "us_competitive_aisn_syn",
"db_competitive_aisn": "us_competitive_aisn",
"db_asin_detail_product": "fr_seller_asin_product",
"db_search_term": "us_search_term",
"db_variation_total": "us_variation_total",
"db_asin_star": "uk_asin_star",
"db_asin_add_cart": "us_asin_fb_together",
"db_brand_asin": "us_brand_asin",
"db_bs_category_asin_detail": "fr_bs_category_asin_detail",
"db_asin_of_concern": "us_asin_of_concern",
"db_potential_product_asin_syn": "us_potential_product_asin_syn",
"db_potential_product_asin": "us_potential_product_asin",
'db_user_collection_syn': 'user_collection_syn',
'db_self_product_detail': "us_self_product_detail",
"int_threading_num": 9,
"if_ip_num_proxy": 100,
"if_ip_num": 20,
"requests_timeout": 15,
"de_requests_timeout": 15,
"de_int_threading_num": 8,
"feedback_threading_num": 8
}
DB_SEARCH_TERM_PARAMS_SPIDER = {
"db_search_term": "us_search_term",
"db_asin_detail_simply": "us_asin_detail_simply",
"db_search_term_serverip": "us_search_term_serverip",
"db_search_term_zr": "us_search_term_rank_zr",
"db_search_term_sp": "us_search_term_rank_sp",
"db_search_term_sb": "us_search_term_rank_sb",
"db_search_term_ac": "us_search_term_rank_ac",
"db_search_term_bs": "us_search_term_rank_bs",
"db_search_term_er": "us_search_term_rank_er",
"db_search_term_hr": "us_search_term_rank_hr",
"db_search_term_tr": "us_search_term_rank_tr",
"db_other_search_term": "us_other_search_term",
"us_brand_analytics": "us_brand_analytics",
}
DB_SEARCH_TERM_PARAMS_SPIDER_MYSQL = {
"db_search_term": "us_search_term",
"db_asin_detail_simply": "us_asin_detail_simply",
"db_search_term_serverip": "us_search_term_serverip",
"db_search_term_zr": "us_search_term_rank_zr",
"db_search_term_sp": "us_search_term_rank_sp",
"db_search_term_sb": "us_search_term_rank_sb",
"db_search_term_ac": "us_search_term_rank_ac",
"db_search_term_bs": "us_search_term_rank_bs",
"db_search_term_er": "us_search_term_rank_er",
"db_search_term_hr": "us_search_term_rank_hr",
"db_search_term_tr": "us_search_term_rank_tr",
"db_potential_product_st_syn": "us_potential_product_st_syn",
"db_potential_product_st": "us_potential_product_st",
"db_other_search_term": "us_other_search_term",
"us_brand_analytics": "us_brand_analytics",
}
# MySQL connection parameters
DB_CONN_DICT = {
"mysql_port": 3306,
"mysql_db": "selection",
"mysql_user": "adv_yswg",
"mysql_pwd": "Gd1pGJog1ysLMLBdML8w81",
"mysql_host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",
}
# pg14 connection parameters 113.100.143.162:5432
if platform.system().lower() == 'windows':
PG_CONN_DICT = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "192.168.10.223",
}
else:
PG_CONN_DICT = {
"pg_port": 54328,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "61.145.136.61",
}
# pg6 connection parameters 113.100.143.162:5432
if platform.system().lower() == 'windows':
PG_CONN_DICT_6 = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
else:
PG_CONN_DICT_6 = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
# pg12 connection parameters 113.100.143.162:5443
if platform.system().lower() == 'windows':
PG_CONN_DICT_21 = {
"pg_port": 5443,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
else:
PG_CONN_DICT_21 = {
"pg_port": 5443,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
# doris
DORIS_CONN = {
"doris_port": 19030,
"doris_db": "selection",
"doris_user": "root",
"doris_host": "113.100.143.162",
"doris_pwd": ""
}
# redis
REDIS_CONN = {
"redis_host": "113.100.143.162",
"redis_port": 6379,
"redis_pwd": "yswg2023",
"redis_db": 14
}
# StarRocks connection parameters
if platform.system().lower() == 'windows':
starrocks_CONN = {
"mysql_port": 19030,
"mysql_db": "us_spider",
"mysql_user": "pengyanbing",
"mysql_pwd": "pengyanbing12345",
"mysql_host": "113.100.143.162"
}
else:
starrocks_CONN = {
"mysql_port": 19030,
"mysql_db": "us_spider",
"mysql_user": "pengyanbing",
"mysql_pwd": "pengyanbing12345",
"mysql_host": "192.168.10.151"
}
\ No newline at end of file
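# A minimal sketch of turning the parameter dicts above into live clients.
# The key-to-parameter mapping simply follows the key names, and the
# mysql+pymysql URL scheme is an assumption (any installed MySQL driver
# would do); both create_engine and redis.StrictRedis are standard APIs.
from sqlalchemy import create_engine
import redis

mysql_url = (
    f"mysql+pymysql://{DB_CONN_DICT['mysql_user']}:{DB_CONN_DICT['mysql_pwd']}"
    f"@{DB_CONN_DICT['mysql_host']}:{DB_CONN_DICT['mysql_port']}/{DB_CONN_DICT['mysql_db']}"
)
engine = create_engine(mysql_url)

redis_client = redis.StrictRedis(
    host=REDIS_CONN['redis_host'],
    port=REDIS_CONN['redis_port'],
    password=REDIS_CONN['redis_pwd'],
    db=REDIS_CONN['redis_db'],
)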
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
import random
ORIGIN_CIPHERS = ('ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:'
'DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:RSA+3DES')
class DESAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
"""
A TransportAdapter that re-enables 3DES support in Requests.
"""
CIPHERS = ORIGIN_CIPHERS.split(':')
random.shuffle(CIPHERS)
CIPHERS = ':'.join(CIPHERS)
md5_list = [':!aNULL:!eNULL:!MD5', ':!aNULL:!MD5:!DSS']
self.CIPHERS = CIPHERS + random.choice(md5_list)
# self.CIPHERS = CIPHERS + ':!aNULL:!MD5:!DSS'
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).proxy_manager_for(*args, **kwargs)
\ No newline at end of file
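# A minimal usage sketch for DESAdapter, mirroring how the eBay spider in
# this commit mounts it; each adapter instance shuffles ORIGIN_CIPHERS, so
# the TLS cipher order (and hence the JA3 fingerprint) differs per session.
import requests

sess = requests.Session()
sess.mount('https://www.ebay.com/', DESAdapter())
resp = sess.get('https://www.ebay.com/', timeout=30)
print(resp.status_code)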
This source diff could not be displayed because it is too large.
import subprocess
import time
import requests
import random
def is_internet_available():
try:
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'no-cache',
'User-Agent': ua,
}
r = requests.get("https://www.baidu.com", timeout=5,headers=headers)
print(r.status_code)
return True
except Exception as e:
print(e)
return False
def pppoe_ip():
nums = 0
while True:
try:
            print('Switching IP...')
            result = subprocess.run(['sh', '/root/pppoe.sh'], capture_output=True, text=True, check=True)
            print(result.stdout)
            time.sleep(2)
            print(result.stderr)  # captured stderr output
            print('pppoe.sh script finished')
            time.sleep(1)
            # check whether the network connection is usable again
            if is_internet_available():
                print("Network connection is up")
                break
            else:
                print("No network connection", nums)
                nums += 3
                time.sleep(nums * 2)
        except subprocess.CalledProcessError as e:
            print('IP switch failed', e)
continue
if __name__ == '__main__':
is_internet_available()
\ No newline at end of file
import re
import pandas as pd
class CheckAsinDetail(object):
def __init__(self):
self.df_asin_detail = pd.DataFrame()
def check_asin(self, asin):
if len(asin) != 10:
return f"Error_asin_01: 长度异常--{asin}"
elif not bool(re.match(r'^[a-zA-Z0-9]+$', asin)):
return f"Error_asin_02: 包含非数字或者字母异常--{asin}"
def check_price(self, price):
pass
    def run(self, asin, price):
        self.check_asin(asin)
        self.check_price(price)
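# A quick usage sketch for CheckAsinDetail (the ASIN values are made up):
checker = CheckAsinDetail()
print(checker.check_asin('B0ABCD1234'))  # None: 10 chars, all alphanumeric
print(checker.check_asin('B0!BAD'))      # Error_asin_01: abnormal length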
import platform
# Tables used by the spiders
DB_REQUESTS_ASIN_PARAMS = {
"db_syn": "us_all_syn_st",
"db_variat": "us_variat", # 更改
"db_cookies": "us_cookies",
"db_us_erp_asin_syn": "us_erp_asin_syn",
"db_us_erp_asin": "us_erp_asin",
"db_seller_account_syn": "us_seller_account_syn",
"db_seller_asin_account": "us_seller_asin_account",
"db_asin_image": "us_asin_image",
"db_seller_account_feedback_5000_asin_syn": "us_seller_account_feedback_5000_asin_syn",
"db_amazon_seller_account": "amazon_seller_account",
"db_ip_16yun": "ip_16yun",
"db_request_param_val": "us_request_param_val",
"db_seller_account_feedback": "us_seller_account_feedback",
"db_self_asin_detail": "us_self_asin_detail",
"db_seller_account_product_syn": "us_seller_account_product_syn",
"db_seller_account_feedback_new": "us_seller_account_feedback_report",
"db_competitive_aisn_syn": "us_competitive_aisn_syn",
"db_competitive_aisn": "us_competitive_aisn",
"db_asin_detail_product": "fr_seller_asin_product",
"db_search_term": "us_search_term",
"db_variation_total": "us_variation_total",
"db_asin_star": "uk_asin_star",
"db_asin_add_cart": "us_asin_fb_together",
"db_brand_asin": "us_brand_asin",
"db_bs_category_asin_detail": "fr_bs_category_asin_detail",
"db_asin_of_concern": "us_asin_of_concern",
"db_potential_product_asin_syn": "us_potential_product_asin_syn",
"db_potential_product_asin": "us_potential_product_asin",
'db_user_collection_syn': 'user_collection_syn',
'db_self_product_detail': "us_self_product_detail",
"int_threading_num": 9,
"if_ip_num_proxy": 100,
"if_ip_num": 20,
"requests_timeout": 15,
"de_requests_timeout": 15,
"de_int_threading_num": 8,
"feedback_threading_num": 8
}
DB_SEARCH_TERM_PARAMS_SPIDER = {
"db_search_term": "us_search_term",
"db_asin_detail_simply": "us_asin_detail_simply",
"db_search_term_serverip": "us_search_term_serverip",
"db_search_term_zr": "us_search_term_rank_zr",
"db_search_term_sp": "us_search_term_rank_sp",
"db_search_term_sb": "us_search_term_rank_sb",
"db_search_term_ac": "us_search_term_rank_ac",
"db_search_term_bs": "us_search_term_rank_bs",
"db_search_term_er": "us_search_term_rank_er",
"db_search_term_hr": "us_search_term_rank_hr",
"db_search_term_tr": "us_search_term_rank_tr",
"db_other_search_term": "us_other_search_term",
"us_brand_analytics": "us_brand_analytics",
}
DB_SEARCH_TERM_PARAMS_SPIDER_MYSQL = {
"db_search_term": "us_search_term",
"db_asin_detail_simply": "us_asin_detail_simply",
"db_search_term_serverip": "us_search_term_serverip",
"db_search_term_zr": "us_search_term_rank_zr",
"db_search_term_sp": "us_search_term_rank_sp",
"db_search_term_sb": "us_search_term_rank_sb",
"db_search_term_ac": "us_search_term_rank_ac",
"db_search_term_bs": "us_search_term_rank_bs",
"db_search_term_er": "us_search_term_rank_er",
"db_search_term_hr": "us_search_term_rank_hr",
"db_search_term_tr": "us_search_term_rank_tr",
"db_potential_product_st_syn": "us_potential_product_st_syn",
"db_potential_product_st": "us_potential_product_st",
"db_other_search_term": "us_other_search_term",
"us_brand_analytics": "us_brand_analytics",
}
# MySQL connection parameters
DB_CONN_DICT = {
"mysql_port": 3306,
"mysql_db": "selection",
"mysql_user": "adv_yswg",
"mysql_pwd": "Gd1pGJog1ysLMLBdML8w81",
"mysql_host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",
}
# pg14 connection parameters 113.100.143.162:5432
if platform.system().lower() == 'windows':
PG_CONN_DICT = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "192.168.10.223",
}
else:
PG_CONN_DICT = {
"pg_port": 54328,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "61.145.136.61",
}
# pg6 connection parameters 113.100.143.162:5432
if platform.system().lower() == 'windows':
PG_CONN_DICT_6 = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
else:
PG_CONN_DICT_6 = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
# pg12 connection parameters 113.100.143.162:5443
if platform.system().lower() == 'windows':
PG_CONN_DICT_21 = {
"pg_port": 5443,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
else:
PG_CONN_DICT_21 = {
"pg_port": 5443,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
# doris
DORIS_CONN = {
"doris_port": 19030,
"doris_db": "selection",
"doris_user": "root",
"doris_host": "113.100.143.162",
"doris_pwd": ""
}
# redis
REDIS_CONN = {
"redis_host": "113.100.143.162",
"redis_port": 6379,
"redis_pwd": "yswg2023",
"redis_db": 14
}
# StarRocks connection parameters
if platform.system().lower() == 'windows':
starrocks_CONN = {
"mysql_port": 19030,
"mysql_db": "us_spider",
"mysql_user": "pengyanbing",
"mysql_pwd": "pengyanbing12345",
"mysql_host": "113.100.143.162"
}
else:
starrocks_CONN = {
"mysql_port": 19030,
"mysql_db": "us_spider",
"mysql_user": "pengyanbing",
"mysql_pwd": "pengyanbing12345",
"mysql_host": "192.168.10.151"
}
\ No newline at end of file
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
import random
ORIGIN_CIPHERS = ('ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:'
'DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:RSA+3DES')
class DESAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
"""
A TransportAdapter that re-enables 3DES support in Requests.
"""
CIPHERS = ORIGIN_CIPHERS.split(':')
random.shuffle(CIPHERS)
CIPHERS = ':'.join(CIPHERS)
md5_list = [':!aNULL:!eNULL:!MD5', ':!aNULL:!MD5:!DSS']
self.CIPHERS = CIPHERS + random.choice(md5_list)
# self.CIPHERS = CIPHERS + ':!aNULL:!MD5:!DSS'
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).proxy_manager_for(*args, **kwargs)
\ No newline at end of file
This source diff could not be displayed because it is too large.
import sys
import os
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
# from get_cookie import Get_cookie
import requests
import logging
logging.captureWarnings(True)
import os
from lxml import etree
import re
os.environ['NO_PROXY'] = 'stackoverflow.com'
from datetime import datetime, timedelta
from sqlalchemy import create_engine, delete
import pandas as pd
import json
from multiprocessing import Pool
import time
from all_connect import ConnectSpider
Con = ConnectSpider()
import threading
from queue import Queue
import queue
from switch_ip import pppoe_ip
import random
from ast import literal_eval
from cookie_list import cookie_list
from amazon_params import py_ja3
from search_term import search_terms
class Get_asin:
def __init__(self):
self.headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9',
'sec-ch-ua': '"Google Chrome";v="123", "Not:A-Brand";v="8", "Chromium";v="123"',
'sec-ch-ua-full-version': '"123.0.6312.59"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-model': '""',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua-platform-version': '"10.0.0"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
}
self.item_queue = Queue()
self.MAX_RETRIES = 3
def get_search_term(self):
search_term = Con.get_ebay_search_term()
print(len(search_term))
with open('search_term.py','w',encoding='utf-8') as f:
f.write(str(search_term))
def random_ua(self):
first_num = random.randint(55, 62)
third_num = random.randint(0, 3200)
fourth_num = random.randint(0, 140)
os_type = [
'(Windows NT 6.1; WOW64)',
'(Windows NT 10.0; WOW64)',
'(X11; Linux x86_64)',
'(Macintosh; Intel Mac OS X 10_12_6)'
]
chrome_version = 'Chrome/{}.0.{}.{}'.format(first_num, third_num, fourth_num)
ua = ' '.join(['Mozilla/5.0', random.choice(os_type), 'AppleWebKit/537.36',
'(KHTML, like Gecko)', chrome_version, 'Safari/537.36']
)
# userAgent = {"user-agent": ua}
self.headers['user-agent'] = ua
    def get_asin(self, i, page, cookie_dict, task_queue, retry_count=1):
url = 'https://www.ebay.com/sch/i.html'
retries_left = self.MAX_RETRIES - retry_count
try:
sess = requests.Session()
sess.mount('https://www.ebay.com/', py_ja3.DESAdapter())
params = {
'_from': 'R40',
'_fcid': '1',
'_nkw': f'{i}',
'_sacat': '1',
'rt': 'nc',
'_pgn': f'{page}',
}
self.random_ua()
cookies = literal_eval(cookie_dict)
response = sess.get(url, params=params, headers=self.headers,cookies=cookies, verify=False, timeout=30)
if response.status_code == 200:
pattern = r'"text":"United States - USA"'
matches = re.findall(pattern, response.text)
len_cookie = len(matches)
                    if len_cookie == 2:  # ZIP-code check: the page is localized to the US
                        html = etree.HTML(response.text)
                        # extract item ids
                        hrefs = html.xpath('//ul[@class="srp-results srp-list clearfix"]/li//div[@class="s-item__image"]/a/@href')
                        if hrefs:
                            print(f'{i}, page {page}')
                            asins = [re.findall(r'www\.ebay\.com/itm/(.*?)\?', hh)[0] for hh in hrefs]
                            for asin in asins:
                                item = {'asin': asin, 'created_time': datetime.now(), 'state': 3}
                                try:
                                    self.item_queue.put(item, block=True, timeout=3)
                                    print(f'{asin} written to item_queue')
                                except Exception as e:
                                    print('item_queue is full; write failed')
                            return True
                        else:
                            print(f'last page for {i}')
                            return False
                    else:
                        cookie_list.remove(cookie_dict)
                        new_cookie = random.choice(cookie_list)
                        print(f'ZIP code is not US; {i} page {page}, switching cookie')
                        print('cookies left:', len(cookie_list))
                        if new_cookie:
                            self.get_asin(i, page, new_cookie, task_queue, retry_count + 1)
                else:
                    print(response.status_code)
                    try:
                        task_queue.put(i, block=True, timeout=3)
                        print(f'{i} written back to task_queue')
                    except Exception as e:
                        print('task_queue is full; write failed')
        except Exception as exc:
            if retries_left > 0:
                wait_time = 2 ** retry_count
                print(f"Hit an error; retrying in {wait_time}s...")
                time.sleep(wait_time)
                self.get_asin(i, page, cookie_dict, task_queue, retry_count + 1)
            else:
                print(f"Retries exhausted; re-queueing search {i}, error: {exc}")
                try:
                    task_queue.put(i, block=True, timeout=3)
                    print(f'{i} written back to task_queue')
                except Exception as e:
                    print('task_queue is full; write failed')
    def workers(self, task_queue, cookie_dict):
        while not task_queue.empty():
            try:
                search = task_queue.get(timeout=3)
                page = 1
                while page <= 3:  # page cap; the loop body decides whether to continue
                    not_last_page = self.get_asin(search, page, cookie_dict, task_queue)
                    if not not_last_page or page == 3:  # no more pages, or cap reached
                        break
                    page += 1
                task_queue.task_done()
            except Exception as e:
                print('task_queue is empty; get failed')
    def run(self, search_list, cookie_dict):
        task_queue = queue.Queue()
        for search in search_list:
            try:
                task_queue.put(search, block=True, timeout=3)
            except Exception as e:
                print('task_queue is full; write failed')
        threads = []
        for th in range(1):
            t = threading.Thread(target=self.workers, args=(task_queue, cookie_dict))
            threads.append(t)
            t.start()
        # wait for all threads to finish
        for t in threads:
            t.join()
        items_to_save = []
        # drain everything the workers collected
        while not self.item_queue.empty():
            try:
                item = self.item_queue.get(block=True, timeout=3)
                items_to_save.append(item)
            except Exception as e:
                print('item_queue is empty; get failed')
        print('Preparing to save data...')
        if items_to_save:
            try:
                print('Saving data...')
                Con.save_ebay_asins(items_to_save)
            except Exception as e:
                print(e)
def workers(start_id, limit):
search_list = search_terms[start_id:limit]
cookie_dict = random.choice(cookie_list)
print(start_id,limit,search_list)
Get_asin().run(search_list,cookie_dict)
def main():
# pppoe_ip()
start_time = datetime.now()
total = 10000
parts = 100
step = total // parts
intervals = [[i * step, (i + 1) * step] for i in range(parts)]
print(intervals)
    # start_index = 4700 // 100  # index of the interval containing the start point
    # parts = (10000 - start_index * 100) // 100 + 1  # number of intervals from start to end
    # step = 100
    # intervals = [[(start_index + i) * step, ((start_index + i) + 1) * step] for i in range(parts)]
    # print(intervals)
    for section in intervals:
        print(f'Batch {section}')
minid = section[0]
maxid = section[1]
num_processes = 5
batch_size = 20
p = Pool(num_processes)
data_range = []
while minid < maxid:
data_range.append((minid, minid + batch_size))
minid += batch_size
for start_id, limit in data_range:
p.apply_async(workers, args=(start_id, limit))
p.close()
p.join()
current_time = datetime.now()
elapsed_time = current_time - start_time
        if elapsed_time >= timedelta(minutes=5):
            print('More than 5 minutes elapsed; switching IP')
            pppoe_ip()
            start_time = current_time  # reset the start time
        else:
            random_wait_time = random.randint(120, 200)
            print(f"Waiting {random_wait_time} seconds...")
            time.sleep(random_wait_time)
pppoe_ip()
if __name__ == '__main__':
main()
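# The batching arithmetic from main(), in isolation: 10000 ids are split
# into 100 intervals of 100, and each interval is then cut into
# (start_id, limit) slices of batch_size 20 for the process pool.
total, parts, batch_size = 10000, 100, 20
step = total // parts                                        # 100
intervals = [[i * step, (i + 1) * step] for i in range(parts)]
assert intervals[0] == [0, 100] and intervals[-1] == [9900, 10000]
first = intervals[0]
slices = [(s, s + batch_size) for s in range(first[0], first[1], batch_size)]
assert slices == [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100)]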
This source diff could not be displayed because it is too large.
import subprocess
import time
import requests
import random
def is_internet_available():
try:
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'no-cache',
'User-Agent': ua,
}
r = requests.get("https://www.baidu.com", timeout=5,headers=headers)
print(r.status_code)
return True
except Exception as e:
print(e)
return False
def pppoe_ip():
nums = 0
while True:
try:
            print('Switching IP...')
            result = subprocess.run(['sh', '/root/pppoe.sh'], capture_output=True, text=True, check=True)
            print(result.stdout)
            time.sleep(2)
            print(result.stderr)  # captured stderr output
            print('pppoe.sh script finished')
            time.sleep(1)
            # check whether the network connection is usable again
            if is_internet_available():
                print("Network connection is up")
                break
            else:
                print("No network connection", nums)
                nums += 3
                time.sleep(nums * 2)
        except subprocess.CalledProcessError as e:
            print('IP switch failed', e)
continue
if __name__ == '__main__':
is_internet_available()
\ No newline at end of file
from all_connect import ConnectSpider
Con = ConnectSpider()
from lxml import etree
import requests
import json
import datetime
from urllib.parse import quote
class HomeDepot:
def __init__(self):
self.headers = {
'Referer': 'https://www.homedepot.com/s/02%20sensor%20downstream?NCNI-5',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
'sec-ch-ua': '"Not)A;Brand";v="99", "Google Chrome";v="127", "Chromium";v="127"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
self.cookies = {
'AMCV_F6421253512D2C100A490D45%40AdobeOrg': 'MCMID|16129746417315936432497630021679731371',
}
def fetch_page(self,keyword,page):
encoded_keyword = quote(keyword)
url = f'https://www.homedepot.com/s/{encoded_keyword}?NCNI-5&Nao={page}'
response = requests.get(url, headers=self.headers,cookies=self.cookies)
if response.status_code == 200:
return response.text
else:
print(f"Error fetching page {page}: Status code {response.status_code}")
return None
def parse_html(self,html_content,page):
Html = etree.HTML(html_content)
try:
try:
not_find = Html.xpath('//*[@id="root"]/div/div/div/div[2]/div/div/div/span[1]/text()')[0]
print(not_find,page)
if "Hmm...we couldn't find" in not_find and page ==0:
print('无效')
except:
data_list = json.loads(Html.xpath('//script[@id="thd-helmet__script--browseSearchStructuredData"]/text()')[0])
if isinstance(data_list, list) and len(data_list) > 0:
return data_list[0]['mainEntity']['offers']['itemOffered']
else:
return []
except Exception as e:
print(f"Parsing error: {e}")
return []
def save_data(self,project_list,keyword):
        # persistence logic (database or other storage) goes here
        data_list = []
        for project in project_list:
            item = {}
            # use .get() with a default value to avoid KeyError
item['title'] = project.get('name', '')
offers = project.get('offers', {})
price = offers.get('price')
            if isinstance(price, str):
                # the price is a string: try to convert it to a float
                try:
                    price = float(price) if price.replace('.', '', 1).isdigit() or (
                        price.count('.') == 1 and price.replace('.', '', 1).isdigit()) else None
                except ValueError:
                    # conversion failed; fall back to None
                    price = None
            elif not isinstance(price, (int, float)):
                # neither a string nor a number; fall back to None
                price = None
item['price'] = price
item['url'] = offers.get('url', '')
item['image'] = project.get('image', '')
item['sku'] = project.get('sku', '')
item['search_term'] = keyword
item['state'] = 1
item['created_at'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# print(item)
data_list.append(item)
print(data_list)
Con.save_project_items(data_list)
def process_keyword(self,keyword):
        max_pages = 3  # only crawl the first three pages
        for page in range(0, max_pages * 24, 24):  # the offset advances 24 results per page
            print(f"Crawling {keyword}, page {page // 24 + 1}...")
html_content = self.fetch_page(keyword, page)
if html_content is None:
break
project_list = self.parse_html(html_content,page)
if not project_list:
print(f"{keyword} Reached the last page.")
break
# self.save_data(project_list, keyword)
def main(self):
keywords1 = ['apple watch bands for women red']
for keyword in keywords1:
self.process_keyword(keyword)
if __name__ == "__main__":
depot = HomeDepot()
depot.main()
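# A standalone sketch of the JSON-LD extraction that parse_html() performs,
# run against a toy page; the product data below is made up.
import json
from lxml import etree

toy_html = '''<html><head><script id="thd-helmet__script--browseSearchStructuredData"
type="application/ld+json">[{"mainEntity": {"offers": {"itemOffered":
[{"name": "demo item", "sku": "123"}]}}}]</script></head><body></body></html>'''
tree = etree.HTML(toy_html)
raw = tree.xpath('//script[@id="thd-helmet__script--browseSearchStructuredData"]/text()')[0]
data = json.loads(raw)
print(data[0]['mainEntity']['offers']['itemOffered'])  # [{'name': 'demo item', 'sku': '123'}]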
import platform
# Tables used by the spiders
DB_REQUESTS_ASIN_PARAMS = {
"db_syn": "us_all_syn_st",
"db_variat": "us_variat", # 更改
"db_cookies": "us_cookies",
"db_us_erp_asin_syn": "us_erp_asin_syn",
"db_us_erp_asin": "us_erp_asin",
"db_seller_account_syn": "us_seller_account_syn",
"db_seller_asin_account": "us_seller_asin_account",
"db_asin_image": "us_asin_image",
"db_seller_account_feedback_5000_asin_syn": "us_seller_account_feedback_5000_asin_syn",
"db_amazon_seller_account": "amazon_seller_account",
"db_ip_16yun": "ip_16yun",
"db_request_param_val": "us_request_param_val",
"db_seller_account_feedback": "us_seller_account_feedback",
"db_self_asin_detail": "us_self_asin_detail",
"db_seller_account_product_syn": "us_seller_account_product_syn",
"db_seller_account_feedback_new": "us_seller_account_feedback_report",
"db_competitive_aisn_syn": "us_competitive_aisn_syn",
"db_competitive_aisn": "us_competitive_aisn",
"db_asin_detail_product": "fr_seller_asin_product",
"db_search_term": "us_search_term",
"db_variation_total": "us_variation_total",
"db_asin_star": "uk_asin_star",
"db_asin_add_cart": "us_asin_fb_together",
"db_brand_asin": "us_brand_asin",
"db_bs_category_asin_detail": "fr_bs_category_asin_detail",
"db_asin_of_concern": "us_asin_of_concern",
"db_potential_product_asin_syn": "us_potential_product_asin_syn",
"db_potential_product_asin": "us_potential_product_asin",
'db_user_collection_syn': 'user_collection_syn',
'db_self_product_detail': "us_self_product_detail",
"int_threading_num": 9,
"if_ip_num_proxy": 100,
"if_ip_num": 20,
"requests_timeout": 15,
"de_requests_timeout": 15,
"de_int_threading_num": 8,
"feedback_threading_num": 8
}
DB_SEARCH_TERM_PARAMS_SPIDER = {
"db_search_term": "us_search_term",
"db_asin_detail_simply": "us_asin_detail_simply",
"db_search_term_serverip": "us_search_term_serverip",
"db_search_term_zr": "us_search_term_rank_zr",
"db_search_term_sp": "us_search_term_rank_sp",
"db_search_term_sb": "us_search_term_rank_sb",
"db_search_term_ac": "us_search_term_rank_ac",
"db_search_term_bs": "us_search_term_rank_bs",
"db_search_term_er": "us_search_term_rank_er",
"db_search_term_hr": "us_search_term_rank_hr",
"db_search_term_tr": "us_search_term_rank_tr",
"db_other_search_term": "us_other_search_term",
"us_brand_analytics": "us_brand_analytics",
}
DB_SEARCH_TERM_PARAMS_SPIDER_MYSQL = {
"db_search_term": "us_search_term",
"db_asin_detail_simply": "us_asin_detail_simply",
"db_search_term_serverip": "us_search_term_serverip",
"db_search_term_zr": "us_search_term_rank_zr",
"db_search_term_sp": "us_search_term_rank_sp",
"db_search_term_sb": "us_search_term_rank_sb",
"db_search_term_ac": "us_search_term_rank_ac",
"db_search_term_bs": "us_search_term_rank_bs",
"db_search_term_er": "us_search_term_rank_er",
"db_search_term_hr": "us_search_term_rank_hr",
"db_search_term_tr": "us_search_term_rank_tr",
"db_potential_product_st_syn": "us_potential_product_st_syn",
"db_potential_product_st": "us_potential_product_st",
"db_other_search_term": "us_other_search_term",
"us_brand_analytics": "us_brand_analytics",
}
# MySQL connection parameters
DB_CONN_DICT = {
"mysql_port": 3306,
"mysql_db": "selection",
"mysql_user": "adv_yswg",
"mysql_pwd": "Gd1pGJog1ysLMLBdML8w81",
"mysql_host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",
}
# pg14 connection parameters 113.100.143.162:5432
if platform.system().lower() == 'windows':
PG_CONN_DICT = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "192.168.10.223",
}
else:
PG_CONN_DICT = {
"pg_port": 54328,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "61.145.136.61",
}
# pg6 connection parameters 113.100.143.162:5432
if platform.system().lower() == 'windows':
PG_CONN_DICT_6 = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
else:
PG_CONN_DICT_6 = {
"pg_port": 5432,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
# pg12 connection parameters 113.100.143.162:5443
if platform.system().lower() == 'windows':
PG_CONN_DICT_21 = {
"pg_port": 5443,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
else:
PG_CONN_DICT_21 = {
"pg_port": 5443,
"pg_db": "selection",
"pg_user": "postgres",
"pg_pwd": "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS",
"pg_host": "113.100.143.162",
}
# doris
DORIS_CONN = {
"doris_port": 19030,
"doris_db": "selection",
"doris_user": "root",
"doris_host": "113.100.143.162",
"doris_pwd": ""
}
# redis
REDIS_CONN = {
"redis_host": "113.100.143.162",
"redis_port": 6379,
"redis_pwd": "yswg2023",
"redis_db": 14
}
# StarRocks connection parameters
if platform.system().lower() == 'windows':
starrocks_CONN = {
"mysql_port": 19030,
"mysql_db": "us_spider",
"mysql_user": "pengyanbing",
"mysql_pwd": "pengyanbing12345",
"mysql_host": "113.100.143.162"
}
else:
starrocks_CONN = {
"mysql_port": 19030,
"mysql_db": "us_spider",
"mysql_user": "pengyanbing",
"mysql_pwd": "pengyanbing12345",
"mysql_host": "192.168.10.151"
}
\ No newline at end of file
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
import random
ORIGIN_CIPHERS = ('ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:'
'DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:RSA+3DES')
class DESAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
"""
A TransportAdapter that re-enables 3DES support in Requests.
"""
CIPHERS = ORIGIN_CIPHERS.split(':')
random.shuffle(CIPHERS)
CIPHERS = ':'.join(CIPHERS)
md5_list = [':!aNULL:!eNULL:!MD5', ':!aNULL:!MD5:!DSS']
self.CIPHERS = CIPHERS + random.choice(md5_list)
# self.CIPHERS = CIPHERS + ':!aNULL:!MD5:!DSS'
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).proxy_manager_for(*args, **kwargs)
\ No newline at end of file
import json
import ast
from all_connect import ConnectSpider
Con = ConnectSpider()
import gzip
import requests
from lxml import etree
from amazon_params import py_ja3
from datetime import datetime
import re
from collections import defaultdict
import io
from multiprocessing import Pool
from multiprocessing import Queue
from multiprocessing import Process, Manager, Lock
import queue
# asins = Con.get_asin_html_06_asin()
# print(asins)
def decompress_bytes(input_bytes):
if isinstance(input_bytes, str):
input_bytes = ast.literal_eval(input_bytes)
return gzip.decompress(input_bytes).decode('utf-8')
def execution_parse(asin_html):
keywords_scraper_url_list = asin_html.split('||-||')
asin_org = keywords_scraper_url_list[0]
html_org = keywords_scraper_url_list[1]
    print(f'Now parsing: {asin_org}')
html_str = json.loads(html_org)
html = decompress_bytes(html_str)
filename = f"{asin_org}1.html"
with open(filename, 'w', encoding='utf-8') as f:
f.write(html)
def run():
asin_html_pairs = Con.look_html()
if asin_html_pairs:
for asin_html in asin_html_pairs:
execution_parse(asin_html)
else:
print("没有符合条件的数据需要处理")
return
run()
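# Round-trip sketch of the storage format decompress_bytes() assumes:
# gzip-compressed UTF-8 HTML, sometimes persisted as the repr() of a bytes
# object, which is why the ast.literal_eval branch exists.
import gzip

demo_html = '<html><body>demo</body></html>'
blob = gzip.compress(demo_html.encode('utf-8'))
assert decompress_bytes(blob) == demo_html        # raw bytes form
assert decompress_bytes(str(blob)) == demo_html   # "b'...'" string form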
import subprocess
import time
import requests
import random
def is_internet_available():
try:
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'no-cache',
'User-Agent': ua,
}
r = requests.get("https://www.baidu.com", timeout=5,headers=headers)
print(r.status_code)
return True
except Exception as e:
print(e)
return False
def pppoe_ip():
nums = 0
while True:
try:
            print('Switching IP...')
            result = subprocess.run(['sh', '/root/pppoe.sh'], capture_output=True, text=True, check=True)
            print(result.stdout)
            time.sleep(2)
            print(result.stderr)  # captured stderr output
            print('pppoe.sh script finished')
            time.sleep(1)
            # check whether the network connection is usable again
            if is_internet_available():
                print("Network connection is up")
                break
            else:
                print("No network connection", nums)
                nums += 3
                time.sleep(nums * 2)
        except subprocess.CalledProcessError as e:
            print('IP switch failed', e)
continue
if __name__ == '__main__':
is_internet_available()
\ No newline at end of file
# Enter the project name (similar to python's input)
#echo "Input name of the project"
#fruits=("requests_asin_competitive_8.py" "Poll_site_spider.py" "all_site_asin_detail_9.py" "h4_asin_detail.py" "linux_spider_asin.py" "Poll_site_spider_pg.py" "site_search_term.py")
fruits=("amazon_comment")
for i in "${fruits[@]}"; do
    PID=($(ps -ef | grep "$i" | grep -v grep | awk '{print $2}'))
    for k in "${PID[@]}"; do
        echo "id $k"
        kill -9 "$k" ;
    done;
echo "pid: $PID ---"
done
#chmod +x k_spider.sh
#30 23 * * * cd /mnt/hezhe && ./k_spider.sh > ki.log 2>&1 &
#cd /mnt/hezhe && ./k_spider.sh > ki.log 2>&1 &
# cd /mnt/hezhe/amazon_spider/amazon_spider && ./k_spider.sh > ki.log 2>&1 &
\ No newline at end of file
from time import sleep
from random import randint
from all_connect import ConnectSpider
Con = ConnectSpider()
import imaplib
import email
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage
import json
import requests
import re
import random
import time
from datetime import datetime, timedelta
import concurrent.futures
import threading
class Download():
def __init__(self):
self.account = ''
self.pwd = ''
self.headers = {
'accept': 'application/json',
'accept-language': 'zh-CN,zh;q=0.9',
'content-type': 'application/json',
'newrelic': 'eyJ2IjpbMCwxXSwiZCI6eyJ0eSI6IkJyb3dzZXIiLCJhYyI6Ijk2NzIzMiIsImFwIjoiMTU4ODYzMjc5MiIsImlkIjoiMDdjNDZhYTI3ZTBlMTAyZiIsInRyIjoiOGI4ODQ3MzNiNjFjNDNlY2YxMGEzOTQ2MzQ4MDE2NzQiLCJ0aSI6MTczNTk5NzEzNjEyOH19',
'origin': 'https://www.shutterstock.com',
'priority': 'u=1, i',
'referer': 'https://www.shutterstock.com/zh/catalog/',
'sec-ch-ua': '"Not)A;Brand";v="99", "Google Chrome";v="127", "Chromium";v="127"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'traceparent': '00-8b884733b61c43ecf10a394634801674-07c46aa27e0e102f-01',
'tracestate': '967232@nr=0-1-967232-1588632792-07c46aa27e0e102f----1735997136128',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
'x-end-app-name': 'next-web',
'x-end-app-version': '5ca4a4c05d8',
'x-newrelic-id': 'XQAAU1VRGwIEVVhaBgYGUlI=',
'x-request-id': '15754a73-f152-4983-99b4-6af058379880',
}
def download_image(self, file_path, res):
with open(file_path, "wb") as f:
f.write(res.content)
def analyze_pic(self, account_id, image_id, image_url, image_title):
try:
start_time = datetime.now().strftime("%m-%d %H:%M:%S")
if '@' in account_id:
file_name = account_id.split('@')[0]
else:
file_name = account_id
save_folder = f'/home/wangjing/picture_material/stock_summery/all_pic/{file_name}'
pic_name = f'{str(image_id)}+{image_title}'.replace(':', '_').replace('\t', '').replace('\r', '').replace('|', '').replace('/', '').replace('"', "'").replace(' ', '_').replace(',', '_').replace('.','_').replace('\n', '_')
            if any('\u4e00' <= char <= '\u9fff' for char in pic_name):
                # contains Chinese characters: truncate to 50 characters
                pic_name = pic_name[:50] + '.jpg'
            else:
                # no Chinese characters: sanitize, capping the length at 160 characters
                pic_name = re.sub(r'[\\/*?:"<>|]', '_', pic_name)[:160] + '.jpg'
            res = requests.get(image_url, timeout=600)
            # build the full file path
            file_path = os.path.join(save_folder, pic_name)
            self.download_image(file_path, res)
            Con.update_url_state_to_3(image_id)
            now_time = datetime.now().strftime("%m-%d %H:%M:%S")
            print(f'pic_name: {pic_name[:38]}, time: {start_time}-{now_time}, downloaded')
except Exception as e:
print(e)
    def process_item_id(self, item_id):
        has_new_data = False  # whether any new data was processed for this item_id
        print(f"Processing item_id: {item_id}")
        retry_count = 0  # retry counter
        max_retries = 3  # maximum number of retries
        while True:  # keep re-checking so newly added tasks are picked up
            try:
                account_list = Con.get_cookie_account(item_id)
                if not account_list:  # no account info; skip
                    break
                account = account_list[0]
                pic_data_list = Con.get_pic_urls(account)
                if pic_data_list:
                    has_new_data = True  # new data is being processed
                    print(f'{account} has {len(pic_data_list)} images to download')
                    for pic_data in pic_data_list:
                        image_url, image_id, image_title = pic_data.split("||", 2)
                        try:
                            self.analyze_pic(account, image_id, image_url, image_title)
                        except Exception as e:
                            if 'Expected axis has 0 elements, new values have 2 elements' in str(e):
                                print(f'{account} is fully downloaded')
                            else:
                                print(f"Error while processing an image: {e}")
                else:
                    print(f"{account} has no new images to download")
                    break  # nothing new for this item_id; stop its loop
                # check again for tasks added in the meantime
                new_pic_data_list = Con.get_pic_urls(account)
                if new_pic_data_list == pic_data_list or not new_pic_data_list:
                    break  # nothing new, or identical to before; stop
                pic_data_list = new_pic_data_list  # process the fresh list next round
            except Exception as e:
                if 'server closed the connection unexpectedly' in str(e) and retry_count < max_retries:
                    retry_count += 1
                    print(f"Connection dropped; retry #{retry_count}...")
                    time.sleep(5)  # wait 5 seconds before retrying
                    continue
                else:
                    print(f"Error while processing item_id {item_id}: {e}")
                    break
        # all tasks for this item_id are done
        print(f"All tasks for item_id {item_id} are complete")
        return has_new_data  # whether new data was processed
    def run(self):
        while True:
            all_completed = True  # whether every item_id finished with nothing to do
            for item_id in range(1, 33):  # iterate over all item_ids
                if self.process_item_id(item_id):
                    all_completed = False  # some item_id had work; not all done
            if all_completed:
                print('Nothing to download right now; waiting half an hour')
                time.sleep(1800)  # check again after 30 minutes
if __name__ == '__main__':
Download().run()
scrapy==2.7.1
requests
redis
httpx==0.18.2
jsonpath
PyMySQL
mysqlclient
fake_useragent
attrs
typing-extensions==4.3.0
mysql-connector
nltk
func_timeout
pillow
\ No newline at end of file
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = amazon_spider.settings
[deploy:local]
url = http://localhost:6800/
project = amazon_spider
[deploy:hadoop10]
url = http://192.168.10.219:6800/
project = amazon_spider
username = hezhe
password = admin
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
class TkOrderExport():
def __init__(self):
co = ChromiumOptions().headless()
self.page = ChromiumPage(co)
        # request headers
self.headers = {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9', # 'en-US,en;q=0.9'
'cache-control': 'no-cache',
'content-type': 'application/json; charset=UTF-8',
'origin': 'https://www.tiktok.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.tiktok.com/',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sec-fetch-storage-access': 'active',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
}
        self.shop_code_ceshi = "CNUSCBR2ELPJ"
        # shop account
        self.account = 'Hiboomtiktok@outlook.com'
        # download folder
        self.download_folder = r"D:\Downloads"
        self.file_suffix = ".xlsx"  # expected file-name suffix (including the dot)
        self.receiver_name = 'wangjing5'
        # Redis configuration
self.REDIS_CONFIG = {
'host': '120.79.147.190',
'port': 6379,
'password': 'fG7#vT6kQ1pX',
'db': 13,
'decode_responses': True
}
self.url = "https://affiliate.tiktokglobalshop.com/product/order?shop_region=US"
self.cookies = self.get_cookies()
def login_with_cookies(self):
        # check whether we are already logged in
        try:
            self.page.get(self.url)
            self.page.set.window.max()
            tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
            tou_xiang.click()
            sleep(randint(5, 10))
            self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
            print('Store name:', self.store_name)
            sleep(randint(1, 3))
            self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
            self.shop_code = self.shop_code.split(': ')[1]
            print('Store code:', self.shop_code)
            sleep(randint(1, 3))
            if self.shop_code and str(self.shop_code) == self.shop_code_ceshi:
                print("Already logged in")
            else:
                print("Shop code mismatch")
                raise RuntimeError("shop code mismatch")
        except:
            print("Login required")
            try:
                # open a page first, otherwise cookies cannot be set
                self.page.get('https://www.tiktok.com')
                # set cookies
                for ck in self.cookies:
                    self.page.set.cookies(ck)
                # reopen the target URL; we should now be logged in
                self.page.get(self.url)
                time.sleep(5)
            except:
                print("Login failed")
def save_cookie(self, cookies):
"""将整个 cookies 列表一次性存入 Redis,作为一条 JSON 数据"""
key = f"tk_shop_cookie:{self.shop_code}:order:list"
# 将整个 cookies 列表转换为 JSON 字符串
value = json.dumps(cookies, ensure_ascii=False)
# 存储到 Redis(覆盖旧数据)
self.r.set(key, value)
print(f"💾 已将 {len(cookies)} 个 cookie 存入 Redis,键为: {key}")
def get_cookies(self):
self.r = redis.StrictRedis(**self.REDIS_CONFIG)
"""从 Redis 中获取并解析 cookies"""
key = f"tk_shop_cookie:{self.shop_code_ceshi}:order:list"
cookie_json = self.r.get(key)
if cookie_json:
cookies = json.loads(cookie_json)
print(f"🔄 从 Redis 成功读取 {len(cookies)} 个 cookie")
return cookies
else:
print("❌ 未找到对应的 cookie 数据")
return None
def change_language(self):
self.login_with_cookies()
retry_num = 0
        max_retry = 1  # maximum number of retries
        while retry_num <= max_retry:
            try:
                tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
                tou_xiang.click()
                sleep(randint(5, 10))
                self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
                print('Store name:', self.store_name)
                sleep(randint(1, 3))
                self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span',
                                               timeout=10).text
                self.shop_code = self.shop_code.split(': ')[1]
                print('Store code:', self.shop_code)
                sleep(randint(1, 3))
                cookies = self.page.cookies()
                self.save_cookie(cookies)
                self.send_success_message_via_wechat()
                break
            except Exception as e:
                print(f"Error in change_language: {e}")
                self.send_error_notification_via_wechat(e)
                if "没有" in str(e):  # DrissionPage "element not found" errors contain 没有
                    retry_num += 1
                    sleep(5)
                else:
                    raise
self.page.quit()
def connect_redis(self):
"""建立 Redis 连接"""
self.r = redis.StrictRedis(**self.REDIS_CONFIG)
try:
self.r.ping() # 测试连接
print("✅ 成功连接到 Redis")
except redis.exceptions.ConnectionError as e:
print(f"❌ 无法连接到 Redis: {e}")
raise
def send_success_message_via_wechat(self):
webhook_url = 'http://47.112.96.71:8082/selection/sendMessage' # 替换为你的企业微信机器人的Webhook URL
data = {
"account": self.receiver_name,
"title": '【TK获取cookie成功提醒】',
"content": f'账号: {self.account}, 店铺: {self.store_name}, 程序运行时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
if response.status_code == 200:
print("已成功发送通知到企业微信")
else:
print(f"发送通知失败: {response.text}")
def send_error_notification_via_wechat(self, error_message):
webhook_url = 'http://47.112.96.71:8082/selection/sendMessage' # 替换为你的企业微信机器人的Webhook URL
data = {
"account": self.receiver_name,
'title': '【TK获取cookie异常提醒】',
'content': f'账号:{self.account},店铺:{self.store_name},错误信息:{error_message}, 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
if response.status_code == 200:
print("已成功发送错误通知到企业微信")
else:
print(f"发送错误通知失败: {response.text}")
def run(self):
self.connect_redis()
self.change_language()
# self.page.quit()
if __name__ == '__main__':
TkOrderExport().run()
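# --- Illustrative consumer sketch (an assumption, not part of this commit) ---
# The script above saves each shop's cookie list to Redis as JSON under
# "tk_shop_cookie:<shop_code>:order:list" but never uses self.headers itself.
# A downstream job would presumably read the list back and replay it over
# plain HTTP together with the browser-like headers; only the key format and
# the cookie-dict fields (name/value/domain, as exported by DrissionPage) are
# taken from the script above, the rest is hypothetical.
import json
import redis
import requests

def load_session_for_shop(redis_client, shop_code, headers):
    raw = redis_client.get(f"tk_shop_cookie:{shop_code}:order:list")
    if raw is None:
        return None  # no saved login for this shop
    session = requests.Session()
    session.headers.update(headers)
    for ck in json.loads(raw):
        # Attach each saved cookie to the session's cookie jar
        session.cookies.set(ck.get('name'), ck.get('value'), domain=ck.get('domain'))
    return session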
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
class TkOrderExport():
def __init__(self):
co = ChromiumOptions().headless()
self.page = ChromiumPage(co)
        # Browser-like request headers
        self.headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/json; charset=UTF-8',
'origin': 'https://www.tiktok.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.tiktok.com/',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sec-fetch-storage-access': 'active',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
}
self.shop_code_ceshi = "CNUSCBX2ELPN"
        # Shop account
        self.account = 'SATINIOR456tk@outlook.com'
        # Download folder path
        self.download_folder = r"D:\Downloads"
        self.file_suffix = ".xlsx"  # expected file-name suffix (including the dot)
        self.receiver_name = 'wangjing5'
        # Redis connection settings
self.REDIS_CONFIG = {
'host': '120.79.147.190',
'port': 6379,
'password': 'fG7#vT6kQ1pX',
'db': 13,
'decode_responses': True
}
self.url = "https://affiliate.tiktokglobalshop.com/product/order?shop_region=US"
self.cookies = self.get_cookies()
    def login_with_cookies(self):
        # Check whether the saved session is still logged in
        try:
            self.page.get(self.url)
            self.page.set.window.max()
            tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
            tou_xiang.click()
            sleep(randint(5, 10))
            self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
            print('Store name:', self.store_name)
            sleep(randint(1, 3))
            self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
            self.shop_code = self.shop_code.split(': ')[1]
            print('Shop code:', self.shop_code)
            sleep(randint(1, 3))
            if self.shop_code and str(self.shop_code) == self.shop_code_ceshi:
                print("Already logged in")
            else:
                print("Shop code does not match")
                raise ValueError("shop code mismatch")
        except Exception:
            print("Login required")
            try:
                # Open a blank page first; cookies cannot be set otherwise
                self.page.get('https://www.tiktok.com')
                # Set the cookies (get_cookies may have returned None)
                for ck in self.cookies or []:
                    self.page.set.cookies(ck)
                # Reopen the target URL; the session should now be logged in
                self.page.get(self.url)
                time.sleep(5)
            except Exception:
                print("Login failed")
    def save_cookie(self, cookies):
        """Store the whole cookies list in Redis at once, as a single JSON value."""
        key = f"tk_shop_cookie:{self.shop_code}:order:list"
        # Serialize the whole cookies list to a JSON string
        value = json.dumps(cookies, ensure_ascii=False)
        # Write to Redis (overwrites any previous value)
        self.r.set(key, value)
        print(f"💾 Saved {len(cookies)} cookies to Redis under key: {key}")
    def get_cookies(self):
        """Fetch and parse the cookies stored in Redis."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        key = f"tk_shop_cookie:{self.shop_code_ceshi}:order:list"
        cookie_json = self.r.get(key)
        if cookie_json:
            cookies = json.loads(cookie_json)
            print(f"🔄 Read {len(cookies)} cookies from Redis")
            return cookies
        else:
            print("❌ No cookie data found for this key")
            return None
    def change_language(self):
        self.login_with_cookies()
        retry_num = 0
        max_retry = 1  # maximum number of retries
        try:
            while retry_num <= max_retry:
                try:
                    tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
                    tou_xiang.click()
                    sleep(randint(5, 10))
                    self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
                    print('Store name:', self.store_name)
                    sleep(randint(1, 3))
                    self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
                    self.shop_code = self.shop_code.split(': ')[1]
                    print('Shop code:', self.shop_code)
                    sleep(randint(1, 3))
                    cookies = self.page.cookies()
                    self.save_cookie(cookies)
                    self.send_success_message_via_wechat()
                    break
                except Exception as e:
                    print(f"change_language failed: {e}")
                    self.send_error_notification_via_wechat(e)
                    # DrissionPage raises Chinese-language errors such as
                    # "没有找到元素" (element not found); retry only on those
                    if "没有" in str(e):
                        retry_num += 1
                        sleep(5)
                    else:
                        raise
        finally:
            # Always close the browser, even when an exception propagates
            self.page.quit()
    def connect_redis(self):
        """Open the Redis connection."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        try:
            self.r.ping()  # test the connection
            print("✅ Connected to Redis")
        except redis.exceptions.ConnectionError as e:
            print(f"❌ Could not connect to Redis: {e}")
            raise
    def send_success_message_via_wechat(self):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            "title": '[TK cookie refresh succeeded]',
            "content": f'Account: {self.account}, store: {self.store_name}, run time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Notification sent to WeCom")
        else:
            print(f"Failed to send notification: {response.text}")
    def send_error_notification_via_wechat(self, error_message):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            'title': '[TK cookie refresh error]',
            'content': f'Account: {self.account}, store: {self.store_name}, error: {error_message}, time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Error notification sent to WeCom")
        else:
            print(f"Failed to send error notification: {response.text}")
def run(self):
self.connect_redis()
self.change_language()
# self.page.quit()
if __name__ == '__main__':
TkOrderExport().run()
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
class TkOrderExport():
def __init__(self):
co = ChromiumOptions().headless()
self.page = ChromiumPage(co)
        # Browser-like request headers
        self.headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/json; charset=UTF-8',
'origin': 'https://www.tiktok.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.tiktok.com/',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sec-fetch-storage-access': 'active',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
}
self.shop_code_ceshi = "CNUSCBW2ELPC"
        # Shop account
        self.account = 'boao123tk@outlook.com'
        # Download folder path
        self.download_folder = r"D:\Downloads"
        self.file_suffix = ".xlsx"  # expected file-name suffix (including the dot)
        self.receiver_name = 'wangjing5'
        # Redis connection settings
self.REDIS_CONFIG = {
'host': '120.79.147.190',
'port': 6379,
'password': 'fG7#vT6kQ1pX',
'db': 13,
'decode_responses': True
}
self.url = "https://affiliate.tiktokglobalshop.com/product/order?shop_region=US"
self.cookies = self.get_cookies()
    def login_with_cookies(self):
        # Check whether the saved session is still logged in
        try:
            self.page.get(self.url)
            self.page.set.window.max()
            tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
            tou_xiang.click()
            sleep(randint(5, 10))
            self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
            print('Store name:', self.store_name)
            sleep(randint(1, 3))
            self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
            self.shop_code = self.shop_code.split(': ')[1]
            print('Shop code:', self.shop_code)
            sleep(randint(1, 3))
            if self.shop_code and str(self.shop_code) == self.shop_code_ceshi:
                print("Already logged in")
            else:
                print("Shop code does not match")
                raise ValueError("shop code mismatch")
        except Exception:
            print("Login required")
            try:
                # Open a blank page first; cookies cannot be set otherwise
                self.page.get('https://www.tiktok.com')
                # Set the cookies (get_cookies may have returned None)
                for ck in self.cookies or []:
                    self.page.set.cookies(ck)
                # Reopen the target URL; the session should now be logged in
                self.page.get(self.url)
                time.sleep(5)
            except Exception:
                print("Login failed")
    def save_cookie(self, cookies):
        """Store the whole cookies list in Redis at once, as a single JSON value."""
        key = f"tk_shop_cookie:{self.shop_code}:order:list"
        # Serialize the whole cookies list to a JSON string
        value = json.dumps(cookies, ensure_ascii=False)
        # Write to Redis (overwrites any previous value)
        self.r.set(key, value)
        print(f"💾 Saved {len(cookies)} cookies to Redis under key: {key}")
    def get_cookies(self):
        """Fetch and parse the cookies stored in Redis."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        key = f"tk_shop_cookie:{self.shop_code_ceshi}:order:list"
        cookie_json = self.r.get(key)
        if cookie_json:
            cookies = json.loads(cookie_json)
            print(f"🔄 Read {len(cookies)} cookies from Redis")
            return cookies
        else:
            print("❌ No cookie data found for this key")
            return None
    def change_language(self):
        self.login_with_cookies()
        retry_num = 0
        max_retry = 1  # maximum number of retries
        try:
            while retry_num <= max_retry:
                try:
                    tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
                    tou_xiang.click()
                    sleep(randint(5, 10))
                    self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
                    print('Store name:', self.store_name)
                    sleep(randint(1, 3))
                    self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
                    self.shop_code = self.shop_code.split(': ')[1]
                    print('Shop code:', self.shop_code)
                    sleep(randint(1, 3))
                    cookies = self.page.cookies()
                    self.save_cookie(cookies)
                    self.send_success_message_via_wechat()
                    break
                except Exception as e:
                    print(f"change_language failed: {e}")
                    self.send_error_notification_via_wechat(e)
                    # DrissionPage raises Chinese-language errors such as
                    # "没有找到元素" (element not found); retry only on those
                    if "没有" in str(e):
                        retry_num += 1
                        sleep(5)
                    else:
                        raise
        finally:
            # Always close the browser, even when an exception propagates
            self.page.quit()
    def connect_redis(self):
        """Open the Redis connection."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        try:
            self.r.ping()  # test the connection
            print("✅ Connected to Redis")
        except redis.exceptions.ConnectionError as e:
            print(f"❌ Could not connect to Redis: {e}")
            raise
    def send_success_message_via_wechat(self):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            "title": '[TK cookie refresh succeeded]',
            "content": f'Account: {self.account}, store: {self.store_name}, run time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Notification sent to WeCom")
        else:
            print(f"Failed to send notification: {response.text}")
    def send_error_notification_via_wechat(self, error_message):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            'title': '[TK cookie refresh error]',
            'content': f'Account: {self.account}, store: {self.store_name}, error: {error_message}, time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Error notification sent to WeCom")
        else:
            print(f"Failed to send error notification: {response.text}")
def run(self):
self.connect_redis()
self.change_language()
# self.page.quit()
if __name__ == '__main__':
TkOrderExport().run()
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
class TkOrderExport():
def __init__(self):
co = ChromiumOptions().headless()
self.page = ChromiumPage(co)
        # Browser-like request headers
        self.headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/json; charset=UTF-8',
'origin': 'https://www.tiktok.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.tiktok.com/',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sec-fetch-storage-access': 'active',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
}
self.shop_code_ceshi = "CNUSCB78ELWE"
        # Shop account
        self.account = 'PatelaiChank@outlook.com'
        # Download folder path
        self.download_folder = r"D:\Downloads"
        self.file_suffix = ".xlsx"  # expected file-name suffix (including the dot)
        self.receiver_name = 'wangjing5'
        # Redis connection settings
self.REDIS_CONFIG = {
'host': '120.79.147.190',
'port': 6379,
'password': 'fG7#vT6kQ1pX',
'db': 13,
'decode_responses': True
}
self.url = "https://affiliate.tiktokglobalshop.com/product/order?shop_region=US"
self.cookies = self.get_cookies()
    def login_with_cookies(self):
        # Check whether the saved session is still logged in
        try:
            self.page.get(self.url)
            self.page.set.window.max()
            tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
            tou_xiang.click()
            sleep(randint(5, 10))
            self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
            print('Store name:', self.store_name)
            sleep(randint(1, 3))
            self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
            self.shop_code = self.shop_code.split(': ')[1]
            print('Shop code:', self.shop_code)
            sleep(randint(1, 3))
            if self.shop_code and str(self.shop_code) == self.shop_code_ceshi:
                print("Already logged in")
            else:
                print("Shop code does not match")
                raise ValueError("shop code mismatch")
        except Exception:
            print("Login required")
            try:
                # Open a blank page first; cookies cannot be set otherwise
                self.page.get('https://www.tiktok.com')
                # Set the cookies (get_cookies may have returned None)
                for ck in self.cookies or []:
                    self.page.set.cookies(ck)
                # Reopen the target URL; the session should now be logged in
                self.page.get(self.url)
                time.sleep(5)
            except Exception:
                print("Login failed")
    def save_cookie(self, cookies):
        """Store the whole cookies list in Redis at once, as a single JSON value."""
        key = f"tk_shop_cookie:{self.shop_code}:order:list"
        # Serialize the whole cookies list to a JSON string
        value = json.dumps(cookies, ensure_ascii=False)
        # Write to Redis (overwrites any previous value)
        self.r.set(key, value)
        print(f"💾 Saved {len(cookies)} cookies to Redis under key: {key}")
    def get_cookies(self):
        """Fetch and parse the cookies stored in Redis."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        key = f"tk_shop_cookie:{self.shop_code_ceshi}:order:list"
        cookie_json = self.r.get(key)
        if cookie_json:
            cookies = json.loads(cookie_json)
            print(f"🔄 Read {len(cookies)} cookies from Redis")
            return cookies
        else:
            print("❌ No cookie data found for this key")
            return None
    def change_language(self):
        self.login_with_cookies()
        retry_num = 0
        max_retry = 1  # maximum number of retries
        try:
            while retry_num <= max_retry:
                try:
                    tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
                    tou_xiang.click()
                    sleep(randint(5, 10))
                    self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
                    print('Store name:', self.store_name)
                    sleep(randint(1, 3))
                    self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
                    self.shop_code = self.shop_code.split(': ')[1]
                    print('Shop code:', self.shop_code)
                    sleep(randint(1, 3))
                    cookies = self.page.cookies()
                    self.save_cookie(cookies)
                    self.send_success_message_via_wechat()
                    break
                except Exception as e:
                    print(f"change_language failed: {e}")
                    self.send_error_notification_via_wechat(e)
                    # DrissionPage raises Chinese-language errors such as
                    # "没有找到元素" (element not found); retry only on those
                    if "没有" in str(e):
                        retry_num += 1
                        sleep(5)
                    else:
                        raise
        finally:
            # Always close the browser, even when an exception propagates
            self.page.quit()
    def connect_redis(self):
        """Open the Redis connection."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        try:
            self.r.ping()  # test the connection
            print("✅ Connected to Redis")
        except redis.exceptions.ConnectionError as e:
            print(f"❌ Could not connect to Redis: {e}")
            raise
    def send_success_message_via_wechat(self):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            "title": '[TK cookie refresh succeeded]',
            "content": f'Account: {self.account}, store: {self.store_name}, run time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Notification sent to WeCom")
        else:
            print(f"Failed to send notification: {response.text}")
    def send_error_notification_via_wechat(self, error_message):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            'title': '[TK cookie refresh error]',
            'content': f'Account: {self.account}, store: {self.store_name}, error: {error_message}, time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Error notification sent to WeCom")
        else:
            print(f"Failed to send error notification: {response.text}")
def run(self):
self.connect_redis()
self.change_language()
# self.page.quit()
if __name__ == '__main__':
TkOrderExport().run()
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
class TkOrderExport():
def __init__(self):
co = ChromiumOptions().headless()
self.page = ChromiumPage(co)
        # Browser-like request headers
        self.headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/json; charset=UTF-8',
'origin': 'https://www.tiktok.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.tiktok.com/',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sec-fetch-storage-access': 'active',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
}
self.shop_code_ceshi = "CNUSCBS2EL7D"
        # Shop account
        self.account = 'qiuaner.tiktok@outlook.com'
        # Download folder path
        self.download_folder = r"D:\Downloads"
        self.file_suffix = ".xlsx"  # expected file-name suffix (including the dot)
        self.receiver_name = 'wangjing5'
        # Redis connection settings
self.REDIS_CONFIG = {
'host': '120.79.147.190',
'port': 6379,
'password': 'fG7#vT6kQ1pX',
'db': 13,
'decode_responses': True
}
self.url = "https://affiliate.tiktokglobalshop.com/product/order?shop_region=US"
self.cookies = self.get_cookies()
    def login_with_cookies(self):
        # Check whether the saved session is still logged in
        try:
            self.page.get(self.url)
            self.page.set.window.max()
            tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
            tou_xiang.click()
            sleep(randint(5, 10))
            self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
            print('Store name:', self.store_name)
            sleep(randint(1, 3))
            self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
            self.shop_code = self.shop_code.split(': ')[1]
            print('Shop code:', self.shop_code)
            sleep(randint(1, 3))
            if self.shop_code and str(self.shop_code) == self.shop_code_ceshi:
                print("Already logged in")
            else:
                print("Shop code does not match")
                raise ValueError("shop code mismatch")
        except Exception:
            print("Login required")
            try:
                # Open a blank page first; cookies cannot be set otherwise
                self.page.get('https://www.tiktok.com')
                # Set the cookies (get_cookies may have returned None)
                for ck in self.cookies or []:
                    self.page.set.cookies(ck)
                # Reopen the target URL; the session should now be logged in
                self.page.get(self.url)
                time.sleep(5)
            except Exception:
                print("Login failed")
    def save_cookie(self, cookies):
        """Store the whole cookies list in Redis at once, as a single JSON value."""
        key = f"tk_shop_cookie:{self.shop_code}:order:list"
        # Serialize the whole cookies list to a JSON string
        value = json.dumps(cookies, ensure_ascii=False)
        # Write to Redis (overwrites any previous value)
        self.r.set(key, value)
        print(f"💾 Saved {len(cookies)} cookies to Redis under key: {key}")
    def get_cookies(self):
        """Fetch and parse the cookies stored in Redis."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        key = f"tk_shop_cookie:{self.shop_code_ceshi}:order:list"
        cookie_json = self.r.get(key)
        if cookie_json:
            cookies = json.loads(cookie_json)
            print(f"🔄 Read {len(cookies)} cookies from Redis")
            return cookies
        else:
            print("❌ No cookie data found for this key")
            return None
    def change_language(self):
        self.login_with_cookies()
        retry_num = 0
        max_retry = 1  # maximum number of retries
        try:
            while retry_num <= max_retry:
                try:
                    tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
                    tou_xiang.click()
                    sleep(randint(5, 10))
                    self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
                    print('Store name:', self.store_name)
                    sleep(randint(1, 3))
                    self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
                    self.shop_code = self.shop_code.split(': ')[1]
                    print('Shop code:', self.shop_code)
                    sleep(randint(1, 3))
                    cookies = self.page.cookies()
                    self.save_cookie(cookies)
                    self.send_success_message_via_wechat()
                    break
                except Exception as e:
                    print(f"change_language failed: {e}")
                    self.send_error_notification_via_wechat(e)
                    # DrissionPage raises Chinese-language errors such as
                    # "没有找到元素" (element not found); retry only on those
                    if "没有" in str(e):
                        retry_num += 1
                        sleep(5)
                    else:
                        raise
        finally:
            # Always close the browser, even when an exception propagates
            self.page.quit()
    def connect_redis(self):
        """Open the Redis connection."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        try:
            self.r.ping()  # test the connection
            print("✅ Connected to Redis")
        except redis.exceptions.ConnectionError as e:
            print(f"❌ Could not connect to Redis: {e}")
            raise
    def send_success_message_via_wechat(self):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            "title": '[TK cookie refresh succeeded]',
            "content": f'Account: {self.account}, store: {self.store_name}, run time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Notification sent to WeCom")
        else:
            print(f"Failed to send notification: {response.text}")
    def send_error_notification_via_wechat(self, error_message):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            'title': '[TK cookie refresh error]',
            'content': f'Account: {self.account}, store: {self.store_name}, error: {error_message}, time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Error notification sent to WeCom")
        else:
            print(f"Failed to send error notification: {response.text}")
def run(self):
self.connect_redis()
self.change_language()
# self.page.quit()
if __name__ == '__main__':
TkOrderExport().run()
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
class TkOrderExport():
def __init__(self):
co = ChromiumOptions().headless()
self.page = ChromiumPage(co)
        # Browser-like request headers
        self.headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/json; charset=UTF-8',
'origin': 'https://www.tiktok.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.tiktok.com/',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sec-fetch-storage-access': 'active',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
}
self.shop_code_ceshi = "CNUSCBL8ELNG"
        # Shop account
        self.account = 'tiktokSesionafim@outlook.com'
        # Download folder path
        self.download_folder = r"D:\Downloads"
        self.file_suffix = ".xlsx"  # expected file-name suffix (including the dot)
        self.receiver_name = 'wangjing5'
        # Redis connection settings
self.REDIS_CONFIG = {
'host': '120.79.147.190',
'port': 6379,
'password': 'fG7#vT6kQ1pX',
'db': 13,
'decode_responses': True
}
self.url = "https://affiliate.tiktokglobalshop.com/product/order?shop_region=US"
self.cookies = self.get_cookies()
    def login_with_cookies(self):
        # Check whether the saved session is still logged in
        try:
            self.page.get(self.url)
            self.page.set.window.max()
            tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
            tou_xiang.click()
            sleep(randint(5, 10))
            self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
            print('Store name:', self.store_name)
            sleep(randint(1, 3))
            self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
            self.shop_code = self.shop_code.split(': ')[1]
            print('Shop code:', self.shop_code)
            sleep(randint(1, 3))
            if self.shop_code and str(self.shop_code) == self.shop_code_ceshi:
                print("Already logged in")
            else:
                print("Shop code does not match")
                raise ValueError("shop code mismatch")
        except Exception:
            print("Login required")
            try:
                # Open a blank page first; cookies cannot be set otherwise
                self.page.get('https://www.tiktok.com')
                # Set the cookies (get_cookies may have returned None)
                for ck in self.cookies or []:
                    self.page.set.cookies(ck)
                # Reopen the target URL; the session should now be logged in
                self.page.get(self.url)
                time.sleep(5)
            except Exception:
                print("Login failed")
    def save_cookie(self, cookies):
        """Store the whole cookies list in Redis at once, as a single JSON value."""
        key = f"tk_shop_cookie:{self.shop_code}:order:list"
        # Serialize the whole cookies list to a JSON string
        value = json.dumps(cookies, ensure_ascii=False)
        # Write to Redis (overwrites any previous value)
        self.r.set(key, value)
        print(f"💾 Saved {len(cookies)} cookies to Redis under key: {key}")
    def get_cookies(self):
        """Fetch and parse the cookies stored in Redis."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        key = f"tk_shop_cookie:{self.shop_code_ceshi}:order:list"
        cookie_json = self.r.get(key)
        if cookie_json:
            cookies = json.loads(cookie_json)
            print(f"🔄 Read {len(cookies)} cookies from Redis")
            return cookies
        else:
            print("❌ No cookie data found for this key")
            return None
    def change_language(self):
        self.login_with_cookies()
        retry_num = 0
        max_retry = 1  # maximum number of retries
        try:
            while retry_num <= max_retry:
                try:
                    tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
                    tou_xiang.click()
                    sleep(randint(5, 10))
                    self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
                    print('Store name:', self.store_name)
                    sleep(randint(1, 3))
                    self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
                    self.shop_code = self.shop_code.split(': ')[1]
                    print('Shop code:', self.shop_code)
                    sleep(randint(1, 3))
                    cookies = self.page.cookies()
                    self.save_cookie(cookies)
                    self.send_success_message_via_wechat()
                    break
                except Exception as e:
                    print(f"change_language failed: {e}")
                    self.send_error_notification_via_wechat(e)
                    # DrissionPage raises Chinese-language errors such as
                    # "没有找到元素" (element not found); retry only on those
                    if "没有" in str(e):
                        retry_num += 1
                        sleep(5)
                    else:
                        raise
        finally:
            # Always close the browser, even when an exception propagates
            self.page.quit()
    def connect_redis(self):
        """Open the Redis connection."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        try:
            self.r.ping()  # test the connection
            print("✅ Connected to Redis")
        except redis.exceptions.ConnectionError as e:
            print(f"❌ Could not connect to Redis: {e}")
            raise
    def send_success_message_via_wechat(self):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            "title": '[TK cookie refresh succeeded]',
            "content": f'Account: {self.account}, store: {self.store_name}, run time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Notification sent to WeCom")
        else:
            print(f"Failed to send notification: {response.text}")
    def send_error_notification_via_wechat(self, error_message):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            'title': '[TK cookie refresh error]',
            'content': f'Account: {self.account}, store: {self.store_name}, error: {error_message}, time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Error notification sent to WeCom")
        else:
            print(f"Failed to send error notification: {response.text}")
def run(self):
self.connect_redis()
self.change_language()
# self.page.quit()
if __name__ == '__main__':
TkOrderExport().run()
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
class TkOrderExport():
def __init__(self):
# self.page = ChromiumPage()
        # Browser-like request headers
        self.headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/json; charset=UTF-8',
'origin': 'https://www.tiktok.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.tiktok.com/',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sec-fetch-storage-access': 'active',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
}
        # Per-shop configuration
        self.shop_code_ceshi = "CNUSCB92ELRB"
        # Shop account
        self.account = 'yswg313@gmail.com'
        # Download folder path
        self.download_folder = r"D:\Downloads"
        self.file_suffix = ".xlsx"  # expected file-name suffix (including the dot)
        self.receiver_name = 'wangjing5'
        # Redis connection settings
self.REDIS_CONFIG = {
'host': '120.79.147.190',
'port': 6379,
'password': 'fG7#vT6kQ1pX',
'db': 13,
'decode_responses': True
}
        # Configure the Chrome browser - debug port 9221
        chrome_options = ChromiumOptions()
        chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
        chrome_options.set_local_port(9221)  # Chrome debugging port
        chrome_options.headless()
        self.page = ChromiumPage(addr_or_opts=chrome_options)
        print("Chrome browser running on port: 9221")
self.url = "https://affiliate.tiktokglobalshop.com/product/order?shop_region=US"
self.cookies = self.get_cookies()
    def login_with_cookies(self):
        # Check whether the saved session is still logged in
        try:
            self.page.get(self.url)
            self.page.set.window.max()
            tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
            tou_xiang.click()
            sleep(randint(5, 10))
            self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
            print('Store name:', self.store_name)
            sleep(randint(1, 3))
            self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
            self.shop_code = self.shop_code.split(': ')[1]
            print('Shop code:', self.shop_code)
            sleep(randint(1, 3))
            if self.shop_code and str(self.shop_code) == self.shop_code_ceshi:
                print("Already logged in")
            else:
                print("Shop code does not match")
                raise ValueError("shop code mismatch")
        except Exception:
            print("Login required")
            try:
                # Open a blank page first; cookies cannot be set otherwise
                self.page.get('https://www.tiktok.com')
                # Set the cookies (get_cookies may have returned None)
                for ck in self.cookies or []:
                    self.page.set.cookies(ck)
                # Reopen the target URL; the session should now be logged in
                self.page.get(self.url)
                time.sleep(5)
            except Exception:
                print("Login failed")
    def save_cookie(self, cookies):
        """Store the whole cookies list in Redis at once, as a single JSON value."""
        key = f"tk_shop_cookie:{self.shop_code}:order:list"
        # Serialize the whole cookies list to a JSON string
        value = json.dumps(cookies, ensure_ascii=False)
        # Write to Redis (overwrites any previous value)
        self.r.set(key, value)
        print(f"💾 Saved {len(cookies)} cookies to Redis under key: {key}")
    def get_cookies(self):
        """Fetch and parse the cookies stored in Redis."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        key = f"tk_shop_cookie:{self.shop_code_ceshi}:order:list"
        cookie_json = self.r.get(key)
        if cookie_json:
            cookies = json.loads(cookie_json)
            print(f"🔄 Read {len(cookies)} cookies from Redis")
            return cookies
        else:
            print("❌ No cookie data found for this key")
            return None
    def change_language(self):
        self.login_with_cookies()
        retry_num = 0
        max_retry = 1  # maximum number of retries
        try:
            while retry_num <= max_retry:
                try:
                    tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
                    tou_xiang.click()
                    sleep(randint(5, 10))
                    self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
                    print('Store name:', self.store_name)
                    sleep(randint(1, 3))
                    self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
                    self.shop_code = self.shop_code.split(': ')[1]
                    print('Shop code:', self.shop_code)
                    sleep(randint(1, 3))
                    cookies = self.page.cookies()
                    self.save_cookie(cookies)
                    self.send_success_message_via_wechat()
                    break
                except Exception as e:
                    print(f"change_language failed: {e}")
                    self.send_error_notification_via_wechat(e)
                    # DrissionPage raises Chinese-language errors such as
                    # "没有找到元素" (element not found); retry only on those
                    if "没有" in str(e):
                        retry_num += 1
                        sleep(5)
                    else:
                        raise
        finally:
            # Always close the browser, even when an exception propagates
            self.page.quit()
    def connect_redis(self):
        """Open the Redis connection."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        try:
            self.r.ping()  # test the connection
            print("✅ Connected to Redis")
        except redis.exceptions.ConnectionError as e:
            print(f"❌ Could not connect to Redis: {e}")
            raise
    def send_success_message_via_wechat(self):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            "title": '[TK cookie refresh succeeded]',
            "content": f'Account: {self.account}, store: {self.store_name}, run time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Notification sent to WeCom")
        else:
            print(f"Failed to send notification: {response.text}")
    def send_error_notification_via_wechat(self, error_message):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            'title': '[TK cookie refresh error]',
            'content': f'Account: {self.account}, store: {self.store_name}, error: {error_message}, time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Error notification sent to WeCom")
        else:
            print(f"Failed to send error notification: {response.text}")
def run(self):
self.connect_redis()
self.change_language()
# self.page.quit()
if __name__ == '__main__':
TkOrderExport().run()
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
import time
from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
class TkOrderExport():
def __init__(self):
co = ChromiumOptions().headless()
self.page = ChromiumPage(co)
        # Browser-like request headers
        self.headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/json; charset=UTF-8',
'origin': 'https://www.tiktok.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.tiktok.com/',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sec-fetch-storage-access': 'active',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
}
self.shop_code_ceshi = "CNUSCBP8EL8M"
        # Shop account
        self.account = 'ZehyaanuaHome@outlook.com'
        # Download folder path
        self.download_folder = r"D:\Downloads"
        self.file_suffix = ".xlsx"  # expected file-name suffix (including the dot)
        self.receiver_name = 'wangjing5'
        # Redis connection settings
self.REDIS_CONFIG = {
'host': '120.79.147.190',
'port': 6379,
'password': 'fG7#vT6kQ1pX',
'db': 13,
'decode_responses': True
}
self.url = "https://affiliate.tiktokglobalshop.com/product/order?shop_region=US"
self.cookies = self.get_cookies()
    def login_with_cookies(self):
        # Check whether the saved session is still logged in
        try:
            self.page.get(self.url)
            self.page.set.window.max()
            tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
            tou_xiang.click()
            sleep(randint(5, 10))
            self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
            print('Store name:', self.store_name)
            sleep(randint(1, 3))
            self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
            self.shop_code = self.shop_code.split(': ')[1]
            print('Shop code:', self.shop_code)
            sleep(randint(1, 3))
            if self.shop_code and str(self.shop_code) == self.shop_code_ceshi:
                print("Already logged in")
            else:
                print("Shop code does not match")
                raise ValueError("shop code mismatch")
        except Exception:
            print("Login required")
            try:
                # Open a blank page first; cookies cannot be set otherwise
                self.page.get('https://www.tiktok.com')
                # Set the cookies (get_cookies may have returned None)
                for ck in self.cookies or []:
                    self.page.set.cookies(ck)
                # Reopen the target URL; the session should now be logged in
                self.page.get(self.url)
                time.sleep(5)
            except Exception:
                print("Login failed")
    def save_cookie(self, cookies):
        """Store the whole cookies list in Redis at once, as a single JSON value."""
        key = f"tk_shop_cookie:{self.shop_code}:order:list"
        # Serialize the whole cookies list to a JSON string
        value = json.dumps(cookies, ensure_ascii=False)
        # Write to Redis (overwrites any previous value)
        self.r.set(key, value)
        print(f"💾 Saved {len(cookies)} cookies to Redis under key: {key}")
    def get_cookies(self):
        """Fetch and parse the cookies stored in Redis."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        key = f"tk_shop_cookie:{self.shop_code_ceshi}:order:list"
        cookie_json = self.r.get(key)
        if cookie_json:
            cookies = json.loads(cookie_json)
            print(f"🔄 Read {len(cookies)} cookies from Redis")
            return cookies
        else:
            print("❌ No cookie data found for this key")
            return None
    def change_language(self):
        self.login_with_cookies()
        retry_num = 0
        max_retry = 1  # maximum number of retries
        try:
            while retry_num <= max_retry:
                try:
                    tou_xiang = self.page.ele('xpath://span[@class="m4b-avatar-image"]', timeout=10)
                    tou_xiang.click()
                    sleep(randint(5, 10))
                    self.store_name = self.page.ele('xpath://div[@class="text-body-m-medium text-neutral-text1"]', timeout=10).text
                    print('Store name:', self.store_name)
                    sleep(randint(1, 3))
                    self.shop_code = self.page.ele('xpath://div[@class="text-body-s text-neutral-text4"]/span', timeout=10).text
                    self.shop_code = self.shop_code.split(': ')[1]
                    print('Shop code:', self.shop_code)
                    sleep(randint(1, 3))
                    cookies = self.page.cookies()
                    self.save_cookie(cookies)
                    self.send_success_message_via_wechat()
                    break
                except Exception as e:
                    print(f"change_language failed: {e}")
                    self.send_error_notification_via_wechat(e)
                    # DrissionPage raises Chinese-language errors such as
                    # "没有找到元素" (element not found); retry only on those
                    if "没有" in str(e):
                        retry_num += 1
                        sleep(5)
                    else:
                        raise
        finally:
            # Always close the browser, even when an exception propagates
            self.page.quit()
    def connect_redis(self):
        """Open the Redis connection."""
        self.r = redis.StrictRedis(**self.REDIS_CONFIG)
        try:
            self.r.ping()  # test the connection
            print("✅ Connected to Redis")
        except redis.exceptions.ConnectionError as e:
            print(f"❌ Could not connect to Redis: {e}")
            raise
    def send_success_message_via_wechat(self):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            "title": '[TK cookie refresh succeeded]',
            "content": f'Account: {self.account}, store: {self.store_name}, run time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Notification sent to WeCom")
        else:
            print(f"Failed to send notification: {response.text}")
    def send_error_notification_via_wechat(self, error_message):
        webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'  # replace with your WeCom bot webhook URL
        data = {
            "account": self.receiver_name,
            'title': '[TK cookie refresh error]',
            'content': f'Account: {self.account}, store: {self.store_name}, error: {error_message}, time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
        }
        response = requests.post(url=webhook_url, data=data, timeout=15)
        if response.status_code == 200:
            print("Error notification sent to WeCom")
        else:
            print(f"Failed to send error notification: {response.text}")
def run(self):
self.connect_redis()
self.change_language()
# self.page.quit()
if __name__ == '__main__':
TkOrderExport().run()
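# --- Illustrative refactoring sketch (an assumption, not part of this commit) ---
# The scripts above are identical except for shop_code_ceshi and account. The
# duplication could be collapsed into one script driven by a table of per-shop
# settings; the parameterized constructor below is hypothetical, and the shop
# codes and accounts are taken from the files in this commit.
SHOPS = [
    {'shop_code': 'CNUSCBR2ELPJ', 'account': 'Hiboomtiktok@outlook.com'},
    {'shop_code': 'CNUSCBX2ELPN', 'account': 'SATINIOR456tk@outlook.com'},
    {'shop_code': 'CNUSCBW2ELPC', 'account': 'boao123tk@outlook.com'},
    {'shop_code': 'CNUSCB78ELWE', 'account': 'PatelaiChank@outlook.com'},
    {'shop_code': 'CNUSCBS2EL7D', 'account': 'qiuaner.tiktok@outlook.com'},
    {'shop_code': 'CNUSCBL8ELNG', 'account': 'tiktokSesionafim@outlook.com'},
    {'shop_code': 'CNUSCB92ELRB', 'account': 'yswg313@gmail.com'},
    {'shop_code': 'CNUSCBP8EL8M', 'account': 'ZehyaanuaHome@outlook.com'},
]

def run_all(shops):
    for cfg in shops:
        # Hypothetical signature: assumes __init__ is refactored to take
        # shop_code and account instead of hard-coding them per file.
        job = TkOrderExport(cfg['shop_code'], cfg['account'])
        job.run()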