Commit 60a70d74 by Peng

no message

parent efc3b2e1
......@@ -8,7 +8,7 @@ import sys
import time
import uuid
from urllib.parse import urlparse
from threading import Lock
import urllib3
from lxml import etree
......@@ -35,8 +35,6 @@ class Requests_param_val(BaseUtils):
print("站点名称:", self.site_name, '抓取项目', "代理ip:", self.proxy_name)
self.cookies_queue = Queue() # cookie队列
self.kafuka_producer_str = self.kafuka_connect()
self.next_page_lock = Lock()
self.headers_num_int_s = 0
def init_db_names(self):
self.engine_pg = self.pg_connect()
......@@ -353,7 +351,6 @@ class Requests_param_val(BaseUtils):
time.sleep(15)
def hex_md5(self, input_string):
# 要加密的字符串
# 创建一个MD5哈希对象
md5_hash = hashlib.md5()
# 使用输入字符串的字节更新哈希对象
......@@ -368,28 +365,26 @@ class Requests_param_val(BaseUtils):
def on_send_error(self, excp):
print("消息发送失败", excp)
def send_kafka(self, items=None, html_data=None, topic=None, num=3):
def send_kafka(self, items=None, html_data=None, topic=None):
print('向Kafka发送数据')
for i in range(5):
try:
if items:
print('232323232323')
del items['div_id_list']
future = self.kafuka_producer_str.send(topic, json.dumps(items))
future.add_callback(self.on_send_success).add_errback(self.on_send_error)
future.get(30)
if html_data:
future = self.kafuka_producer_str.send(topic, html_data)
future.add_callback(self.on_send_success).add_errback(self.on_send_error)
future.get(30)
print('向Kafka发送数据 发送成功')
with self.next_page_lock:
self.headers_num_int_s += 1
if self.headers_num_int_s % 10 == 0:
self.kafuka_producer_str.flush()
break
except Exception as e:
print(f"kafka发送失败(第{i + 1}/5次)", e)
time.sleep(2)
if i >= 1 and i % 2 == 1:
self.kafuka_producer_str = self.kafuka_connect(acks=True)
\ No newline at end of file
print(e)
if i >= 1:
self.kafuka_producer_str = self.kafuka_connect() # 调用kafka
try:
self.kafuka_producer_str.flush(timeout=30)
except KafkaTimeoutError as e:
print("flush 超时,跳过这次等待:", e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment