Commit c8524b22 by Peng

no message

parent 4418209b
......@@ -3,15 +3,11 @@ import os
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from utils.db_connect import BaseUtils
from amazon_params.params import DB_CONN_DICT
import math
from secure_db_client import get_remote_engine
import pandas as pd
import time
import pymysql
import requests
# import numpy as np
#
# from scipy.optimize import curve_fit
import math
"""计算销量,均值差"""
......@@ -59,19 +55,19 @@ class CalculateMean(BaseUtils):
sql_6 = f"""
SELECT * from {self.site_name}_one_category WHERE id in ( select max(id) from {self.site_name}_one_category where `year_month`='2025_5' and orders=0 and rank>50000 GROUP BY `name`)
UNION
select * from {self.site_name}_one_category where `year_month`='2025_6' and rank<=50000
select * from {self.site_name}_one_category where `year_month`='2025_8' and rank<=50000
"""
print('Query source table 6:', sql_6)
self.df_sum_6 = pd.read_sql(sql_6, con=self.engine)
self.df_sum_6 = self.engine.read_sql(sql_6)
# ---- July ----
sql_7 = f"""
SELECT * from {self.site_name}_one_category WHERE id in ( select max(id) from {self.site_name}_one_category where `year_month`='2025_5' and orders=0 and rank>50000 GROUP BY `name`)
UNION
select * from {self.site_name}_one_category where `year_month`='2025_7' and rank<=50000
select * from {self.site_name}_one_category where `year_month`='2025_9' and rank<=50000
"""
print('Query source table 7:', sql_7)
self.df_sum_7 = pd.read_sql(sql_7, con=self.engine)
self.df_sum_7 = self.engine.read_sql(sql_7)
# after concatenation, rely on keep='last' downstream to retain the newer month's rows
self.df_sum = pd.concat([self.df_sum_6, self.df_sum_7], ignore_index=True)
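The `keep='last'` mentioned above relies on concat order: when a key appears in both months, the row from the frame concatenated last survives. A minimal sketch with made-up frames (the exact dedupe key is an assumption; the save step further down dedupes on ['name', 'rank', 'orders']):

```python
import pandas as pd

# older month first, newer month second: order matters for keep='last'
df_6 = pd.DataFrame({'name': ['Toys', 'Books'], 'rank': [100, 200], 'orders': [50, 10]})
df_8 = pd.DataFrame({'name': ['Toys'], 'rank': [100], 'orders': [80]})

df_sum = pd.concat([df_6, df_8], ignore_index=True)
# for a duplicated (name, rank) key the last occurrence wins, i.e. the newer month
df_sum = df_sum.drop_duplicates(subset=['name', 'rank'], keep='last')
print(df_sum)  # Toys keeps orders=80 from the newer frame
```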
......@@ -86,62 +82,12 @@ class CalculateMean(BaseUtils):
self.cate_list = list(set(self.df_sum.name))
sql_select = f"SELECT `year_month` from selection.week_20_to_30 WHERE `week`={int(self.week)} and `year`={self.year}"
print(sql_select, 'sql_select:')
df = pd.read_sql(sql_select, con=self.engine)
df = self.engine.read_sql(sql_select)
self.year_month = list(df['year_month'])[0] if list(df['year_month']) else ''
print("self.year_month:", self.year_month)
time.sleep(2)
self.handle_data()
# def handle_data(self, max_rank=1_000_000, step=1):
#     records = []
#     for cate in self.cate_list:
#         dfk = (self.df_sum[self.df_sum.name == cate]
#                [['rank', 'orders']]
#                .drop_duplicates()
#                .query('orders>0')
#                .sort_values('rank'))
#         if len(dfk) < 3: continue
#
#         # 1) Build log(rank) and log(orders)
#         lr = np.log(dfk['rank'].values)
#         lo = np.log(dfk['orders'].values)
#         # 2) Quadratic polynomial design matrix X = [1, lr, lr^2]
#         X = np.vstack([np.ones_like(lr), lr, lr ** 2]).T
#         # 3) Least-squares solve: coef = (X^T X)^-1 X^T lo
#         coef = np.linalg.lstsq(X, lo, rcond=None)[0]
#
#         # 4) Predict the full rank range with this polynomial
#         full = np.arange(dfk['rank'].min(), max_rank + 1, step)
#         lf = np.log(full)
#         log_pred = coef[0] + coef[1] * lf + coef[2] * (lf ** 2)
#         orders_pred = np.exp(log_pred)
#         cutoff_idx = np.argmax(orders_pred <= 30)
#         # If orders_pred never drops below the threshold, cutoff_idx is 0,
#         # but then orders_pred[0] is necessarily >= the threshold, so check:
#         if orders_pred[cutoff_idx] >= 30:
#             # no point below the threshold: keep everything
#             last = len(full)
#         else:
#             # predictions fall below the threshold at cutoff_idx: truncate before it
#             last = cutoff_idx
#         full = full[:last]
#         orders_pred = orders_pred[:last]
#
#         # 5) Assemble the output DataFrame
#         dfout = pd.DataFrame({
#             'name': cate,
#             'rank': full,
#             'orders': orders_pred
#         })
#         # Daily average sales, rounded to the nearest integer
#         dfout['orders_day'] = (dfout['orders'] / 30).round(0).astype(int)
#         dfout['year_month'] = self.year_month
#         dfout['week'] = self.week
#
#         records.append(dfout)
#     self.df_repeat = pd.concat(records, ignore_index=True)
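For reference, a self-contained runnable version of the disabled fit above: a quadratic least-squares fit in log-log space, extrapolated over the full rank range and cut where predicted monthly orders fall below 30. The demo data is synthetic; only numpy and pandas are assumed.

```python
import numpy as np
import pandas as pd

def extrapolate_orders(dfk, max_rank=1_000_000, step=1, min_orders=30):
    """Quadratic fit in log-log space, extrapolated and truncated at min_orders."""
    dfk = dfk[['rank', 'orders']].drop_duplicates().query('orders > 0').sort_values('rank')
    if len(dfk) < 3:
        return None
    lr = np.log(dfk['rank'].to_numpy(dtype=float))
    lo = np.log(dfk['orders'].to_numpy(dtype=float))
    X = np.vstack([np.ones_like(lr), lr, lr ** 2]).T      # design matrix [1, lr, lr^2]
    coef = np.linalg.lstsq(X, lo, rcond=None)[0]           # least-squares coefficients
    full = np.arange(dfk['rank'].min(), max_rank + 1, step, dtype=float)
    lf = np.log(full)
    pred = np.exp(coef[0] + coef[1] * lf + coef[2] * lf ** 2)
    cut = np.argmax(pred <= min_orders)                    # 0 if never below threshold
    if pred[cut] > min_orders:                             # never dropped below: keep all
        cut = len(full)
    return pd.DataFrame({'rank': full[:cut].astype(int), 'orders': pred[:cut]})

# synthetic, roughly power-law sales curve
demo = pd.DataFrame({'rank': [10, 100, 1000, 10000], 'orders': [5000, 900, 150, 40]})
print(extrapolate_orders(demo, max_rank=100_000).tail())
```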
def handle_data(self):  # legacy implementation
print(len(self.cate_list))
......@@ -195,12 +141,12 @@ class CalculateMean(BaseUtils):
sql = f"select en_name as name, category_id from {self.site_name}_bs_category where 1 = 1 and nodes_num = 2 group by en_name, category_id"
print('sql', sql)
df_en_name = pd.read_sql(sql, con=self.engine)
df_en_name = self.engine.read_sql(sql)
# use merge to match the two frames on the name column
self.df_repeat = pd.merge(self.df_repeat, df_en_name, on='name', how='left')
self.df_repeat = self.df_repeat.loc[self.df_repeat.orders >= 30] # keep rows whose monthly orders are >= 30
self.df_repeat.drop_duplicates(['name', 'rank', 'orders'], inplace=True) # deduplicate
self.df_repeat.to_sql(f"{self.site_name}_one_category_report", con=self.engine, if_exists="append", index=False)
self.engine.to_sql(self.df_repeat, f"{self.site_name}_one_category_report", if_exists="append")
def run(self):
self.db_read_data()
......@@ -208,21 +154,20 @@ class CalculateMean(BaseUtils):
self.db_save_data()
def sendMessage(self, week, site_name):
db = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'],
database='selection', charset="utf8mb4")
cursor = db.cursor(cursor=pymysql.cursors.DictCursor)
time_strftime = time.strftime("%Y-%m-%d %X", time.localtime())
update_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='ASIN销量' and date_info='2025-{week}' and site_name='{site_name}' and date_type='week'"
print(update_workflow_progress)
cursor.execute(update_workflow_progress)
db.commit()
cursor.close()
db.close()
engine_us_mysql = get_remote_engine(
site_name='us', # -> database "selection"
db_type="mysql", # -> 服务端 alias "mysql"
)
with engine_us_mysql.begin() as conn:
time_strftime = time.strftime("%Y-%m-%d %X", time.localtime())
update_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='ASIN销量' and date_info='2025-{week}' and site_name='{site_name}' and date_type='week'"
print(update_workflow_progress)
conn.execute(update_workflow_progress)
url = 'http://47.112.96.71:8082/selection/sendMessage'
data = {
'account': 'pengyanbing,fangxingjun,wangrui4',
'account': 'pengyanbing,fangxingjun',
'title': f"{site_name} 站点类目销量统计",
'content': str(self.week) + f' week: {site_name} site category sales calculation has finished, please confirm the next workflow step! Time: ' + time_strftime
}
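The hunk ends before the HTTP call, which presumably posts this payload to the sendMessage endpoint above. A hedged sketch of that step (the actual call and its error handling are outside this diff):

```python
import requests

# assumption: the endpoint accepts form-encoded fields as assembled above
resp = requests.post(url, data=data, timeout=30)
print(resp.status_code, resp.text)
```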
......@@ -240,12 +185,12 @@ if __name__ == '__main__':
handle_obj_us = CalculateMean(site_name='us', year=2025, week=week)
handle_obj_us.run()
handle_obj_us.sendMessage(week, site_name='us')
#handle_obj_uk = CalculateMean(site_name='uk', year=2025, week=week)
#handle_obj_uk.run()
# handle_obj_uk.sendMessage(week, site_name='uk')
#handle_obj_de = CalculateMean(site_name='de', year=2025, week=week)
#handle_obj_de.run()
#handle_obj_de.sendMessage(week, site_name='de')
# handle_obj_uk = CalculateMean(site_name='uk', year=2025, week=week)
# handle_obj_uk.run()
# handle_obj_uk.sendMessage(week, site_name='uk')
# handle_obj_de = CalculateMean(site_name='de', year=2025, week=week)
# handle_obj_de.run()
# handle_obj_de.sendMessage(week, site_name='de')
# handle_obj_fr = CalculateMean(site_name='fr', year=2025, week=week)
# handle_obj_fr.run()
# handle_obj_fr.sendMessage(week, site_name='fr')
......
from playwright.sync_api import sync_playwright
from sqlalchemy import create_engine
from secure_db_client import get_remote_engine
import pandas as pd
import urllib.parse
import json
import traceback
import time
from sqlalchemy.engine import URL
'Opportunity Explorer: download BSR category data'
......@@ -17,32 +16,16 @@ class One688LoginSpider(object):
year = time.strftime('%Y', time.localtime(time.time()))
self.y_w = f"{year}-{month}"
self.mysql_connect()
def mysql_connect(self):
if self.site == 'us':
db = 'selection'
else:
db = f'selection_{self.site}'
DB_CONN_DICT = {
"mysql_port": 3306,
"mysql_db": db,
"mysql_user": "XP_Yswg2025_PY",
"mysql_pwd": "Gd1pGJog1ysLMLBdML8w81",
"mysql_host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",
}
url = URL.create(
drivername="mysql+pymysql",
username=DB_CONN_DICT["mysql_user"],
password=DB_CONN_DICT["mysql_pwd"], # 原始密码,含 @ 也没问题
host=DB_CONN_DICT["mysql_host"],
port=int(DB_CONN_DICT["mysql_port"]),
database=db,
query={"charset": "utf8mb4"}
self.engine_us_mysql = get_remote_engine(
site_name='us', # -> database "selection"
db_type='mysql', # -> server-side alias "mysql"
)
self.engine_pg = get_remote_engine(
site_name=self.site, # -> per-site database (e.g. "selection_uk")
db_type='postgresql_15_outer', # -> server-side alias "postgresql_15_outer"
)
self.engine_us_mysql = create_engine( url)
self.engine_pg = create_engine(
f"postgresql+psycopg2://postgres:F9kL2sXe81rZq@113.100.143.162:5432/{db}",
encoding='utf-8')
return self.engine_us_mysql
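Across this commit, raw SQLAlchemy engines plus `pd.read_sql`/`DataFrame.to_sql` are replaced by a `secure_db_client` engine whose visible surface is `read_sql(sql)`, `to_sql(df, table, if_exists=...)`, and `begin()`. The client itself is not shown; below is a minimal stand-in with the same call shape over a plain SQLAlchemy engine (the class and its internals are assumptions inferred from the call sites in this diff):

```python
import pandas as pd
from sqlalchemy import create_engine

class RemoteEngineSketch:
    """Stand-in mirroring the secure_db_client call shape used in this commit."""
    def __init__(self, url):
        self._engine = create_engine(url)

    def read_sql(self, sql):
        # plays the role of pd.read_sql(sql, con=engine)
        return pd.read_sql(sql, con=self._engine)

    def to_sql(self, df, table, if_exists='append'):
        # plays the role of df.to_sql(table, con=engine, ...); index dropped
        df.to_sql(table, con=self._engine, if_exists=if_exists, index=False)

    def begin(self):
        # supports: with engine.begin() as conn: conn.execute(...)
        return self._engine.begin()
```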
def print_request_finished(self, request):
......@@ -59,7 +42,7 @@ class One688LoginSpider(object):
def select_category_json(self):
sql = 'SELECT category_json,id FROM seller_product_opportunity_syn where state=1'
engine_mysql = self.mysql_connect()
df_category_json = pd.read_sql(sql, con=engine_mysql)
df_category_json = engine_mysql.read_sql(sql)
category_data_list = list(df_category_json['category_json'] + '|=|=|' + df_category_json['id'].astype("U"))
data_list = []
for i in category_data_list:
......@@ -132,8 +115,7 @@ class One688LoginSpider(object):
'minimum_price', 'maximum_price', 'avg_price',
'return_rate_t360', 'search_volume_growth_t360',
'site', 'date_info', 'search_term'])
df_category_data.to_sql('seller_product_opportunity', con=self.engine_pg, if_exists='append',
index=False)
self.engine_pg.to_sql(df_category_data, 'seller_product_opportunity', if_exists='append')
print('Saved successfully:', len(category_data_list))
with self.engine_us_mysql.begin() as conn:
sql_update = f"update seller_product_opportunity_syn set state=3 where id={int(data[1])};"
......@@ -155,10 +137,10 @@ class One688LoginSpider(object):
[self.site, self.y_w, '商机探测器抓取完成', 3, 'seller_product_opportunity', 'month',
'商机探测器', '是']]
df_seller_asin_account = pd.DataFrame(data=workflow_everyday_list,
columns=['site_name', 'date_info', 'status', 'status_val',
'table_name', 'report_date', 'page', 'is_end'])
df_seller_asin_account.to_sql('workflow_everyday', con=self.engine_us_mysql, if_exists='append',
index=False)
columns=['site_name', 'report_date', 'status', 'status_val',
'table_name', 'date_type', 'page', 'is_end'])
self.engine_us_mysql.to_sql(df_seller_asin_account, 'workflow_everyday', if_exists='append')
def crawl(self, url):
self.page.on("requestfinished", self.print_request_finished)
......
......@@ -670,7 +670,10 @@ class nsr_catgory(BaseUtils):
en_name_id_list.append(en_name_id[0])
id_tuple = tuple(en_name_id_list)
print(len(id_tuple))
update_sql = f'update {self.site_name}_new_releases set one_category_id={id[0]} where id in {id_tuple}'
if len(id_tuple) == 1:
update_sql = f"""UPDATE {self.site_name}_new_releases set one_category_id={id[0]} where id in ('{id_tuple[0]}')"""
else:
update_sql = f'update {self.site_name}_new_releases set one_category_id={id[0]} where id in {id_tuple}'
self.db_cursor_connect_update(update_sql, self.site_name)
except:
pass
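The branch above exists because `str()` of a one-element Python tuple renders as `('x',)`, and the trailing comma is invalid SQL. A small helper that formats an IN list for both cases (illustrative only; bound parameters would avoid the string-building and the injection risk entirely):

```python
def sql_in_clause(values):
    """Render a SQL IN (...) list, handling the 1-element tuple edge case."""
    vals = list(values)
    if not vals:
        raise ValueError("IN clause needs at least one value")
    body = ", ".join(repr(v) for v in vals)  # repr quotes strings but not ints
    return f"({body})"

assert sql_in_clause(['a']) == "('a')"      # no trailing comma, unlike str(('a',))
assert sql_in_clause([1, 2]) == "(1, 2)"
```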
......
......@@ -408,32 +408,35 @@ class async_account_name_products(BaseUtils):
try:
with self.engine_pg6.begin() as conn:
# query favorited ASINs
sql_read_asin = f'SELECT id, data_id, end_time FROM {self.db_user_collection_syn} WHERE now() >= crawling_time and now() <= end_time and state = 1 and data_type = 1 ORDER BY id FETCH FIRST {self.read_size} ROWS ONLY FOR UPDATE;'
sql_read_asin = f'SELECT id, data_id, end_time FROM {self.db_user_collection_syn} WHERE now() >= crawling_time and now() <= end_time and state = 1 and data_type = 1 ORDER BY id FOR UPDATE'
print('Query favorited ASINs:', sql_read_asin)
b = conn.execute(sql_read_asin)
self.df_read_asin = pd.DataFrame(b, columns=['id', 'data_id', 'end_time'])
self.index_tuple_asin = tuple(self.df_read_asin['id'])
if self.index_tuple_asin:
if len(self.index_tuple_asin) == 1:
sql_update = f"""UPDATE {self.db_user_collection_syn} b set state=2 where b.id in ({self.index_tuple_asin[0]})"""
else:
sql_update = f"""UPDATE {self.db_user_collection_syn} b set state=2 where b.id in {self.index_tuple_asin}"""
conn.execute(sql_update)
user_asin_list = list(
self.df_read_asin.data_id + '|-|' + '8' + '|-|' + '1' + '|-|' + self.site_name + '|-|' + self.df_read_asin.end_time.astype(
str))
self.asin_real_spider = list(self.df_read_asin['data_id'])
for user_asin in user_asin_list:
print(user_asin, 'user_asinuser_asin')
user_asin_list = user_asin.split('|-|')
self.user_asin_list.append(user_asin_list)
print(self.user_asin_list)
self.save_asin_syn()
self.df_read_asin = self.engine_pg6.read_sql(sql_read_asin)
if self.df_read_asin.shape[0] != 0:
self.index_tuple_asin = tuple(self.df_read_asin['id'])
print('self.index_tuple_asin::', len(self.index_tuple_asin))
if self.index_tuple_asin:
if len(self.index_tuple_asin) == 1:
sql_update = f"""UPDATE {self.db_user_collection_syn} b set state=2 where b.id in ({self.index_tuple_asin[0]})"""
else:
sql_update = f"""UPDATE {self.db_user_collection_syn} b set state=2 where b.id in {self.index_tuple_asin}"""
conn.execute(sql_update)
user_asin_list = list(
self.df_read_asin.data_id + '|-|' + '8' + '|-|' + '1' + '|-|' + self.site_name + '|-|' + self.df_read_asin.end_time.astype(
str))
self.asin_real_spider = list(self.df_read_asin['data_id'])
for user_asin in user_asin_list:
print(user_asin, 'user_asinuser_asin')
user_asin_list = user_asin.split('|-|')
self.user_asin_list.append(user_asin_list)
print(self.user_asin_list)
print('Saving favorited ASINs')
self.save_asin_syn()
# query favorited stores
sql_read = f'SELECT id, product_url,data_id,end_time FROM {self.db_user_collection_syn} WHERE now() >= crawling_time and now() <= end_time and state = 1 and data_type = 2 ORDER BY id FETCH FIRST {self.read_size} ROWS ONLY FOR UPDATE;'
print('Query favorited stores:', sql_read)
a = conn.execute(sql_read)
self.df_read = pd.DataFrame(a, columns=['id', 'product_url', 'data_id', 'end_time'])
# a = conn.execute(sql_read)
self.df_read = self.engine_pg6.read_sql(sql_read)
# self.df_read = pd.DataFrame(a, columns=['id', 'product_url', 'data_id', 'end_time'])
if self.df_read.shape[0] == 0:
self.stop_item_queue = False
return []
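The flow above is a work-queue claim: select pending rows `FOR UPDATE` inside a transaction, flip `state` to 2 so concurrent workers skip them, then process after commit. Note the rewritten code reads via `self.engine_pg6.read_sql(...)`, which runs on its own connection, so the enclosing transaction's `FOR UPDATE` lock no longer covers that read. A condensed sketch of the claim step that keeps the read on the claiming connection (names follow this diff; SQLAlchemy 1.x-style string execution, as used here):

```python
import pandas as pd

def claim_rows(engine, table, limit=200):
    """Atomically claim pending rows: SELECT ... FOR UPDATE, then mark state=2."""
    with engine.begin() as conn:
        sql = (f"SELECT id, data_id, end_time FROM {table} "
               f"WHERE state = 1 AND now() BETWEEN crawling_time AND end_time "
               f"ORDER BY id LIMIT {limit} FOR UPDATE")
        df = pd.read_sql(sql, con=conn)  # same connection, so the lock is held
        if not df.empty:
            ids = ", ".join(str(i) for i in df['id'])
            conn.execute(f"UPDATE {table} SET state = 2 WHERE id IN ({ids})")
    return df  # transaction committed: rows are claimed
```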
......@@ -473,9 +476,7 @@ class async_account_name_products(BaseUtils):
else:
sql_DELETE = f"""DELETE FROM {self.site_name}_user_seller_collections where account_id in {tuple(self.account_name_list_update)}"""
conn.execute(sql_DELETE)
df_asin_variation.to_sql(f"{self.site_name}_user_seller_collections", con=self.engine_pg6,
if_exists='append',
index=False)
self.engine_pg6.to_sql(df_asin_variation, f"{self.site_name}_user_seller_collections", if_exists='append')
self.asin_detail_list = []
break
except Exception as e:
......
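Delete-the-incoming-keys-then-append is this codebase's recurring stand-in for an upsert. A condensed generic sketch (hypothetical table/key names; on PostgreSQL, `INSERT ... ON CONFLICT` would be the native alternative):

```python
def delete_then_append(engine, df, table, key):
    """Poor man's upsert: delete rows matching df[key], then bulk-append df."""
    keys = list(set(df[key]))
    with engine.begin() as conn:
        if len(keys) == 1:
            conn.execute(f"DELETE FROM {table} WHERE {key} = '{keys[0]}'")
        else:
            conn.execute(f"DELETE FROM {table} WHERE {key} IN {tuple(keys)}")
    df.to_sql(table, con=engine, if_exists='append', index=False)
```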
......@@ -351,8 +351,8 @@ else:
# redis
REDIS_CONN = {
"redis_host": "113.100.143.162",
"redis_port": 6379,
"redis_pwd": "fG7#vT6kQ1pX",
"redis_port": 54372,
"redis_pwd": "N8#rTp2Xz!Lk6@Vw9qHs4&Yb1Fm0Cj3",
"redis_db": 14
}
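For completeness, a connection check against these settings with redis-py (the client library is an assumption; the keys come from this config):

```python
import redis

r = redis.Redis(
    host=REDIS_CONN["redis_host"],
    port=REDIS_CONN["redis_port"],      # non-default port after this change
    password=REDIS_CONN["redis_pwd"],
    db=REDIS_CONN["redis_db"],
    decode_responses=True,
)
r.ping()  # raises on a bad host, port, or password
```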
......
import sys
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from func_timeout import func_set_timeout
......@@ -29,11 +29,12 @@ class Save_asin_detail(BaseUtils):
self.init_db_names()
self.cols = self.reuests_para_val.db_column(site_name)
self.redis_client = self.redis_db()
def init_db_names(self):
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect() # when variants are updated, the variant table is written via self.engine
self.kafuka_producer = self.kafuka_connect() # Kafka connection
self.kafuka_producer_str = self.kafuka_connect(acks=True,connections_max_idle_ms=300000) # Kafka connection
self.kafuka_producer_str = self.kafuka_connect(acks=True, connections_max_idle_ms=300000) # Kafka connection
self.redis_db14 = self.redis_db() # Redis connection
self.db_syn = self.site_name + '_all_syn_st_month_2025'
self.db_seller_account_syn = self.site_name + DB_REQUESTS_ASIN_PARAMS['db_seller_account_syn'][2:] + '_distinct'
......@@ -90,7 +91,7 @@ class Save_asin_detail(BaseUtils):
self.df_read.drop_duplicates(['asin'], inplace=True)
if self.df_read.shape[0] > 0:
self.index_tuple = tuple(self.df_read['id'])
print(self.index_tuple,'self.index_tuplself.index_tuplself.index_tupl')
print(self.index_tuple, 'self.index_tuplself.index_tuplself.index_tupl')
# fill empty values with defaults
self.df_read['volume'].fillna('null', inplace=True)
self.df_read['weight_str'].fillna('null', inplace=True)
......@@ -220,8 +221,8 @@ class Save_asin_detail(BaseUtils):
print(f'Saving to PG: {self.site_name}_asin_detail_month_{report_info}')
# df.to_csv(r'2025-7-30_srs_search_term_asin.csv', index=False)
self.engine_pg.to_sql(df,f"{self.site_name}_asin_detail_month_{report_info}",
if_exists='append')
self.engine_pg.to_sql(df, f"{self.site_name}_asin_detail_month_{report_info}",
if_exists='append')
break
except Exception as e:
traceback.print_exc() # ★ print the full stack trace to the terminal
......@@ -258,6 +259,27 @@ class Save_asin_detail(BaseUtils):
self.db_change_state(state=13, asin_list=asin_not_div_id_dp_list)
@func_set_timeout(240)
def save_asin_not_buysales(self, asin_buySales_list):
while True:
try:
if not is_internet_available():
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
print('ASINs with bad monthly sales:', asin_buySales_list)
print('ASINs with bad monthly sales, count:', len(asin_buySales_list))
df_asin_ = pd.DataFrame(data=asin_buySales_list, columns=['asin', 'buysales', 'date_info'])
self.engine_pg.to_sql(df_asin_, f'{self.site_name}_asin_detail_2025_not_buysales', if_exists='append')
break
except Exception as e:
print("存储 _asin_detail_2025_not_buysales 文本 数据错误", e)
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
time.sleep(random.uniform(10, 20.5))
continue
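The `while True` / reconnect / random-sleep / `continue` shape above repeats across this module's save methods. A generic helper capturing the same behavior (hypothetical; the module keeps these loops inline):

```python
import random
import time
import traceback

def retry_forever(fn, reconnect=None, sleep_range=(10, 20.5)):
    """Call fn until it succeeds; on failure, optionally reconnect, back off, retry."""
    while True:
        try:
            return fn()
        except Exception:
            traceback.print_exc()
            if reconnect is not None:
                reconnect()
            time.sleep(random.uniform(*sleep_range))
```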
@func_set_timeout(240)
def save_bs_category_asin_detail(self, bs_category_asin_list_pg):
# store ASIN BSR text
while True:
......@@ -278,7 +300,8 @@ class Save_asin_detail(BaseUtils):
if df_asin_bsr_pg.shape[0] > 0:
date_info_ = list(df_asin_bsr_pg.date_info)[0].replace('-', '_')
print(f'{self.site_name}_bs_category_asin_detail_month_{date_info_}')
self.engine_pg.to_sql(df_asin_bsr_pg,f'{self.site_name}_bs_category_asin_detail_month_{date_info_}',
self.engine_pg.to_sql(df_asin_bsr_pg,
f'{self.site_name}_bs_category_asin_detail_month_{date_info_}',
if_exists='append')
bs_category_asin_list_pg = []
break
......@@ -340,8 +363,8 @@ class Save_asin_detail(BaseUtils):
else:
sql_delete = f"delete from {self.db_seller_asin_account} where asin in {tuple(set(df_seller_asin_account.asin))};"
conn.execute(sql_delete)
self.engine.to_sql(df_seller_asin_account,self.db_seller_asin_account,
if_exists='append')
self.engine.to_sql(df_seller_asin_account, self.db_seller_asin_account,
if_exists='append')
buyBoxname_asin_list = []
break
except Exception as e:
......@@ -412,7 +435,7 @@ class Save_asin_detail(BaseUtils):
sql_delete = f"delete from {self.site_name}_all_syn_st_asin where asin in {tuple(set(df_asin.asin))};"
conn.execute(sql_delete)
df_asin['state'] = state
self.engine_pg.to_sql(df_asin,f'{self.site_name}_all_syn_st_asin',if_exists='append')
self.engine_pg.to_sql(df_asin, f'{self.site_name}_all_syn_st_asin', if_exists='append')
break
except Exception as e:
self.engine = self.mysql_connect()
......@@ -422,6 +445,5 @@ class Save_asin_detail(BaseUtils):
f"\n{traceback.format_exc()}")
continue
if __name__ == '__main__':
Save_asin_detail()
# if __name__ == '__main__':
# Save_asin_detail()
......@@ -3,17 +3,16 @@ import sys
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from secure_db_client import get_remote_engine
import traceback
from curl_cffi import requests
from utils.db_connect import BaseUtils
import re
from lxml import etree
os.environ['NO_PROXY'] = 'amazon.com'
import json
from urllib.parse import urlparse
import datetime
import time
class Amazon_reviewer():
......@@ -111,6 +110,35 @@ class Amazon_reviewer():
"review_data_img": review_img}
print(items)
def pg_get_asin(self):
while True:
try:
print('Polling DB for pending ASINs:', datetime.datetime.now().strftime("%m-%d %H:%M:%S"))
engine_pg = self.pg_connect()
spider_state_sql = """select asin,task_id from ai_asin_analyze_spider where status = '未开始' limit 20 """
print('spider_state_sql:', spider_state_sql)
df_asin = engine_pg.read_sql(spider_state_sql)
if not df_asin.empty:
update_time = int(time.time())
with engine_pg.begin() as conn:
index_tuple = tuple(df_asin['task_id'])
if len(index_tuple) == 1:
sql_update = f"""UPDATE ai_asin_analyze_spider a set status='爬取中',update_time='{update_time}' where a.task_id in ({index_tuple[0]})"""
else:
sql_update = f"""UPDATE ai_asin_analyze_spider a set status='爬取中',update_time='{update_time}' where a.task_id in {index_tuple}"""
print('UPDATE_sql:', sql_update)
conn.execute(sql_update)
_asin_lis = list(df_asin.asin + '|-|' + df_asin.task_id.astype("U"))
print("_asin_lis:::", _asin_lis, )
print("_asin_lis::: len ", len(_asin_lis))
run_spider(_asin_lis) # pass the ASIN list
time.sleep(3)
# break
except Exception as e:
print('pg_get_asin query failed::', e, f"\n{traceback.format_exc()}")
def run(self):
self.redis_db()
self.get_asin_reviewer()
......