Commit 6360879f by Peng

no message

parent 3a4d4a7e
......@@ -17,7 +17,7 @@ print('存储 asin 到pg数据库')
class Save_asin_detail(BaseUtils):
def __init__(self, site_name=None, proxy_name=None, week=None, month=None, spider_int=None):
def __init__(self, site_name=None, proxy_name=None, week=None, month=None, spider_int=None, skip_read_init=False):
super().__init__()
self.site_name = site_name # 站点
self.month = month
......@@ -29,6 +29,12 @@ class Save_asin_detail(BaseUtils):
self.init_db_names()
self.cols = self.reuests_para_val.db_column(site_name)
self.redis_client = self.redis_db()
# 子进程不需要 minid_maxid(由主进程读数据),跳过避免白拿区间
if not skip_read_init:
self.minid_maxid_list = self.reuests_para_val.get_minid_maxid(
site_name=self.site_name, state=1, minid_maxid=None, month=self.month)
else:
self.minid_maxid_list = []
def init_db_names(self):
self.engine = self.mysql_connect()
......@@ -36,11 +42,10 @@ class Save_asin_detail(BaseUtils):
self.kafuka_producer = self.kafuka_connect() # 卡夫卡连接
self.kafuka_producer_str = self.kafuka_connect(acks=True, connections_max_idle_ms=300000) # 卡夫卡连接
self.redis_db14 = self.redis_db() # redis 链接
self.db_syn = self.site_name + '_all_syn_st_month_2025'
self.db_syn = self.site_name + '_all_syn_st_month_2026'
self.db_seller_account_syn = self.site_name + DB_REQUESTS_ASIN_PARAMS['db_seller_account_syn'][2:] + '_distinct'
self.db_seller_asin_account = self.site_name + DB_REQUESTS_ASIN_PARAMS['db_seller_asin_account'][2:]
self.minid_maxid_list = self.reuests_para_val.get_minid_maxid(site_name=self.site_name, state=1,
minid_maxid=None, month=self.month)
# get_minid_maxid 移到 __init__ 由 skip_read_init 控制
@func_set_timeout(240)
def process_item(self, item_queue, requests_error_asin_list, asin_list_update, asin_not_found_list,
......@@ -80,7 +85,7 @@ class Save_asin_detail(BaseUtils):
else:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
sql_read = f"SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 ORDER BY id FOR UPDATE SKIP LOCKED LIMIT {self.read_size}"
sql_read = f"SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 FOR UPDATE LIMIT 100"
print(sql_read)
self.df_read = self.engine_pg.read_then_update(
select_sql=sql_read,
......@@ -93,8 +98,8 @@ class Save_asin_detail(BaseUtils):
self.index_tuple = tuple(self.df_read['id'])
print(self.index_tuple, 'self.index_tuplself.index_tuplself.index_tupl')
# 使用默认值填充空值
self.df_read['volume'].fillna('null', inplace=True)
self.df_read['weight_str'].fillna('null', inplace=True)
self.df_read['volume'] = self.df_read['volume'].fillna('null')
self.df_read['weight_str'] = self.df_read['weight_str'].fillna('null')
asin_list = list(
self.df_read.asin + '|' + self.df_read.date_info + '|' + self.df_read.asin_is_variation.astype(
......@@ -135,8 +140,8 @@ class Save_asin_detail(BaseUtils):
self.df_read.drop_duplicates(['asin'], inplace=True)
if self.df_read.shape[0] > 0:
# 使用默认值填充空值
self.df_read['volume'].fillna('null', inplace=True)
self.df_read['weight_str'].fillna('null', inplace=True)
self.df_read['volume'] = self.df_read['volume'].fillna('null')
self.df_read['weight_str'] = self.df_read['weight_str'].fillna('null')
self.index_tuple = tuple(self.df_read['id'])
asin_list = list(
self.df_read.asin + '|' + self.df_read.date_info + '|' + self.df_read.asin_is_variation.astype(
......@@ -166,7 +171,6 @@ class Save_asin_detail(BaseUtils):
self.engine_pg = self.pg_connect()
time.sleep(random.uniform(10, 20.5))
print("读取数据出bug并等待5s继续", e, f"\n{traceback.format_exc()}")
time.sleep(15)
continue
def split_list(self, lst, chunk_size):
......@@ -207,6 +211,7 @@ class Save_asin_detail(BaseUtils):
print(f"===============存储pg詳情數據数据=={self.site_name}_asin_detail_month_{report_info}=====")
df = df_asin_detail.loc[df_asin_detail.asin.isin(asin_week_dict[week])]
df = df.loc[:, self.cols]
df = df.where(pd.notnull(df), None) # 统一把 NaN 转为 None,防止存入 'nan' 字符串
df.ac_name = df.ac_name.apply(lambda x: str(x)[:100] if x is not None else None) # 截取字符
df.brand = df.brand.apply(lambda x: str(x)[:100] if x is not None else None) # 截取字符
df.title = df.title.apply(lambda x: str(x)[:400] if x is not None else None) # 截取字符
......@@ -220,7 +225,7 @@ class Save_asin_detail(BaseUtils):
df.weight_str = df.weight_str.apply(lambda x: str(x)[:250] if x is not None else None) # 截取字符
print(f'存储pg:{self.site_name}_asin_detail_month_{report_info}')
# df.to_csv(r'2025-7-30_srs_search_term_asin.csv', index=False)
# df.to_csv(r'2026-7-30_srs_search_term_asin.csv', index=False)
self.engine_pg.to_sql(df, f"{self.site_name}_asin_detail_month_{report_info}",
if_exists='append')
break
......@@ -234,10 +239,7 @@ class Save_asin_detail(BaseUtils):
continue
if requests_error_asin_list:
if self.site_name == 'us':
self.db_change_state(state=1, asin_list=requests_error_asin_list)
else:
self.db_change_state(state=1, asin_list=requests_error_asin_list)
self.db_change_state(state=1, asin_list=requests_error_asin_list)
if self.asin_list_update:
self.db_change_state(state=3, asin_list=self.asin_list_update)
self.asin_list_update = []
......@@ -310,7 +312,7 @@ class Save_asin_detail(BaseUtils):
print(len(df_seller_id_list))
with self.engine.begin() as conn:
conn.execute(
f"insert into {self.db_seller_account_syn} (seller_id, account_name, url) values (%s, %s, %s) ON DUPLICATE KEY UPDATE seller_id = values(seller_id)",
f"insert into {self.db_seller_account_syn} (seller_id, account_name, url) values (%s, %s, %s) ON DUPLICATE KEY UPDATE seller_id = values(seller_id), account_name = values(account_name)",
df_seller_id_list)
buyBox_list = []
break
......
......@@ -265,6 +265,45 @@ class search_temp_pg(BaseUtils):
# 清空变量,
self.init_list()
def run_pol_with_data(self, search_term_list, *, year=2026, thread_count=10):
    """Crawl a batch of search terms handed over by the main process.

    Unlike a self-reading worker, this method does not query the database
    for work: the caller supplies the batch. It enqueues every item, runs
    ``thread_count`` threads on ``self.get_search_kw``, then flushes Kafka,
    saves the results, updates the row states and deletes the cookies that
    were marked for deletion.

    Args:
        search_term_list: Strings of the form ``"<id>|-|<search_term>|-|<url>"``.
            An empty or ``None`` value makes the method return immediately.
        year: Year prefix used to build ``self.date_info`` (``"<year>-<month>"``).
            Defaults to 2026, the value that used to be hard-coded here.
        thread_count: Number of crawler threads to start. Defaults to 10,
            the value that used to be hard-coded here.

    Raises:
        ValueError: If ``thread_count`` is smaller than 1, or an item's id
            field is not an integer.
        IndexError: If an item has fewer than three ``|-|``-separated fields.
    """
    if not search_term_list:
        print("没有分配到数据,进程退出")
        return
    if thread_count < 1:
        # With no worker threads the queued items would never be consumed.
        raise ValueError(f"thread_count must be >= 1, got {thread_count!r}")

    self.date_info = f'{year}-{self.month}'

    # Build df_read so that the later db_change_state() call has the batch rows.
    rows = []
    for item in search_term_list:
        parts = item.split('|-|')
        rows.append({'id': int(parts[0]), 'search_term': parts[1], 'url': parts[2]})
    self.df_read = pd.DataFrame(rows)

    # Only fetch a fresh cookie set when the shared cookie queue has run dry.
    if self.cookies_queue.empty():
        cookies_dict = self.reuests_para_val.get_cookie()
        self.cookie_dict_delete_id = cookies_dict
        for ck in cookies_dict.values():
            self.cookies_queue.put(ck)

    # The raw "<id>|-|<term>|-|<url>" strings are what gets queued.
    # NOTE(review): presumably get_search_kw parses them itself - confirm.
    for search_url in search_term_list:
        self.search_term_queue.put(search_url)

    html_thread = [
        threading.Thread(target=self.get_search_kw, args=(i,))
        for i in range(thread_count)
    ]
    for worker in html_thread:
        worker.start()
    for worker in html_thread:
        worker.join()

    print('最后刷新kafka flush')
    self.kafuka_producer.flush()
    print('当前线程抓取结束')
    print("存储数据")
    self.db_save_data()
    self.db_change_state()
    print("删除cookie:", len(self.delete_cookies_list))
    # De-duplicate before deleting so each cookie is only submitted once.
    self.reuests_para_val.delete_china_cookie(list(set(self.delete_cookies_list)))
    self.init_list()
    print("run_pol_with_data 正常结束")
def parse_html_page(self, response=None, keywords=None, scraper_url=None, page=None, keywords_id=None, etree_html=None):
parse_search_term = ParseSearchTermUs(page_source=response, driver=None, search_term=keywords,
page=page, site_name=self.site_name, etree_html=etree_html)
......@@ -370,8 +409,8 @@ class search_temp_pg(BaseUtils):
columns=['search_term', 'asin', 'page', 'buy_data', 'label', 'asin_brand',
'time_batch'])
df = df.loc[:, ['search_term', 'asin', 'page', 'buy_data', 'label','asin_brand']]
df.label = df.label.apply(lambda x: str(x)[:200] if x is not None else None) # 截取字符
df.buy_data = df.buy_data.apply(lambda x: str(x)[:200] if x is not None else None) # 截取字符
df.label = df.label.apply(lambda x: str(x)[:200] if pd.notna(x) else None) # 截取字符
df.buy_data = df.buy_data.apply(lambda x: str(x)[:200] if pd.notna(x) else None) # 截取字符
else:
df = pd.DataFrame(data=data_list, columns=self.columns)
df['date_info'] = self.date_info
......@@ -394,6 +433,7 @@ class search_temp_pg(BaseUtils):
if df.shape[0] > 0:
print("db_name:", db_name)
df['asin'] = df['asin'].str.replace('/', '')
df = df.where(pd.notnull(df), None)
self.engine_pg.to_sql(df, db_name, if_exists="append")
break
except Exception as e:
......@@ -417,6 +457,7 @@ class search_temp_pg(BaseUtils):
print(f'存储表::{self.db_brand_analytics}_{year_moth_list[0]}')
print(len(self.sort_all_list))
df_being_sold.drop_duplicates(['search_term', 'quantity_being_sold'], inplace=True) # 去重
df_being_sold = df_being_sold.where(pd.notnull(df_being_sold), None)
if df_being_sold.shape[0] > 0:
self.engine_pg.to_sql(df_being_sold, f'{self.db_brand_analytics}_{year_moth_list[0]}',
if_exists='append')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment