Commit 8dd9963f by hezhe

''

parent bab731a0
......@@ -6,13 +6,9 @@ from playwright.sync_api import Browser, BrowserContext, Page, sync_playwright,
import redis as rd
import ddddocr
import re
import cv2
import base64
from io import BytesIO
from PIL import Image
import numpy as np
from func_timeout import func_set_timeout
from func_timeout.exceptions import FunctionTimedOut
# REDIS = {
......@@ -30,7 +26,7 @@ REDIS = {
'password': 'yswg@2019',
'db': 1
}
#
def singleton(cls, *args, **kw):
"""singleton mode.
......@@ -67,6 +63,7 @@ def md5(src: str, algorithm: str = "md5", digits: int = 32) -> str:
else:
return algorithm.hexdigest()
@singleton
class Redis(object):
def __init__(self):
......@@ -74,18 +71,21 @@ class Redis(object):
self.port = REDIS['port']
self.db = REDIS['db']
self.password = REDIS['password']
def get_instance(self):
self.pool = rd.ConnectionPool(
host=self.host,
port=self.port,
db=self.db,
password=self.password,
max_connections=3,
socket_timeout=5,
socket_connect_timeout=5,
retry_on_timeout=True,
)
def get_instance(self):
return rd.Redis(connection_pool=self.pool)
@func_set_timeout(10)
def sadd(key, value, use_md5=True):
"""add key-value to the sorted set.
......@@ -132,7 +132,6 @@ def ladd(key, value, use_md5=True):
return added == 1
@func_set_timeout(30)
def listpop(key) -> list:
"""lpop
......@@ -161,7 +160,16 @@ class ChinataxSpider(object):
self.context: BrowserContext = None
self.page: Page = None
self.padding_error = 1
self.seeds = [{'@class': 'com.alibaba.fastjson.JSONObject', 'fpdm': None, 'kprq': '20231229', 'u_key': '1742416930985218050', 'kjje': '3861.62', 'fphm': '23952000000063440070', 'jym': None}]
self.seeds = [
{
"u_key": "1934929493120057346",
"fpdm": "",
"fphm": "25429165833000096487",
"kprq": "20250519",
"kjje": "264.50",
"jym": ""
}
]
def base64_to_image(self, base64_str):
"""
......@@ -178,45 +186,36 @@ class ChinataxSpider(object):
return img
def get_img_base64(self):
# yzminfo = ""
yzminfo = ""
color = "black"
# img_base64 = ""
# count = 1
yzminfo = self.page.query_selector("#yzminfo").as_element().text_content()
self.page.wait_for_timeout(1000)
img_base64 = self.page.query_selector("#yzm_img").get_attribute("src")
self.page.wait_for_timeout(1000)
img_base64 = ""
count = 1
while len(img_base64) <= len("images/code.png") and count <= 5:
yzminfo = self.page.query_selector("#yzminfo").as_element().text_content()
img_base64 = self.page.query_selector("#yzm_img").get_attribute("src")
self.page.wait_for_timeout(1000)
count = count + 1
if "蓝色" in yzminfo:
color = 'blue'
if "红色" in yzminfo:
color = 'red'
if "黄色" in yzminfo:
color = 'yellow'
return color, img_base64
def get_img(self, count):
# count = 1
if self.page.query_selector("#yzm_img").get_attribute("src") == "images/code.png":
self.page.query_selector("#yzm_img").click()
self.page.wait_for_timeout(3000)
self.page.wait_for_load_state()
elif count != 0:
def get_img(self):
count = 1
color, img_base64 = self.get_img_base64()
print(f"第{count}次是{color}色")
while color != 'black':
self.page.query_selector("#yzm_img").click()
self.page.wait_for_timeout(3000)
self.page.wait_for_load_state()
color, img_base64 = self.get_img_base64()
# print(f"第{count}次是{color}色")
# while color != 'black':
# self.page.query_selector("#yzm_img").click()
# self.page.wait_for_timeout(3000)
# self.page.wait_for_load_state()
# count = count + 1
# color, img_base64 = self.get_img_base64()
# color = color
# img_base64 = img_base64
# print(f"第{count}次是{color}色")
count = count + 1
color, img_base64 = self.get_img_base64()
color = color
img_base64 = img_base64
print(f"第{count}次是{color}色")
# self.base64_to_image(img_base64).save(f"test第{count}次.png")
# image = self.base64_to_image(img_base64)
# image.save("test.png")
......@@ -227,59 +226,20 @@ class ChinataxSpider(object):
img = self.base64_to_image(img_base64)
if color == 'black':
return ocr.classification(img)
# 转换为cv2可以使用的对象
image = np.asarray(img)
one_color_img = self.save_appointed_color(image, color)
img = Image.fromarray(cv2.cvtColor(one_color_img, cv2.COLOR_BGR2RGB))
# todo 预处理
return ocr.classification(img)
def save_appointed_color(self, image, color):
    """Return a white-on-black mask of the pixels that match ``color``.

    The input is converted with ``cv2.COLOR_BGR2HSV`` and thresholded with
    ``cv2.inRange``. Pixels inside the selected HSV window(s) come out as
    (255, 255, 255); every other pixel comes out as (0, 0, 0).

    Args:
        image: 3-channel uint8 array; the conversion treats it as BGR.
            NOTE(review): the caller passes ``np.asarray`` of an image
            object that is presumably a PIL image in RGB order -- confirm
            the channel order, otherwise red and blue are swapped here.
        color: "red", "blue" or "yellow". Any other value falls back to a
            looser red window (saturation/value >= 50 instead of >= 100).

    Returns:
        An array shaped like ``image`` holding the white-on-black mask.
    """
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # Red sits at both ends of OpenCV's 8-bit hue axis (0..179), so it needs
    # a low window and a high window. The original "red" branch only checked
    # the low one and dropped reds whose hue lands just below the wrap point.
    hsv_windows = {
        "red": (
            ((0, 100, 100), (10, 255, 255)),
            ((170, 100, 100), (180, 255, 255)),
        ),
        "blue": (
            ((100, 50, 50), (140, 255, 255)),
        ),
        "yellow": (
            ((26, 43, 46), (34, 255, 255)),
        ),
    }
    # Unknown colour names use a looser red window (red taken as the example).
    fallback_windows = (
        ((0, 50, 50), (10, 255, 255)),
        ((170, 50, 50), (180, 255, 255)),
    )

    if color in hsv_windows:
        windows = hsv_windows[color]
        print(f"保留 {color}")
    else:
        windows = fallback_windows

    mask = None
    for lower, upper in windows:
        partial = cv2.inRange(hsv_image, np.array(lower), np.array(upper))
        # OR the partial masks together; adding uint8 masks could wrap around
        # if two windows ever overlapped.
        mask = partial if mask is None else cv2.bitwise_or(mask, partial)

    white = np.full_like(image, (255, 255, 255), dtype=np.uint8)
    # Keep white only where the mask is set; everything else becomes black.
    return cv2.bitwise_and(white, white, mask=mask)
def get_seed(self):
while True:
try:
seeds = listpop(self.seed_key)
break
except FunctionTimedOut as e:
logging.info(f"get seed timeout {e}")
continue
# seeds = self.seeds.pop()
seeds = spop(self.seed_key, 1)
# if self.seeds:
# seeds = self.seeds.pop()
# else:
# return None
if seeds:
seed = json.loads(seeds)
seed = json.loads(seeds[0])
# seed = seeds
if seed.get("fphm") and seed.get("kprq") and (seed.get("kjje") or seed.get("jym")):
seed = {k: v.strip() if isinstance(v, str) else "" for k,v in seed.items()}
if seed.get("fphm") and seed.get("kprq"):
return seed
else:
error_msg = "seed error"
......@@ -293,46 +253,46 @@ class ChinataxSpider(object):
else:
return None
def recaptcha(self, count):
color, img_base64 = self.get_img(count)
def recaptcha(self):
color, img_base64 = self.get_img()
code = self.ddddocr_imge_get_code(color, img_base64)
if not code:
logging.info("验证码识别失败")
return False
logging.info(f"验证码识别 {code}")
self.page.locator("#yzm").fill(code)
self.page.wait_for_timeout(1000)
# 点击空白页失去焦点
self.page.locator("#pageshow").click()
self.page.wait_for_timeout(1000)
if "display: none" not in self.page.query_selector("#checkfp").get_attribute("style"):
self.page.locator("#checkfp").click()
self.page.wait_for_timeout(1000)
# 判断验证码是否通过
if not self.page.query_selector("#dialog-body"):
if error_msg := self.page.query_selector("#popup_message"):
if "超过该张发票当日查验次数" in error_msg.text_content():
self.page.locator("#popup_ok").click()
self.page.wait_for_timeout(1000)
return "count error"
if "验证码请求次数过于频繁" in error_msg.text_content():
self.page.locator("#popup_ok").click()
self.page.wait_for_timeout(1000)
return "recaptcha count error"
self.page.locator("#popup_ok").click()
self.page.wait_for_timeout(1000)
# 刷新验证码
# self.page.locator("#yzm_img").click()
# self.page.wait_for_timeout(1000)
logging.info("验证码处理错误")
return False
if color == 'black':
self.page.locator("#yzm").fill(code)
# 点击空白页失去焦点
self.page.locator("#pageshow").click()
if "display: none" not in self.page.query_selector("#checkfp").get_attribute("style"):
self.page.locator("#checkfp").click()
self.page.wait_for_timeout(2000)
# 判断验证码是否通过
if not self.page.query_selector("#dialog-body"):
if error_msg := self.page.query_selector("#popup_message"):
if "超过该张发票当日查验次数" in error_msg.text_content():
self.page.locator("#popup_ok").click()
self.page.wait_for_timeout(2000)
return "count error"
if "验证码请求次数过于频繁" in error_msg.text_content():
self.page.locator("#popup_ok").click()
self.page.wait_for_timeout(2000)
return "recaptcha count error"
self.page.locator("#popup_ok").click()
# 刷新验证码
self.page.locator("#yzm_img").click()
self.page.wait_for_timeout(2000)
logging.info("验证码处理错误")
return False
else:
return True
else:
return True
return "seed error"
else:
return "seed error"
return False
def get_item(self, seed):
def crawl(self, url, seed):
# 需要打开的网站
self.page.goto(url)
self.page.wait_for_timeout(1000)
logging.info(f"fpdm --> {seed.get('fpdm', '')}")
logging.info(f"seed --> {seed}")
# fpdm --> None
......@@ -345,10 +305,10 @@ class ChinataxSpider(object):
self.page.wait_for_timeout(3000)
if "开具金额" in self.page.query_selector(
"span[id='context']").text_content() or "价税合计" in self.page.query_selector(
"span[id='context']").text_content() or '票价' in self.page.query_selector(
"span[id='context']").text_content():
kjje = seed.get("kjje")
else:
kjje = seed.get("jym", "")[-6::]
self.page.locator("#kjje").fill(kjje)
......@@ -360,7 +320,7 @@ class ChinataxSpider(object):
"dom": error_msg,
}
xadd(self.save_key, data)
logging.info(f"{error_msg}")
logging.info(f"fpdmjy - >>{error_msg}")
self.page.close()
self.context.close()
elif error_msg := self.page.query_selector("#fphmjy").text_content().strip():
......@@ -369,6 +329,15 @@ class ChinataxSpider(object):
"dom": error_msg,
}
xadd(self.save_key, data)
logging.info(f"fphmjy - >>{error_msg}")
self.page.close()
self.context.close()
elif self.page.query_selector("xpath=.//div[@class='tip_common_wrong font_red tip_common_right']"):
data = {
"u_key": seed.get("u_key"),
"dom": "发票号码有误!",
}
xadd(self.save_key, data)
logging.info(f"{error_msg}")
self.page.close()
self.context.close()
......@@ -378,7 +347,7 @@ class ChinataxSpider(object):
"dom": error_msg,
}
xadd(self.save_key, data)
logging.info(f"{error_msg}")
logging.info(f"kprqjy - >>{error_msg}")
self.page.close()
self.context.close()
elif error_msg := self.page.query_selector("#kjjejy").text_content().strip():
......@@ -387,14 +356,14 @@ class ChinataxSpider(object):
"dom": error_msg,
}
xadd(self.save_key, data)
logging.info(f"{error_msg}")
logging.info(f"kjjejy - >>{error_msg}")
self.page.close()
self.context.close()
else:
error = 0
for i in range(8):
# 判断验证码是否通过
if error_msg := self.recaptcha(i):
if error_msg := self.recaptcha():
if error_msg in ["count error", "seed error"]:
data = {
"u_key": seed.get("u_key"),
......@@ -427,16 +396,10 @@ class ChinataxSpider(object):
if error == 0:
logging.info("失败8次处理,将任务重新推送到redis")
ladd(self.seed_key, json.dumps(seed), use_md5=False)
sadd(self.seed_key, json.dumps(seed), use_md5=False)
self.page.close()
self.context.close()
def crawl(self, url, seed):
# 需要打开的网站
self.page.goto(url)
self.page.wait_for_timeout(1000)
self.get_item(seed)
def change_user(self):
user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36'
self.context = self.browser.new_context(
......@@ -455,15 +418,20 @@ class ChinataxSpider(object):
Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});
"""
self.page.add_init_script(js)
self.page.set_default_timeout(300000)
# self.page.add_init_script(
# "const newProto = navigator.__proto__; delete newProto.webdriver; navigator.__proto__ = newProto;")
def run(self):
while True:
seed = self.get_seed()
try:
seed = self.get_seed()
except rd.exceptions.ConnectionError as e:
logging.info(f"ConnectionError error {e}")
continue
try:
if seed:
logging.info("获取任务成功")
self.change_user()
url = "https://inv-veri.chinatax.gov.cn/index.html"
self.crawl(url, seed)
......@@ -471,23 +439,23 @@ class ChinataxSpider(object):
time.sleep(30)
logging.info('no task sleep 30s')
except Error as e:
logging.info(f"--> playwright error ")
logging.info(f"playwright error {e}")
self.page.close()
self.context.close()
ladd(self.seed_key, json.dumps(seed), use_md5=False)
sadd(self.seed_key, json.dumps(seed), use_md5=False)
continue
except FunctionTimedOut as e:
logging.info(f"--> FunctionTimedOut ")
except rd.exceptions.ConnectionError as e:
logging.info(f"ConnectionError error {e}")
self.page.close()
self.context.close()
ladd(self.seed_key, json.dumps(seed), use_md5=False)
sadd(self.seed_key, json.dumps(seed), use_md5=False)
continue
except Exception as e:
self.page.close()
self.context.close()
if f"{e}" == "Incorrect padding":
ladd(self.seed_key, json.dumps(seed), use_md5=False)
logging.info(f"--> Incorrect padding error {e}")
sadd(self.seed_key, json.dumps(seed), use_md5=False)
logging.info(f"Incorrect padding error {e}")
self.padding_error += 1
if self.padding_error >= 5:
time.sleep(1200)
......@@ -503,17 +471,6 @@ class ChinataxSpider(object):
xadd(self.save_key, data)
continue
def send_msg(self, account, title, content):
    """POST ``account``, ``title`` and ``content`` to the sendMessage endpoint.

    Args:
        account: value sent under the ``account`` key.
        title: value sent under the ``title`` key.
        content: value sent under the ``content`` key.

    The payload is printed before the request is made. The request uses a
    15 second timeout and the response is not inspected.
    """
    # Imported locally, as in the original, so the module does not need
    # ``requests`` unless a message is actually sent.
    import requests

    endpoint = 'http://47.112.96.71:8082/selection/sendMessage'
    payload = dict(account=account, title=title, content=content)
    print(payload)
    requests.post(endpoint, data=payload, timeout=15)
def main(self):
headless = False
# headless = True
......@@ -529,16 +486,10 @@ class ChinataxSpider(object):
# )
self.browser = _playwright.chromium.launch(
headless=False,
executable_path="C:\Program Files\Google\Chrome\Application\chrome.exe",
# executable_path="C:\Program Files\Google\Chrome\Application\chrome.exe",
executable_path=r"C:\Program Files (x86)\ChatAI Chrome\ChatAI_Chrome.exe",
)
try:
self.run()
except Exception as e:
print(e)
account = 'hezhe'
title = f'税务爬取脚本 error'
content = f"税务爬取脚本 error 时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
self.send_msg(account, title, content)
self.run()
if __name__ == '__main__':
......@@ -553,3 +504,4 @@ if __name__ == '__main__':
# }
# ladd('finance:sp_invoice_queue', json.dumps(seed), use_md5=False)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment