Commit ab22cc42 by Peng

no message

parent d51420d8
......@@ -16,7 +16,7 @@ import datetime
from amazon_spider.VPS_IP import pppoe_ip
from curl_cffi import requests
from utils.secure_db_client import get_remote_engine
from threading_spider.post_to_dolphin import DolphinschedulerHelper
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import json
import uuid
......@@ -24,7 +24,25 @@ import uuid
"""抓取亚马逊 bsr分类,和 bsr asin top100"""
class bsr_catgory(BaseUtils):
class bsr_category(BaseUtils):
# (site_url, host) pair per Amazon marketplace; consumed when a site is
# initialized (see init_db: `self.site_url, self.host = self._SITE_CONFIG[...]`).
_SITE_CONFIG = {
    'us': ('https://www.amazon.com', 'www.amazon.com'),
    'uk': ('https://www.amazon.co.uk', 'www.amazon.co.uk'),
    'de': ('https://www.amazon.de', 'www.amazon.de'),
    'fr': ('https://www.amazon.fr', 'www.amazon.fr'),
    'es': ('https://www.amazon.es', 'www.amazon.es'),
    'it': ('https://www.amazon.it', 'www.amazon.it'),
}
# Locale-specific prefix of the <h1> title on Best-Sellers pages; the
# category name is whatever follows this prefix (see _parse_h1_name).
_BSR_TITLE_PREFIX = {
    'us': 'Best Sellers in',
    'uk': 'Best Sellers in',
    'de': 'Bestseller in',
    'fr': 'Les meilleures ventes en',
    'es': 'Los más vendidos en',
    'it': 'Bestseller in',
}
def __init__(self, site_name):
super().__init__()
self.site_name = site_name
......@@ -63,13 +81,13 @@ class bsr_catgory(BaseUtils):
print('切换IP...')
pppoe_ip()
self._last_ip_switch_time = time.time()
time.sleep(5)
time.sleep(5) # sleep 移到锁外,避免长时间阻塞其他线程
return True
def db_engine_us(self, site_name, db_type):
engine_mysql = get_remote_engine(
site_name=site_name, # -> database "selection"
db_type=db_type, # -> 服务端 alias "mysql"
site_name=site_name,
db_type=db_type,
)
return engine_mysql
......@@ -91,65 +109,98 @@ class bsr_catgory(BaseUtils):
engine_us_mysql = self.db_engine_us(site, 'mysql')
else:
engine_us_mysql = self.db_engine_us(self.site_name, 'mysql')
print(select_state1_sql)
df = engine_us_mysql.read_sql(select_state1_sql)
return df
except Exception as e:
import traceback
traceback.print_exc() # ★ 打印完整栈到终端
traceback.print_exc()
print(e, 'db_cursor_connect_msyql_read 报错:', select_state1_sql)
def init_db(self, site_name):
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
self.engine_pg6 = self.pg_connect_6()
if site_name == "us":
self.site_url = 'https://www.amazon.com'
self.host = 'www.amazon.com'
elif site_name == 'uk':
self.site_url = 'https://www.amazon.co.uk' # 站点url
self.host = 'www.amazon.co.uk'
elif site_name == 'de':
self.site_url = 'https://www.amazon.de'
self.host = 'www.amazon.de'
elif site_name == 'fr':
self.site_url = 'https://www.amazon.fr'
self.host = 'www.amazon.fr'
elif site_name == 'es':
self.site_url = 'https://www.amazon.es'
self.host = 'www.amazon.es'
elif site_name == 'it':
self.site_url = 'https://www.amazon.it'
self.host = 'www.amazon.it'
self.site_url, self.host = self._SITE_CONFIG[site_name]
self.reuests_para_val = Requests_param_val(site_name=self.site_name)
self.year_month = f'{self.year}_{self.month}'
sele_sql = f"SELECT `week` FROM week_20_to_30 WHERE `year_month`='{self.year}_{self.month}'"
print(sele_sql)
df_year_week = self.db_cursor_connect_msyql_read(site='us', select_state1_sql=sele_sql)
self.year_week = list(df_year_week['week'])[-1]
# self.year_week = 6
print(self.year_week, '====当前周===1232333')
def safeIndex(self, list: list, index: int, default: object = None):
"""
安全获取list的索引对应的值
:param list: 列表
:param index: 索引
:param default: 默认值
:return:
"""
if (index <= len(list) - 1):
"""安全获取list的索引对应的值"""
if index <= len(list) - 1:
return list[index]
return default
def _build_headers(self, url):
"""构建随机请求头,防反爬"""
n = random.randint(70, 114)
ua = (f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
f'(KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}'
f'.{random.randint(1, 181)} Safari/537.36')
headers = {
'connection': 'close',
'authority': self.host,
'accept': 'text/html,*/*',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'origin': url,
'referer': url,
'sec-ch-ua-mobile': '?0',
'user-agent': ua,
}
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n']
k = ""
for i in (0, random.randint(0, 5)):
k += random.choice(alphabet)
headers[k] = str(uuid.uuid4())
return headers
def _is_captcha(self, text):
"""检测页面是否出现验证码"""
return any(kw in text for kw in [
"Enter the characters you see below",
"Geben Sie die Zeichen unten ein",
"Introduce los caracteres que se muestran",
"Saisissez les caractères que vous voyez",
"Inserisci i caratteri visualizzati nello spazio",
])
def _parse_h1_name(self, response):
"""从页面 h1 解析 BSR 分类名称"""
prefix = self._BSR_TITLE_PREFIX.get(self.site_name, '')
if not prefix:
return None
h1_list = response.xpath(f"//h1[contains(text(),'{prefix}')]/text()")
if h1_list:
return h1_list[0].replace(prefix, '').strip()
return None
def _get_sibling_first_id(self, response, bsr_class_name, bum, bsr_url_category_id_dict):
"""获取同级节点的 first_id 映射,用于 redirect_first_id 更新"""
if (bum - 1) < 2:
return []
xpath_herf_list = [
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name[0:5]}")]/parent::div/parent::div/parent::div//a/@href',
]
for xpath_herf in xpath_herf_list:
a_herf_list = response.xpath(xpath_herf)
if a_herf_list:
a_url_dict = self.parse_url(bum, self.site_url + a_herf_list[0])
return [[bsr_url_category_id_dict['category_first_id'], a_url_dict['category_first_id']]]
return []
def parse_url(self, nodes_num: int, url: str):
"""
统一解析链接获取 bsr 分类id等数据
:param nodes_num:
:param url:
:return:
"""
"""统一解析链接获取 bsr 分类id等数据"""
arr = url.split("/")
if not "ref=" in url:
if "ref=" not in url:
ref_suffix = None
category_id = self.safeIndex(arr, len(arr) - 1, None)
category_first_id = self.safeIndex(arr, len(arr) - 2, None)
......@@ -204,44 +255,22 @@ class bsr_catgory(BaseUtils):
def html_4(self, bum):
print('bum 当前请求的层级:', bum)
while True:
if self.name_path_queue.empty() == False:
if not self.name_path_queue.empty():
querys = self.name_path_queue.get()
print('请求=====:', querys)
bsr_url = querys[2]
bsr_class_name = querys[1]
bsr_id = querys[0]
category_id = querys[3]
cookie_str = self.get_cookeis()
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
'connection': 'close',
'authority': self.host,
'accept': 'text/html,*/*',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'origin': f'{bsr_url}',
'referer': f'{bsr_url}',
'sec-ch-ua-mobile': '?0',
'user-agent': ua
}
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n']
k = ""
for i in (0, random.randint(0, 5)):
k += random.choice(alphabet)
headers[k] = str(uuid.uuid4())
cookie_str = self.get_cookies()
headers = self._build_headers(bsr_url)
headers["cookie"] = cookie_str
num = 0
while True:
try:
resp = requests.get(bsr_url, headers=headers,
timeout=20, impersonate="chrome")
if ("Enter the characters you see below" in resp.text) or (
"Geben Sie die Zeichen unten ein" in resp.text) or (
"Introduce los caracteres que se muestran" in resp.text) or (
"Saisissez les caractères que vous voyez" in resp.text) or (
"Inserisci i caratteri visualizzati nello spazio" in resp.text):
if self._is_captcha(resp.text):
print('请求出现验证码:')
num += 1
if num > 20:
......@@ -263,67 +292,24 @@ class bsr_catgory(BaseUtils):
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/following-sibling::div[@role="group"]/div[@role="treeitem"]/a')
print(
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/following-sibling::div[@role="group"]/div[@role="treeitem"]/a')
if self.site_name == 'us' or self.site_name == 'uk':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Best Sellers in')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Best Sellers in", '').strip()
else:
h1_bsr_name = None
elif self.site_name == 'de':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Bestseller in')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Bestseller in", '').strip()
else:
h1_bsr_name = None
elif self.site_name == 'fr':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Les meilleures ventes en')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Les meilleures ventes en", '').strip()
else:
h1_bsr_name = None
elif self.site_name == 'es':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Los más vendidos en')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Los más vendidos en", '').strip()
else:
h1_bsr_name = None
elif self.site_name == 'it':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Bestseller in')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Bestseller in", '').strip()
else:
h1_bsr_name = None
h1_bsr_name = self._parse_h1_name(response)
# self.id_and_en_name_list 1 代表还有 子分类 2代表没有子分类,为最终节点
bsr_url_category_id_dict = self.parse_url(bum - 1, bsr_url)
print('bsr_url_category_id_dict', bsr_url_category_id_dict)
redirect_first_id_list = []
print('获取下一级数量:', len(a_2_list))
if a_2_list:
self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 1, bsr_url_category_id_dict['category_id'],
bsr_url_category_id_dict['category_parent_id']))
print(self.id_and_en_name_list, '23434343434343434343434')
# 获取同级节点url
if (bum - 1) >= 2:
xpath_herf_list = [
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name[0:5]}")]/parent::div/parent::div/parent::div//a/@href']
for xpath_herf in xpath_herf_list:
a_herf_list = response.xpath(xpath_herf)
if a_herf_list:
a_url_dict = self.parse_url(bum, self.site_url + a_herf_list[0])
print('a_url_dict22222:::', a_url_dict)
redirect_first_id_list.append(
[bsr_url_category_id_dict['category_first_id'],
a_url_dict['category_first_id']])
break
redirect_first_id_list = self._get_sibling_first_id(
response, bsr_class_name, bum, bsr_url_category_id_dict)
name_2_href_list = []
for a_2 in a_2_list:
name_2 = a_2.xpath('./text()')[0].strip()
name_2_href = a_2.xpath('./@href')[0].strip()
if 'com/-/zh/-' in name_2_href:
int_time = int(time.time() * 1000)
with open(f'{int_time}_{name_2}.html', 'w', encoding='utf-8')as f:
with open(f'{int_time}_{name_2}.html', 'w', encoding='utf-8') as f:
f.write(resp.text)
href = self.site_url + name_2_href
id_pid_data_dict = self.parse_url(bum, href)
......@@ -333,34 +319,19 @@ class bsr_catgory(BaseUtils):
# 父级类目自增id 类目名称 二级类目, 链接
self.insert_list.append((bsr_id, name_2, bum, href, id_pid_data_dict['category_id'],
id_pid_data_dict['category_parent_id']))
# 存储当前分类id,和页面获取的下一级url
# 存储当前分类id,和页面获取的下一级url
name_2_href_list.append(href)
# 存储path 和 名称
self.path_en_name_list.append([href, name_2])
self.catgory_next_path_list.append(
[bsr_url_category_id_dict['category_id'], '|-|'.join(name_2_href_list)])
else:
# 获取同级节点url
if (bum - 1) >= 2:
xpath_herf_list = [
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name[0:5]}")]/parent::div/parent::div/parent::div//a/@href']
print('获取同类节点数量,', len(xpath_herf_list))
for xpath_herf in xpath_herf_list:
a_herf_list = response.xpath(xpath_herf)
if a_herf_list:
print('获取同类节点数量', self.site_url + a_herf_list[0])
a_url_dict = self.parse_url(bum, self.site_url + a_herf_list[0])
print('获取同类节点数量 a_url_dict1111:::', a_url_dict)
redirect_first_id_list.append(
[bsr_url_category_id_dict['category_first_id'],
a_url_dict['category_first_id']])
break
redirect_first_id_list = self._get_sibling_first_id(
response, bsr_class_name, bum, bsr_url_category_id_dict)
self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 2, bsr_url_category_id_dict['category_id'],
bsr_url_category_id_dict['category_parent_id']))
if redirect_first_id_list:
# 获取当前请求url 分类id对 first_id 拼接
# 获取当前请求url 分类id对 first_id 拼接
self.redirect_first_id_list.append([bsr_url, '|-|'.join(redirect_first_id_list[0])])
print(self.year_week, '====当前周===', self.week)
ele_next = response.xpath(
......@@ -368,7 +339,7 @@ class bsr_catgory(BaseUtils):
data_client_recs_list = response.xpath(
"//div[@class='p13n-desktop-grid']/@data-client-recs-list")
if data_client_recs_list:
data_list = eval(data_client_recs_list[0])
data_list = json.loads(data_client_recs_list[0])
print('第一页获取asin:', len(data_list))
for data in data_list:
asin = data['id']
......@@ -380,7 +351,8 @@ class bsr_catgory(BaseUtils):
self.get_bsr_asin_data(ele_next, bsr_id, bsr_url_category_id_dict['category_id'], asin_dict_={})
else:
break
def get_cookeis(self):
def get_cookies(self):
if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie()
self.cookie_dict_delete_id = cookies_dict
......@@ -411,48 +383,27 @@ class bsr_catgory(BaseUtils):
return cookie_str
else:
return ''
def get_bsr_asin_data(self, ele_next, c_id, bsr_url_category_id, asin_dict_):
next_url = self.site_url + ele_next[0]
num = 0
while True:
try:
cookie_str = self.get_cookeis()
cookie_str = self.get_cookies()
if not cookie_str:
cookie_str = self.get_cookeis()
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
'connection': 'close',
'accept': 'text/html,*/*',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'origin': f'{next_url}',
'referer': f'{next_url}',
'sec-ch-ua-mobile': '?0',
'user-agent': ua
}
cookie_str = self.get_cookies()
headers = self._build_headers(next_url)
headers['Sec-Fetch-Site'] = 'same-origin'
headers['Sec-Fetch-Mode'] = 'cors'
headers['Sec-Fetch-Dest'] = 'empty'
headers['Accept'] = '*/*'
headers['X-Requested-With'] = 'XMLHttpRequest'
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n']
k = ""
for i in (0, random.randint(0, 5)):
k += random.choice(alphabet)
headers[k] = str(uuid.uuid4())
headers["cookie"] = cookie_str
print('请求第二页::', next_url)
resp = requests.get(next_url, headers=headers, timeout=15, impersonate="chrome")
if ("Enter the characters you see below" in resp.text) or (
"Geben Sie die Zeichen unten ein" in resp.text) or (
"Introduce los caracteres que se muestran" in resp.text) or (
"Saisissez les caractères que vous voyez" in resp.text) or (
"Inserisci i caratteri visualizzati nello spazio" in resp.text):
if self._is_captcha(resp.text):
print("出现验证码")
num += 1
if num > 20:
......@@ -481,7 +432,7 @@ class bsr_catgory(BaseUtils):
data_client_recs_list = response.xpath("//div[@class='p13n-desktop-grid']/@data-client-recs-list")
if data_client_recs_list:
data_list = eval(data_client_recs_list[0])
data_list = json.loads(data_client_recs_list[0])
print('第二页获取asin:', len(data_list))
for data in data_list:
asin = data['id']
......@@ -494,17 +445,18 @@ class bsr_catgory(BaseUtils):
def save_asin_data(self):
while True:
try:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
print('存储====::', len(self.item_list))
df = pd.DataFrame(data=self.item_list, columns=self.columns)
df['date_info'] = self.time_strftime_
df.drop_duplicates(['asin', 'bsr_rank', 'cate_current_id'], inplace=True) # 去重
print(df.shape)
self.engine.to_sql(df, f'{self.site_name}_bs_category_top100_asin', if_exists="append")
self.engine_pg.to_sql(df, f'{self.site_name}_bs_top100_asin_2026', if_exists="append")
self.item_list = []
except Exception as e:
print(e, '报错存储')
print(e, '报错存储 pg')
time.sleep(10)
self.engine_pg = self.pg_connect()
continue
break
......@@ -562,7 +514,6 @@ class bsr_catgory(BaseUtils):
sql2 = f"update {self.site_name}_bs_category set delete_time = '{_strftime_}' where id in {tuple(group2_id)}"
print('sql2::', sql2)
self.db_cursor_connect_update(sql2, self.site_name)
except Exception as e:
print('更新 delete_time 报错', traceback.format_exc())
try:
......@@ -584,8 +535,6 @@ class bsr_catgory(BaseUtils):
update_first_id_sql = f'''update {self.site_name}_bs_category set redirect_first_id="{redirect_first_id_name_id_list[1]}" where `path`="{category_bsr_url}"'''
print('update_first_id_sql::', update_first_id_sql)
self.db_cursor_connect_update(update_first_id_sql, self.site_name)
except Exception as e:
print('更新 redirect_first_id_name_id_list 报错', traceback.format_exc())
self.init_db(self.site_name)
......@@ -738,7 +687,6 @@ class bsr_catgory(BaseUtils):
for t2 in html_thread:
t2.join()
print(f"当前 {num} 级分类 全部请求完毕", "获取二级类目:", len(self.insert_list))
elif num > 15:
break
num += 1
......@@ -749,23 +697,23 @@ class bsr_catgory(BaseUtils):
# 查询 子节点的顶级父类id
select_sql_1 = f'select id from {self.site_name}_bs_category where nodes_num=2'
df_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_1)
df_id_lsit = df_id.values.tolist()
for id in df_id_lsit:
df_id_list = df_id.values.tolist()
for id_ in df_id_list:
en_name_id_list = []
select_p_id = f"select t3.id,t4.en_name from (select t1.id,t1.parent_id,if(find_in_set(parent_id, @pids) > 0, @pids := concat(@pids, ',',id), 0) as ischild from (select id,p_id as parent_id from {self.site_name}_bs_category t order by p_id,id) t1,(select @pids := {id[0]}) t2) t3 LEFT JOIN {self.site_name}_bs_category t4 on t3.id = t4.id where ischild != 0;"
select_p_id = f"select t3.id,t4.en_name from (select t1.id,t1.parent_id,if(find_in_set(parent_id, @pids) > 0, @pids := concat(@pids, ',',id), 0) as ischild from (select id,p_id as parent_id from {self.site_name}_bs_category t order by p_id,id) t1,(select @pids := {id_[0]}) t2) t3 LEFT JOIN {self.site_name}_bs_category t4 on t3.id = t4.id where ischild != 0;"
print('select_p_id::', select_p_id)
df_all_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_p_id)
if not df_all_id.empty:
all_id_lsit = df_all_id.values.tolist()
for en_name_id in all_id_lsit:
all_id_list = df_all_id.values.tolist()
for en_name_id in all_id_list:
en_name_id_list.append(en_name_id[0])
id_tuple = tuple(en_name_id_list)
print(len(id_tuple))
try:
if len(id_tuple) == 1:
update_sql = f"""UPDATE {self.site_name}_bs_category set one_category_id={id[0]} where id in ('{id_tuple[0]}')"""
update_sql = f"""UPDATE {self.site_name}_bs_category set one_category_id={id_[0]} where id in ('{id_tuple[0]}')"""
else:
update_sql = f'update {self.site_name}_bs_category set one_category_id={id[0]} where id in {id_tuple}'
update_sql = f'update {self.site_name}_bs_category set one_category_id={id_[0]} where id in {id_tuple}'
self.db_cursor_connect_update(update_sql, self.site_name)
except Exception as e:
print(e)
......@@ -830,8 +778,8 @@ class bsr_catgory(BaseUtils):
df_id_tuple = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=id_sql)
id_tuple = df_id_tuple.values.tolist()
id_list = []
for id in id_tuple:
id_list.append(id[0])
for id_ in id_tuple:
id_list.append(id_[0])
if id_list:
try:
self.init_db(self.site_name)
......@@ -844,38 +792,31 @@ class bsr_catgory(BaseUtils):
self.init_db(self.site_name)
time.sleep(10)
def send_ms(self):
def send_big_data_selection(self):
for i in range(3):
try:
print('发送消息通知:')
url = 'http://selection.yswg.com.cn:8080/soundasia_selection/workflow/emit'
data = {"dateType": "day", "reportDate": self.time_strftime_, "statusVal": 3,
"siteName": self.site_name,
"remark": "bsr榜单爬取完毕 ", "isEnd": "是", "tableName": f"{self.site_name}_bs_category_top100_asin",
"status": "bsr榜单爬取完毕"}
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
process_df_name='日-bsr+nsr+asin导出',
startParams={
"site_name": self.site_name,
"date_type": "day",
"date_info": self.time_strftime_
}
t = requests.post(url, headers=headers, json=data)
print(t.text)
)
break
except:
self.init_db(self.site_name)
time.sleep(10)
time.sleep(30)
continue
def updata_category_state(self):
for i in range(3):
try:
days = ((datetime.datetime.now()) + datetime.timedelta(days=-8)).strftime("%Y-%m-%d")
_day = ((datetime.datetime.now()) + datetime.timedelta()).strftime("%Y-%m-%d")
update_sql = f'UPDATE {self.site_name}_bs_category set category_state=1;'
self.db_cursor_connect_update(update_sql, self.site_name)
delect_sql = f"DELETE FROM {self.site_name}_bs_category_top100_asin WHERE date_info < '{days}';"
print(delect_sql)
self.db_cursor_connect_update(delect_sql, self.site_name)
self.engine_pg = self.pg_connect()
inset_sql = f"""INSERT INTO {self.site_name}_bs_category_reprot (
id, p_id, ch_name, en_name, nodes_num, `path`, created_at, updated_at, is_show,
......@@ -891,10 +832,10 @@ class bsr_catgory(BaseUtils):
print(inset_sql)
self.db_cursor_connect_update(inset_sql, self.site_name)
_0_days = ((datetime.datetime.now()) + datetime.timedelta(days=0)).strftime("%Y-%m-%d")
select_sql = f"select count(id) FROM {self.site_name}_bs_category_top100_asin WHERE date_info = '{_0_days}';"
select_sql = f"select count(id) as cnt FROM {self.site_name}_bs_top100_asin_2026 WHERE date_info = '{_0_days}';"
print(select_sql)
df_count_data_num = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql)
count_data_num = df_count_data_num['count(id)'][0]
df_count_data_num = self.engine_pg.read_sql(select_sql)
count_data_num = df_count_data_num['cnt'][0]
print('count_data_num::', count_data_num)
self.send_ms_count_data_num(self.site_name, count_data_num, _0_days)
break
......@@ -904,25 +845,6 @@ class bsr_catgory(BaseUtils):
time.sleep(20)
continue
# def sele_msyql_category(self, site):
# engine_mysql = self.db_engine_us(site, 'mysql')
# sql = f'select path, nodes_num,id from {site}_bs_category where nodes_num>1'
# df = engine_mysql.read_sql(sql)
# values_list = df.values.tolist()
# with engine_mysql.begin() as conn_6:
# for value in values_list:
# print(value)
# items = self.parse_url(value[1], value[0])
# items['id'] = value[2]
# print(items)
# try:
# # {'category_id': '1722264031', 'category_first_id': 'baby', 'category_parent_id': '60244031', 'id': 67478}
# ai_sql1 = f"update {site}_bs_category set category_id = '{items['category_id']}',category_parent_id='{items['category_parent_id']}',category_first_id='{items['category_first_id']}' where id={items['id']}"
# print(ai_sql1)
# conn_6.execute(ai_sql1)
# except:
# self.try_sql(ai_sql1)
def sele_msyql_category(self, site):
engine_mysql = self.db_engine_us(site, 'mysql')
sql = f"select path, nodes_num, id from {site}_bs_category where nodes_num>1"
......@@ -939,7 +861,6 @@ class bsr_catgory(BaseUtils):
except Exception:
self.try_sql(site, ai_sql1)
except Exception as e:
# 这里就能抓到 __exit__ 抛的 RuntimeError 了
print("事务退出时报错:", repr(e))
def try_sql(self, site, ai_sql):
......@@ -986,17 +907,17 @@ class bsr_catgory(BaseUtils):
if __name__ == '__main__':
pppoe_ip()
time.sleep(5)
# pppoe_ip()
# time.sleep(5)
print("如果运行 run 函数 有个别类目 别名没有抓取到,结束之后 运行 run_start 抓取,获取所有 and_en_name 为空的类目id,path")
for site in ['us', 'de', 'uk']:
spider_us = bsr_catgory(site_name=site)
spider_us = bsr_category(site_name=site)
spider_us.run()
spider_us.update_category_state()
spider_us.run_update_redirect_flag()
spider_us.updata_category_first_id()
spider_us.send_ms()
spider_us.send_big_data_selection()
for site in ['us', 'de', 'uk']:
spider_us = bsr_catgory(site_name=site)
spider_us = bsr_category(site_name=site)
spider_us.updata_category_state()
spider_us.sele_msyql_category(site)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment