Commit d065bab7 by Peng

no message

parent 44057a7b
import sys
import os
import sys
import traceback
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from curl_cffi import requests
......@@ -14,7 +15,8 @@ import html
import re
from html import unescape
import urllib.parse
from sqlalchemy import text
from datetime import datetime as date_time
class recall_cases():
......@@ -25,7 +27,6 @@ class recall_cases():
self.uk_drug_device_url = 'https://www.gov.uk/drug-device-alerts'
self.mysql_connect1()
week = time.strftime("%W")
self.yer_week = f'2025_{week}'
    def mysql_connect1(self):
        """Open the shared MySQL handle used by every crawler method.

        The returned object is stored on ``self.mysql_db`` and is used elsewhere
        in this class via ``.to_sql(df, table, ...)``, ``.begin()`` and
        ``.read_sql(...)`` — i.e. a project wrapper around a SQLAlchemy engine
        (see ``BaseUtils.mysql_connect``; exact type defined outside this file).
        """
        self.mysql_db = BaseUtils().mysql_connect()
......@@ -34,7 +35,7 @@ class recall_cases():
"""
支持三种日期格式:
- 中文或英文格式:"十月 04, 2011" 或 "September 28, 2011" (月份在前)
- 英文格式:"18 February 2025" (日在前)
- 英文格式:"18 February 2026" (日在前)
返回标准的 "YYYY-MM-DD" 格式日期。
"""
# 定义中英文月份映射
......@@ -107,7 +108,7 @@ class recall_cases():
image_url = 'https://www.cpsc.gov' + image_url_list[0].strip() if image_url_list else None # 图片
if recall_date:
recall_date = self._parse_date_str(recall_date)
data_list = ['us_recalls_product', recall_date, product_title, hazard, image_url, a_href,brand]
data_list = ['us_recalls_product', recall_date, product_title, hazard, image_url, a_href, brand]
return data_list
else:
return None
......@@ -149,8 +150,8 @@ class recall_cases():
df = pd.DataFrame(data=save_data_list,
columns=['data_type', 'recall_date', 'product_title', 'hazard',
'image_url',
'ext_url','brand', 'recall_title', 'country'])
df.to_sql('recall_cases_data', con=self.mysql_db, if_exists="append", index=False)
'ext_url', 'brand', 'recall_title', 'country'])
self.mysql_db.to_sql(df, 'recall_cases_data', if_exists="append")
except:
is_None = False
break
......@@ -216,7 +217,7 @@ class recall_cases():
dict_item = response.json()
data_lists = dict_item['data']
for data in data_lists:
print(data,'344444444')
print(data, '344444444')
data_list = []
try:
# 逐项解码
......@@ -244,26 +245,28 @@ class recall_cases():
response2 = self._request(headers=headers, url=url)
response_detail = etree.HTML(response2.text)
src_list = response_detail.xpath("//div[@id='recall-photos']//img/@src")
Brand_list = response_detail.xpath("//div[contains(text(),'Brand Name')]/following-sibling::div//text()")
Brand_list = response_detail.xpath(
"//div[contains(text(),'Brand Name')]/following-sibling::div//text()")
if Brand_list:
brand = ''.join(Brand_list).strip()
else:
brand = None
print(brand,'Brand_list::',Brand_list)
print(brand, 'Brand_list::', Brand_list)
if src_list:
image_url = 'https://www.fda.gov' + src_list[0]
else:
image_url = None
print('image_url:', image_url)
data_list.append(['us_fba_recalls', date, link_text, hazard, image_url, url, recall_title, 'us',
product_category,brand])
product_category, brand])
try:
df = pd.DataFrame(data=data_list,
columns=['data_type', 'recall_date', 'product_title', 'hazard',
'image_url',
'ext_url', 'recall_title', 'country', 'product_category','brand'])
'ext_url', 'recall_title', 'country', 'product_category',
'brand'])
df.drop_duplicates(['recall_date', 'product_title', 'ext_url'], inplace=True)
df.to_sql('recall_cases_data', con=self.mysql_db, if_exists="append", index=False)
self.mysql_db.to_sql(df, 'recall_cases_data', if_exists="append")
except:
is_None = False
break
......@@ -336,7 +339,7 @@ class recall_cases():
brand = brands[0].get('brand')
else:
brand = None
print('brand::1',brand)
print('brand::1', brand)
hazard = items['risk']['versions'][0]['riskDescription']
print(hazard)
ext_url = 'https://ec.europa.eu/safety-gate-alerts/screen/webReport/alertDetail/' + str(
......@@ -350,29 +353,29 @@ class recall_cases():
print(image_url)
data_list.append(
[date, product_category, product_title, recall_title, hazard, 'eu_recall', image_url, 'eu',
ext_url,data_json,brand])
ext_url, data_json, brand])
keys = [
"recall_date", "product_category", "product_title", "recall_title",
"hazard", "data_type", "image_url", "country", "ext_url", "data_json", "brand"
]
# 把 list of list 转成 list of dict
dict_list = [dict(zip(keys, row)) for row in data_list]
result = [
[d['recall_date'], d['product_category'], d['product_title'], d['recall_title'], d['hazard'],
d['data_type'],
d['image_url'], d['country'], d['ext_url'], d['data_json'], d['brand']] for d in dict_list]
print(result)
for i in range(4):
try:
with self.mysql_db.begin() as conn:
conn.execute(
text("""
INSERT INTO recall_cases_data
(recall_date, product_category, product_title, recall_title, hazard, data_type, image_url, country, ext_url, data_json, brand)
VALUES (:recall_date, :product_category, :product_title, :recall_title, :hazard, :data_type, :image_url, :country, :ext_url, :data_json, :brand)
ON DUPLICATE KEY UPDATE
recall_date = VALUES(recall_date),
product_title = VALUES(product_title),
ext_url = VALUES(ext_url)
"""),
dict_list
)
f"insert into recall_cases_data (recall_date, product_category, product_title, recall_title, hazard, data_type, image_url, country, ext_url, data_json, brand) values (%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s) ON DUPLICATE KEY UPDATE recall_date = values(recall_date),product_title = values(product_title),ext_url = values(ext_url)",
result)
break
except:
print('报错32222222',traceback.format_exc())
time.sleep(20)
if is_None == False:
break
else:
......@@ -419,7 +422,7 @@ class recall_cases():
print('产品类型:', product_category)
product_title = resp_html.xpath("//p[contains(text(),'Product: ')]/text()")
print('产品标题:', product_title)
hazard_list = resp_html.xpath("//p[contains(text(),'Hazard:')]/text()")
hazard_list = resp_html.xpath("//p[contains(text(),'Hazard:')]/text()|//h2[contains(text(),'Hazard')]/following-sibling::p[1]/text()")
print('风险:', hazard_list)
Brand_list = resp_html.xpath("//td[contains(text(),'Brand')]/following-sibling::td/text()")
brand = Brand_list[0].strip() if Brand_list else None
......@@ -430,16 +433,16 @@ class recall_cases():
image_url_list = image_url_list[0].strip() if image_url_list else None
data_list.append(
[recall_title, detail_url, recall_date, product_category, product_title,
hazard_list, image_url_list, 'uk_recall', 'uk',brand])
hazard_list, image_url_list, 'uk_recall', 'uk', brand])
if data_list:
try:
df = pd.DataFrame(data=data_list,
columns=['recall_title', 'ext_url', 'recall_date', 'product_category',
'product_title',
'hazard', 'image_url', 'data_type', 'country','brand'])
'hazard', 'image_url', 'data_type', 'country', 'brand'])
df.drop_duplicates(['recall_date', 'product_title', 'ext_url'], inplace=True)
df.to_sql('recall_cases_data', con=self.mysql_db, if_exists="append", index=False)
self.mysql_db.to_sql(df, 'recall_cases_data', if_exists="append")
except:
is_None = False
break
......@@ -522,7 +525,7 @@ class recall_cases():
'hazard', 'image_url', 'data_type', 'country'])
df.drop_duplicates(['recall_date', 'product_title', 'ext_url'], inplace=True)
df.to_sql('recall_cases_data', con=self.mysql_db, if_exists="append", index=False)
self.mysql_db.to_sql(df, 'recall_cases_data', if_exists="append")
except:
is_None = False
break
......@@ -571,7 +574,7 @@ class recall_cases():
print('page:', page)
df = pd.DataFrame(data=data_list,
columns=['data_json', 'page'])
df.to_sql('global_recalls_data', con=self.mysql_db, if_exists="append", index=False)
self.mysql_db.to_sql(df, 'global_recalls_data', if_exists="append")
break
except Exception as e:
wait_time = (i + 1) * 2
......@@ -585,11 +588,12 @@ class recall_cases():
def get_globalrecalls(self):
# sql = 'SELECT data_json FROM global_recalls_data'
# df_data = pd.read_sql(sql, con=self.mysql_db)
list_url = 'https://globalrecalls.oecd.org/ws/search.xqy?end=0&lang=en&order=desc&q=&sort=date&start=-20&uiLang=en' # 第一页url
list_url = 'https://globalrecalls.oecd.org/ws/search.xqy?end=20&lang=en&order=desc&q=&sort=date&start=0&uiLang=en'
# list_url = f'https://globalrecalls.oecd.org/ws/search.xqy?end={i}&lang=en&order=desc&q=&sort=date&start={i - 20}&uiLang=en'
print('请求url', list_url)
# 'https://globalrecalls.oecd.org/ws/search.xqy?end=200&lang=en&order=desc&q=&sort=date&start=180&uiLang=en'
# list_urls = [40, 60, 80, 100, 120, 140, 160, 180, 200, 220, 240, 260, 280, 300, 320, 340, 360, 380, 400, 420, 440, 460, 480, 500]
# for url_num in list_urls:
# list_url = f'https://globalrecalls.oecd.org/ws/search.xqy?end={url_num}&lang=en&order=desc&q=&sort=date&start={url_num-20}&uiLang=en'
print('请求url111', list_url)
headers = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br, zstd',
......@@ -631,7 +635,7 @@ class recall_cases():
imaurl = "https://globalrecalls.oecd.org/ws/getdocument.xqy?uri=" + encoded_url
url = f'https://ec.europa.eu/safety-gate-alerts/public/api/notification/{_id[0]}?language=en'
print('请求url:', url)
resp = requests.get(url, headers=headers, timeout=60)
resp = requests.get(url, headers=headers, timeout=60,verify=False, impersonate="chrome")
if 'ENTITY_NOT_FOUN' in resp.text:
continue
items_data = resp.json()
......@@ -654,24 +658,25 @@ class recall_cases():
items['image_url'] = imaurl
items['data_type'] = 'global_recalls'
items['product_title'] = re.findall(r'^(.*?)\s*;', title_name + ';')[0]
items['ext_url'] = extUrl
items['ext_url'] = extUrl[:255] if extUrl else None
items['brand'] = brand
data_json = json.dumps(items_data)
data_list.append([items['data_type'], items['product_title'], items['productCategory'],
items['reacll_time'], items['riskDescription'], items['country'],
items['image_url'],
items['recall_title'], items['ext_url'], data_json,items['brand']])
print('itemsitems::',items)
try:
df = pd.DataFrame(data=data_list,
columns=['data_type', 'product_title', 'product_category', 'recall_date',
'hazard',
'country', 'image_url', 'recall_title', 'ext_url', 'data_json','brand'])
df.to_sql('recall_cases_data', con=self.mysql_db, if_exists="append", index=False)
except:
print('数据重复=====')
continue
items['recall_title'], items['ext_url'], data_json, items['brand']])
print('itemsitems::', items)
with self.mysql_db.begin() as conn:
conn.execute(
'INSERT IGNORE INTO recall_cases_data '
'(data_type, product_title, product_category, recall_date, hazard, '
'country, image_url, recall_title, ext_url, data_json, brand) '
'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
data_list
)
print('存储成功', len(data_list))
else:
print('没有解析到id')
items = {}
......@@ -679,57 +684,184 @@ class recall_cases():
items['country'] = countryId # 站点
encoded_url = urllib.parse.quote(url)
_url = 'https://globalrecalls.oecd.org/ws/getrecall.xqy?uiLang=en&uri=' + encoded_url
print('_url::',_url)
print('_url::', _url)
resp = requests.get(_url, headers=headers, timeout=60)
result = resp.json()
print("result::", result)
recall_detail = resp.json() # 避免覆盖外层循环变量 result
print("result::", recall_detail)
time.sleep(random.uniform(1, 3))
extUrl = result['recall']['extUrl'] # 详情url
imageUri = result['recall']['images'][0]['imageUri']
extUrl = recall_detail['recall']['extUrl'] # 详情url
images = recall_detail['recall'].get('images', [])
if images:
imageUri = images[0]['imageUri']
encode_imageUri = urllib.parse.quote(imageUri)
imaurl = f"https://globalrecalls.oecd.org/ws/getdocument.xqy?uri={encode_imageUri}" # 图片
date_time = result['recall']['date']
imaurl = f"https://globalrecalls.oecd.org/ws/getdocument.xqy?uri={encode_imageUri}"
else:
imaurl = None
date_time = recall_detail['recall']['date']
items['reacll_time'] = date_time
title_name = result['recall']['product.name']
recall_title = result['recall']['product.desc']
title_name = recall_detail['recall']['product.name']
recall_title = recall_detail['recall']['product.desc']
if recall_title is None:
recall_title = result['recall']['images'][0]['alt.text']
recall_title = images[0].get('alt.text') if images else None
if recall_title:
recall_title.replace('Image of ', '')
hazard = result['recall']['hazard']
recall_title = recall_title.replace('Image of ', '') # 修复:结果需赋值
hazard = recall_detail['recall']['hazard']
items['recall_title'] = recall_title
items['productCategory'] = result['recall']['product.type']
items['productCategory'] = recall_detail['recall']['product.type']
items['riskDescription'] = hazard
items['image_url'] = imaurl
items['data_type'] = 'global_recalls'
items['product_title'] = re.findall(r'^(.*?)\s*;', title_name + ';')[0]
items['ext_url'] = extUrl
data_json = json.dumps(result)
items['product_title'] = re.findall(r'^(.*?)\s*;', (title_name or '') + ';')[0] if title_name else None
items['ext_url'] = extUrl[:255] if extUrl else None
# product_title 为 None 时唯一索引(product_title,recall_date,ext_url)失效
# MySQL NULL≠NULL,需手动按 ext_url 查重
if items['product_title'] is None and items['ext_url']:
safe_url = items['ext_url'].replace("'", "''")
df_check = self.mysql_db.read_sql(
f"SELECT COUNT(*) as cnt FROM recall_cases_data WHERE ext_url = '{safe_url}'"
)
if df_check['cnt'].iloc[0] > 0:
print('已存在跳过(product_title为空):', items['ext_url'][:80])
continue
data_json = json.dumps(recall_detail)
data_list.append([items['data_type'], items['product_title'], items['productCategory'],
items['reacll_time'], items['riskDescription'], items['country'],
items['image_url'],
items['recall_title'], items['ext_url'], data_json])
print('没有解析到id的数据:', items)
try:
df = pd.DataFrame(data=data_list,
columns=['data_type', 'product_title', 'product_category', 'recall_date',
with self.mysql_db.begin() as conn:
conn.execute(
'INSERT IGNORE INTO recall_cases_data '
'(data_type, product_title, product_category, recall_date, hazard, '
'country, image_url, recall_title, ext_url, data_json) '
'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
data_list
)
print('没有解析到id 存储成功', len(data_list))
    def webgate_ec(self):
        """Crawl the EU RASFF portal (Rapid Alert System for Food and Feed).

        POSTs a consolidated search for "Food contact materials" notifications,
        then fetches/stores each notification's detail via
        ``webgate_ec_product``.  ``num_list`` accumulates one entry per
        duplicate insert; once it grows past 3 the crawl stops, on the
        assumption that no newer data exists.
        """
        headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Cache-Control": "No-Cache",
            "Connection": "keep-alive",
            "Content-Length": "378",
            "Content-Type": "application/json",
            "Host": "webgate.ec.europa.eu",
            "Origin": "https://webgate.ec.europa.eu",
            "Pragma": "no-cache",
            "Referer": "https://webgate.ec.europa.eu/rasff-window/screen/search?searchQueries=eyJkYXRlIjp7InN0YXJ0UmFuZ2UiOiIiLCJlbmRSYW5nZSI6IiJ9LCJjb3VudHJpZXMiOnt9LCJ0eXBlIjp7fSwibm90aWZpY2F0aW9uU3RhdHVzIjp7fSwicHJvZHVjdCI6e30sInJpc2siOnt9LCJyZWZlcmVuY2UiOiIiLCJzdWJqZWN0IjoiRm9vZCBjb250YWN0IG1hdGVyaWFscyJ9",
            "Sec-Ch-Ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            "Sec-Ch-Ua-Mobile": "?0",
            "Sec-Ch-Ua-Platform": '"Windows"',
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "X-Requested-With": "XMLHttpRequest",
        }
        # Search payload: first page, 25 items, filtered to food-contact materials.
        data = {"parameters": {"pageNumber": 1, "itemsPerPage": 25}, "notificationReference": None,
                "subject": "Food contact materials", "notifyingCountry": None, "originCountry": None,
                "distributionCountry": None, "notificationType": None, "notificationStatus": None,
                "notificationClassification": None, "notificationBasis": None, "productCategory": None,
                "actionTaken": None,
                "hazardCategory": None, "riskDecision": None}
        url = 'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/search/consolidated/'
        response = requests.post(url, headers=headers, json=data, timeout=120)
        print(response.text)
        response_json = json.loads(response.text)
        notifications_list = response_json.get("notifications", [])
        if notifications_list:
            num_list = []  # duplicate-insert counter threaded through webgate_ec_product
            for notif in notifications_list:
                items_data = {}
                productType = notif['productType']['description']
                items_data['product_category'] = productType if productType else None
                subject_title = notif['subject']  # recall title
                items_data['recall_title'] = subject_title if subject_title else None
                # No separate hazard field on the list endpoint; reuse the title.
                items_data['hazard'] = items_data['recall_title']
                recall_date = notif['ecValidationDate']  # recall date, "DD-MM-YYYY HH:MM:SS"
                if recall_date:
                    dt = date_time.strptime(recall_date, "%d-%m-%Y %H:%M:%S")
                    # Keep only the date part as a string.
                    items_data['recall_date'] = dt.strftime("%Y-%m-%d")
                else:
                    items_data['recall_date'] = None
                country = notif['notifyingCountry']['isoCode']  # notifying site/country
                items_data['country'] = country if country else None
                # Public display link for the notification (used for navigation).
                ext_url = 'https://webgate.ec.europa.eu/rasff-window/screen/notification/' + str(
                    notif['notifId'])
                items_data['ext_url'] = ext_url
                num_list = self.webgate_ec_product(notif['notifId'], items_data, num_list)
                time.sleep(random.uniform(5, 10))  # throttle detail requests
                if len(num_list) > 3:
                    print('跳出循环。连续存储 3 条数据相同。默认没有最新数据')
                    break
def webgate_ec_product(self, notif_id, items_data, num_list):
headers1 = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "No-Cache",
"Connection": "keep-alive",
"Host": "webgate.ec.europa.eu",
"Pragma": "no-cache",
"Referer": f"https://webgate.ec.europa.eu/rasff-window/screen/notification/{notif_id}",
"Sec-Ch-Ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"X-Requested-With": "XMLHttpRequest",
}
url1 = f'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/{notif_id}/'
print('详情url:', url1)
response = requests.get(url1, headers=headers1, timeout=120)
response_json = json.loads(response.text)
product = response_json.get("product", {})
title = product.get("description")
items_data['product_title'] = title
items_data['data_type'] = 'europa_ec'
print("欧盟食品和饲料快速预警系统: ", items_data)
items_data['data_json'] = response.text
columns = ['data_type', 'product_title', 'product_category', 'recall_date',
'hazard',
'country', 'image_url', 'recall_title', 'ext_url', 'data_json'])
df.to_sql('recall_cases_data', con=self.mysql_db, if_exists="append", index=False)
'country', 'recall_title', 'ext_url', 'data_json',
]
data_list = []
i_list = []
for i in columns:
i_list.append(items_data[i])
data_list.append(i_list)
df = pd.DataFrame(data=data_list, columns=columns)
try:
self.mysql_db.to_sql(df, 'recall_cases_data', if_exists="append")
print('存储成功', len(data_list))
except:
print('没有解析到id 存储 数据重复=====')
continue
print('存储 数据重复=====')
num_list.append(1)
return num_list
    def run(self):
        """Entry point: execute every recall-site crawler in sequence."""
        self.global_recalls()     # OECD global recalls raw pages
        self.get_globalrecalls()  # OECD global recalls detail parsing
        self.us_recalls()         # CPSC (US consumer products)
        self.us_fda_gov()         # US FDA recalls
        self.ec_europa_eu()       # EU Safety Gate alerts
        self.ec_europa_uk()
        self.gov_uk()             # UK drug/device alerts
        self.webgate_ec()         # EU RASFF food-contact materials
# """
# 数据类型,属于那个国的
# eu_recall
# global_recalls
......@@ -768,3 +900,4 @@ class recall_cases():
if __name__ == '__main__':
    # Use a distinct variable name so the class `recall_cases` is not
    # shadowed by its own instance at module level.
    crawler = recall_cases()
    crawler.run()
#
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment