Commit 77b04aaa by Peng

no message

parent 80347e49
import sys, os
from datetime import datetime
sys.path.append(os.path.dirname(sys.path[0]))
import json
from flask import Flask, request as flask_request, Response
from utils.secure_db_client import get_remote_engine
# Flask application object; the /seller/info route below is registered on it.
flask_app = Flask(__name__)
def get_business_val(seller_address, key):
    """Extract the value that follows a labelled field in a seller address.

    ``seller_address`` is split on the ``"|-|"`` separator and every piece is
    stripped. The first piece that starts with ``key`` selects the value:

    * for the address labels (``"Business Address"``, ``"Geschaftsadresse"``,
      ``"Geschäftsadresse"``) every remaining piece is joined with single
      spaces, which yields ``""`` when nothing follows the label;
    * for any other label only the next piece is returned, and the search
      continues when the label is the last piece.

    Returns ``None`` when no piece produces a value.
    """
    address_labels = ("Business Address", "Geschaftsadresse", "Geschäftsadresse")
    segments = [segment.strip() for segment in seller_address.split("|-|")]
    for index, segment in enumerate(segments):
        if not segment.startswith(key):
            continue
        following = segments[index + 1:]
        if key in address_labels:
            # Address values span every remaining segment.
            return " ".join(following).strip()
        if following:
            return following[0].strip()
    return None
def json_response(data):
    """Wrap ``data`` in a Flask ``Response`` with the JSON mimetype.

    The payload is serialised with ``ensure_ascii=False`` so non-ASCII text
    is emitted as-is instead of as ``\\uXXXX`` escapes.
    """
    body = json.dumps(data, ensure_ascii=False)
    return Response(body, mimetype='application/json')
# Per-site label mapping: site code -> (business-name label, business-address label)
# used to locate those fields inside a seller_address string.
site_key_map = {
"us": ("Business Name", "Business Address"),
"uk": ("Business Name", "Business Address"),
"de": ("Geschäftsname", "Geschäftsadresse")
}
@flask_app.route('/seller/info', methods=['GET'])
def get_seller_info():
    """Return a seller's business name and address as JSON.

    Query parameters:
        site_name: Site code; must be a key of ``site_key_map``.
        seller_id: Seller identifier; must consist of ASCII letters/digits.

    The newest row (highest ``id``) for the seller is read from this year's
    ``{site_name}_seller_account_feedback_{year}`` table; when that yields
    nothing, last year's table is queried instead.

    Returns:
        A JSON response whose body carries ``code`` (200, 400, 404 or 500),
        ``message`` and, on success, ``data`` with ``business_name`` and
        ``business_address``. Any exception raised while querying or parsing
        is reported as ``code`` 500 with the exception text.
    """
    site_name = flask_request.args.get('site_name', '').strip()
    seller_id = flask_request.args.get('seller_id', '').strip()
    if not site_name or not seller_id:
        return json_response({"code": 400, "message": "缺少参数 site_name 或 seller_id"})
    if site_name not in site_key_map:
        return json_response({"code": 400, "message": f"不支持的站点: {site_name}"})
    # Both values are spliced into the SQL text below. site_name has just been
    # checked against the site_key_map whitelist; seller_id is caller-supplied,
    # so anything that could break out of the quoted literal is rejected.
    # NOTE(review): assumes real seller ids are ASCII alphanumeric - confirm.
    if not (seller_id.isascii() and seller_id.isalnum()):
        return json_response({"code": 400, "message": "seller_id 格式不正确"})
    try:
        engine = get_remote_engine(site_name=site_name, db_type="postgresql_14")
        this_year = datetime.now().year
        # Try this year's table first; fall back to last year's if it is empty.
        for year in (this_year, this_year - 1):
            sql = (
                f"SELECT seller_address FROM {site_name}_seller_account_feedback_{year} "
                f"WHERE seller_id = '{seller_id}' ORDER BY id DESC LIMIT 1"
            )
            df = engine.read_sql(sql)
            if not df.empty:
                break
        print('sql::', sql)
        print('df::', df.values.tolist())
        if df.empty:
            return json_response({"code": 404, "message": "未找到该卖家信息", "data": None})
        seller_address = df.iloc[0]['seller_address']
        if not seller_address:
            return json_response({"code": 404, "message": "seller_address 为空", "data": None})
        name_key, addr_key = site_key_map.get(site_name, ("Business Name", "Business Address"))
        business_name = get_business_val(seller_address, name_key)
        business_address = get_business_val(seller_address, addr_key)
        return json_response({
            "code": 200,
            "message": "success",
            "data": {
                "business_name": business_name,
                "business_address": business_address
            }
        })
    except Exception as e:
        return json_response({"code": 500, "message": f"查询失败: {str(e)}"})
if __name__ == '__main__':
    # Script entry point: start the Flask server on 0.0.0.0:10240.
    flask_app.run(host='0.0.0.0', port=10240)
import sys
import os
sys.path.append(os.path.dirname(sys.path[0]))
import pandas as pd
from urllib.parse import quote
import datetime
from utils.secure_db_client import get_remote_engine
from fastapi import FastAPI
import uvicorn
# FastAPI application object; the route handlers below register on it.
app = FastAPI()
# Database engine created once at import time and shared by the handlers below.
engine = get_remote_engine(site_name='us', db_type="doris_adv", database="advertising_manager")
def build_url(search_term: str, page: int) -> str:
    """Build the amazon.com search URL for ``search_term`` at page ``page``.

    Args:
        search_term: Search keyword; converted with ``str()`` before encoding.
        page: Page number placed in the ``page`` query parameter.

    Returns:
        ``https://www.amazon.com/s?k=<encoded term>&page=<page>`` where the
        term is percent-encoded (a space becomes ``%20``, ``/`` becomes
        ``%2F``).
    """
    # The previous call was quote(term, "utf-8"), which binds "utf-8" to the
    # ``safe`` parameter rather than ``encoding``. Those characters are always
    # safe anyway, so it behaved exactly like safe="": every reserved
    # character (including "/", "'", "&", "#", "(", ")" and space) was already
    # percent-encoded and the follow-up .replace() chain never matched.
    # safe="" states that behaviour explicitly and yields the same URLs.
    encoded_term = quote(str(search_term), safe="")
    return f"https://www.amazon.com/s?k={encoded_term}&page={page}"
@app.get("/generate_sp_search_term")
def generate_sp_search_term():
    """Rebuild ``us_sp_search_term_syn`` from ``sp_keyword_position_keyword``.

    Reads the distinct ``keyword_text`` values, builds the search URL for
    pages 1-5 of each keyword, truncates ``us_sp_search_term_syn`` and appends
    the new rows (``search_term``, ``url``, ``time_batch``).

    Returns a dict with ``code`` 200 plus the keyword and URL counts, ``code``
    400 when the keyword table is empty, or ``code`` 500 with the exception
    text when anything fails.
    """
    try:
        # Load the keywords.
        keywords = engine.read_sql("SELECT DISTINCT keyword_text FROM sp_keyword_position_keyword")
        keywords.columns = ['search_term']
        keyword_count = len(keywords)
        if keyword_count == 0:
            return {"code": 400, "msg": "sp_keyword_position_keyword 表没有数据"}
        # Repeat every keyword row five times and number the copies 1..5.
        pages = keywords.loc[keywords.index.repeat(5)].copy()
        pages["page"] = pages.groupby(level=0).cumcount() + 1
        pages["time_batch"] = datetime.datetime.now().strftime("%Y-%m-%d-%H")
        pages["url"] = [
            build_url(term, int(page_no))
            for term, page_no in zip(pages["search_term"], pages["page"])
        ]
        pages = pages[["search_term", "url", "time_batch"]].drop_duplicates(['search_term', 'url'])
        total = len(pages)
        # Empty the target table.
        with engine.begin() as conn:
            conn.execute("TRUNCATE TABLE us_sp_search_term_syn")
        # Write the fresh rows.
        engine.to_sql(pages, 'us_sp_search_term_syn', if_exists='append')
        return {"code": 200, "msg": "成功", "keyword_count": keyword_count, "url_count": total}
    except Exception as e:
        return {"code": 500, "msg": str(e)}
@app.get("/health")
def health():
    """Health-check endpoint; always answers ``{"status": "ok"}``."""
    return dict(status="ok")
if __name__ == '__main__':
    # Script entry point: serve the FastAPI app with uvicorn on 0.0.0.0:8099.
    listen_host, listen_port = "0.0.0.0", 8099
    uvicorn.run(app, host=listen_host, port=listen_port)
# import pandas as pd
#
# file_path = r"C:\Users\ASUS\Downloads\Result_65.xlsx"
# df = pd.read_excel(file_path)
#
# # 去掉列名可能存在的空格(很常见:'keywordId ' 这种)
# df.columns = df.columns.str.strip()
#
# # 按原始列名映射到数据库字段名
# rename_map = {
# "keywordId": "keyword_id",
# "keywordText": "search_term",
# "adGroupId": "adgroupid",
# "campaignId": "campaignid",
# "adId": "adid",
# "sku": "sku",
# }
#
# df = df.rename(columns=rename_map)
#
# # 只保留表里需要的6列,并按表结构排序
# need_cols = ["search_term", "keyword_id", "adgroupid", "campaignid", "adid", "sku"]
# df = df[need_cols]
#
# print(df.columns)
# print(df.head())
# from utils.secure_db_client import get_remote_engine
# engine = get_remote_engine('us', 'postgresql_14')
# engine.to_sql(df,'sp_search_term_syn', if_exists='append')
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment