Commit c007b3d1 by fangxingjun

Commit all changes

parent 0f05969c
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData" remoteFilesAllowedToDisappearOnAutoupload="false">
<serverData>
<paths name="fangxingjun@192.168.200.210:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@192.168.10.219:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@hadoop1.jiasujia.space:22211">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
</serverData>
</component>
</project>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="5">
<item index="0" class="java.lang.String" itemvalue="py4j" />
<item index="1" class="java.lang.String" itemvalue="PySide6" />
<item index="2" class="java.lang.String" itemvalue="peewee" />
<item index="3" class="java.lang.String" itemvalue="numpy" />
<item index="4" class="java.lang.String" itemvalue="deap" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPep8Inspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="E501" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="AmazonSpider.pyspark_job.dim.dim_st_asin.utils" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (base) (2)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/yswg_developer.iml" filepath="$PROJECT_DIR$/.idea/yswg_developer.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="WebResourcesPaths">
<contentEntries>
<entry url="file://$PROJECT_DIR$">
<entryData>
<resourceRoots>
<path value="file://$PROJECT_DIR$/Pyspark_job" />
</resourceRoots>
</entryData>
</entry>
</contentEntries>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/Pyspark_job" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (base) (2)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Jinja2" />
<option name="TEMPLATE_FOLDERS">
<list>
<option value="$MODULE_DIR$/py_gpt/app/templates" />
</list>
</option>
</component>
</module>
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql.types import StringType, MapType
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from pyspark.sql import functions as F
import numpy as np
class DwtStMtChristmasInfo(object):
def __init__(self, site_name, date_type, date_info):
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.hive_tb = "dwt_st_mt_christmas_info"
self.partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
        # HDFS path check for the output table partition
self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # Register the user-defined function (UDF)
self.u_theme_pattern = F.udf(self.udf_theme_pattern, StringType())
        # Create the SparkSession
app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
        # Initialize global DataFrames (placeholders, filled in read_data)
self.df_st = self.spark.sql(f"select 1+1;")
self.df_st_info = self.spark.sql(f"select 1+1;")
self.df_st_asin_info = self.spark.sql(f"select 1+1;")
self.df_asin_order_info = self.spark.sql(f"select 1+1;")
self.df_asin_launchtime_info = self.spark.sql(f"select 1+1;")
self.df_self_asin = self.spark.sql(f"select 1+1;")
self.df_st_handle = self.spark.sql(f"select 1+1;")
self.df_mt_volume = self.spark.sql(f"select 1+1;")
        # Other variables
        self.theme_list_str = str()  # stringified theme list used for pattern matching
    @staticmethod
    def udf_theme_pattern(pattern_text, theme_list_str):
        # theme_list_str is a stringified list; eval() restores it (trusted input only)
        found_themes = [theme.strip() for theme in eval(theme_list_str) if theme in pattern_text]
        if found_themes:
            return ','.join(set(found_themes))
        else:
            return None
def read_data(self):
print("======================查询sql如下======================")
# 获取搜索词及搜索词下搜索产品总数
sql = f"""
select search_term,
cast(quantity_being_sold as int) as st_quantity_being_sold from (
select search_term,
quantity_being_sold,
row_number() over (partition by search_term order by created_time desc) as rank_flag
from ods_st_quantity_being_sold
where site_name = '{self.site_name}'
and date_type = 'month'
and date_info = '{self.date_info}'
and search_term like '%christmas%') t1
where t1.rank_flag=1
"""
self.df_st_info = self.spark.sql(sql)
        # Get the search-term-to-ASIN relationships
sql = f"""
select
search_term,
asin
from dwd_st_asin_measure
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
and search_term like '%christmas%'
"""
print(sql)
self.df_st_asin_info = self.spark.sql(sql)
        # Get the ASIN order/sales metrics
sql = f"""
select
asin,
asin_bsr_orders,
asin_zr_orders_sum
from dwd_asin_measure
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
print(sql)
self.df_asin_order_info = self.spark.sql(sql)
        # Get launch dates from the ASIN detail table
sql = f"""
select
asin,
asin_launch_time,
case when(datediff(`current_date`(),to_date(asin_launch_time)) <= 90) then 1 end as is_new_flag
from dim_asin_detail
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
"""
print(sql)
self.df_asin_launchtime_info = self.spark.sql(sql)
        # Read ods_self_asin to get the company's own ASINs
sql = f"""select
asin,
1 as is_self_asin
from ods_self_asin
where site_name='{self.site_name}' group by asin """
print(sql)
self.df_self_asin = self.spark.sql(sqlQuery=sql)
        # Read the Merchantwords search volume for christmas keywords
sql = f"""
select
keyword as search_term,
volume as mt_volume
from dwt_merchantwords_st_detail
where site_name='{self.site_name}'
and keyword like '%christmas%'
"""
print(sql)
self.df_mt_volume = self.spark.sql(sqlQuery=sql)
def handle_data(self):
self.read_data()
self.handle_st_data()
self.save_data()
def handle_st_data(self):
        # Join the prepared search terms with their ASINs
self.df_st_asin_info = self.df_st_asin_info.join(
self.df_st_info, on=['search_term'], how='inner'
).cache()
        # Join the supporting datasets
self.df_st_handle = self.df_st_asin_info.join(
self.df_asin_order_info, on=['asin'], how='left'
).join(
self.df_asin_launchtime_info, on=['asin'], how='left'
).join(
self.df_self_asin, on=['asin'], how='left'
)
        # Aggregate the per-search-term ratio metrics
self.df_st_handle = self.df_st_handle.groupby(['search_term']).agg(
F.max('st_quantity_being_sold').alias('total_searched_products'),
F.count('asin').alias('total_page3_products'),
F.sum('is_self_asin').alias('total_self_products'),
F.sum('asin_bsr_orders').alias('total_bsr_orders'),
F.sum('asin_zr_orders_sum').alias('total_orders'),
F.sum('is_new_flag').alias('total_new_products')
)
self.df_st_handle = self.df_st_handle.join(
self.df_mt_volume, on=['search_term'], how='left'
)
def save_data(self):
df_save = self.df_st_handle.select(
F.col('search_term'),
F.col('mt_volume'),
F.col('total_searched_products'),
F.col('total_page3_products'),
F.col('total_self_products'),
F.round(F.col('total_self_products')/F.col('total_page3_products'), 4).alias('self_product_rate'),
F.col('total_bsr_orders'),
F.col('total_orders'),
F.col('total_new_products'),
F.round(F.col('total_new_products') / F.col('total_page3_products'), 4).alias('new_product_rate'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')
)
# CommonUtil.check_schema(self.spark, df_save, self.hive_tb)
print(f"清除hdfs目录中:{self.hdfs_path}")
HdfsUtils.delete_file_in_folder(self.hdfs_path)
df_save = df_save.repartition(10)
partition_by = ["site_name", "date_type", "date_info"]
print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success")
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)  # Arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    assert site_name is not None, "site_name must not be empty!"
    assert date_type is not None, "date_type must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
obj = DwtStMtChristmasInfo(site_name=site_name, date_type=date_type, date_info=date_info)
obj.handle_data()
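# Example invocation (the script filename is an assumption; arguments match the asserts above):
#   spark-submit dwt_st_mt_christmas_info.py us month 2023-12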
\ No newline at end of file
@@ -17,7 +17,7 @@ class GetRedisData(object):
             "it": 5,
         }
         self.site_name = 'us'
-        self.client = redis.Redis(host='192.168.10.224', port=6379, db=self.redis_db[self.site_name], password='yswg2023')
+        self.client = redis.Redis(host='192.168.10.224', port=6379, db=self.redis_db[self.site_name], password='N8#rTp2Xz!Lk6@Vw9qHs4&Yb1Fm0Cj3')

     @staticmethod
     def udf_cal_crc32(asin, key_size):
@@ -80,7 +80,7 @@ if __name__ == '__main__':
     # handle_obj = KafkaAsinDetailHistory(asin='B0C7BZL9C3')
     # handle_obj = KafkaAsinDetailHistory(asin='B0C6DM2V9X')
     # handle_obj = KafkaAsinDetailHistory(asin='B0BV7CBQW3')
-    handle_obj = GetRedisData(asin='B08KR79NBR')
+    handle_obj = GetRedisData(asin='B07Z17PZZM')
     # handle_obj = GetRedisData(asin='B0062TMTMK')
     # handle_obj.run()
     handle_obj.get_redis_data()
\ No newline at end of file
import json
import os
import sys
from typing import Dict
import requests
sys.path.append(os.path.dirname(sys.path[0]))
class DolphinschedulerHelper(object):
_admin_token = "2a761f0d17baac7ac6ac4a23fe6f33df"
_ip_port = "http://113.100.143.162:12345"
_project_map = {}
_project_df_map = {}
@classmethod
def get_http_header(cls):
return {
"token": cls._admin_token,
}
@classmethod
def get_project_map(cls):
if len(cls._project_map) == 0:
url = f"{cls._ip_port}/dolphinscheduler/projects"
resp = requests.get(
url,
headers=cls.get_http_header(),
params={
"pageNo": 1,
"pageSize": 10,
}
)
resp_json = json.loads(resp.content.decode("utf-8"))
for item in resp_json['data']['totalList']:
cls._project_map[item['name']] = item['code']
return cls._project_map
@classmethod
def get_project_df_map(cls, project_code):
        if cls._project_df_map.get(project_code) is None:
url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-definition/simple-list"
resp = requests.get(
url,
headers=cls.get_http_header()
)
resp_json = json.loads(resp.content.decode("utf-8"))
cls._project_df_map[project_code] = {}
for item in resp_json['data']:
cls._project_df_map[project_code][item['name']] = item['code']
return cls._project_df_map[project_code]
@classmethod
def start_process_instance_common(cls, project_name: str,
process_df_name: str,
startParams: Dict,
                                          # warning type
warning_Type: str = "NONE"
):
"""
启动一个海豚流程
:param project_name: 项目名
:param process_df_name: 流程名
:param startParams: 启动全局参数
:param warning_Type: 警告类型 NONE ALL
:return:
"""
project_map = cls.get_project_map()
project_code = project_map.get(project_name)
process_df_map: Dict = cls.get_project_df_map(project_code)
process_df_code = process_df_map.get(process_df_name)
url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/executors/start-process-instance"
startParams['_sender'] = "api"
# processDefinitionCode: 9651368013984
# scheduleTime:
# failureStrategy: CONTINUE
# warningType: ALL
# warningGroupId: 5
# execType:
# startNodeList:
# taskDependType: TASK_POST
# runMode: RUN_MODE_SERIAL
# processInstancePriority: MEDIUM
# workerGroup: default
# environmentCode: 5769107604288
# startParams: {"site_name":"us","date_type":"week","wx_user":"wujicang"}
# expectedParallelismNumber:
# dryRun: 0
req_params = {
"processDefinitionCode": process_df_code,
"scheduleTime": "",
            # failure strategy
"failureStrategy": "CONTINUE",
"warningType": warning_Type,
            # alert group; group 2 is the HTTP push alert
"warningGroupId": "2",
"execType": "",
"startNodeList": "",
"taskDependType": "TASK_POST",
"runMode": "RUN_MODE_SERIAL",
"processInstancePriority": "MEDIUM",
"workerGroup": "h5",
            # environment group
"environmentCode": "10337211525600",
"startParams": json.dumps(startParams),
"expectedParallelismNumber": "",
"dryRun": "0",
}
print(req_params)
resp = requests.post(
url,
headers=cls.get_http_header(),
data=req_params
)
resp_json = json.loads(resp.content.decode("utf-8"))
resp_state = bool(resp_json['success'])
if resp_state:
DolphinschedulerHelper.send_startup_state_to_oa(project_name, process_df_name, resp_state)
return True
else:
DolphinschedulerHelper.send_startup_state_to_oa(project_name, process_df_name, resp_state)
raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!")
@classmethod
def send_startup_state_to_oa(cls, project_name: str, process_df_name: str, resp_state: bool):
"""
根据api触发海豚oa消息推送(推送人由维护在海豚调度任务中的wx_user决定)
:param project_name:项目名称
:param process_df_name:流程名称
:param resp_state:任务调度启动状态
:return
"""
from utils.common_util import CommonUtil
wx_user_list = DolphinschedulerHelper.get_process_df_manger(project_name, process_df_name)
title = f"【海豚调度】调度api触发提示"
if resp_state:
msg = f"项目【{project_name}】,流程【{process_df_name}】api任务触发成功!"
else:
msg = f"项目【{project_name}】,流程【{process_df_name}】api任务触发异常,请查看日志!"
if bool(wx_user_list):
CommonUtil.send_wx_msg(wx_user_list, title, msg)
@classmethod
def get_process_df_manger(cls, project_name: str, process_df_name: str):
"""
获取海豚流程定义的全局变量对应的wx_user
:param project_name:
:param process_df_name:
:return:
"""
project_map = cls.get_project_map()
project_code = project_map.get(project_name)
process_df_map: Dict = cls.get_project_df_map(project_code)
process_df_code = process_df_map.get(process_df_name)
url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-definition/{process_df_code}/view-variables"
resp = requests.get(
url,
headers=cls.get_http_header(),
)
resp_json = json.loads(resp.content.decode("utf-8"))
if bool(resp_json['success']):
globalParams: list = resp_json['data']['globalParams']
paramMap = {param['prop']: param['value'] for param in globalParams}
wx_user = paramMap.get("wx_user") or ""
return wx_user.split(",")
return None
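# Hedged usage sketch (the flow name below is an assumption, not from the source):
#   DolphinschedulerHelper.get_process_df_manger("big_data_selection", "some_flow")
#   returns e.g. ["wujicang"] when the flow defines a wx_user global parameter, else None.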
# from utils.DolphinschedulerHelper import DolphinschedulerHelper
from post_to_dolphin import DolphinschedulerHelper
process_df_name = '内部asin-评分评论数计算'  # flow name: "internal ASIN rating/review-count calculation"
# DolphinschedulerHelper.start_process_instance(
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
process_df_name=process_df_name,
startParams={
"site_name": 'us',
"date_type": 'day',
"date_info": '2026-03-01'
},
warning_Type="ALL"
)
# import img2pdf
# from pathlib import Path
#
# imgs = ["1.jpg", "2.png", "3.jpg"] # 顺序就是 PDF 页顺序
# with open("out.pdf", "wb") as f:
# f.write(img2pdf.convert([str(Path(p)) for p in imgs]))
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine
engine = get_remote_engine(
    site_name="us",                 # -> database "selection"
    db_type="postgresql_cluster",   # -> server-side alias
    user="fangxingjun",
    user_token="5f1b2e9c3a4d7f60"   # optional; falls back to the default if omitted
)
# site_name = 'us'
# date_type = 'month'
# date_info = '2026-01'
site_name = sys.argv[1]  # Arg 1: site
date_type = sys.argv[2]  # Arg 2: type: week/4_week/month/quarter/day
date_info = sys.argv[3]  # Arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
partitions = {
'site_name': site_name,
'date_type': date_type,
'date_info': date_info,
}
cols_list = ['st_key', 'search_term', 'theme_ch', 'theme_en', 'theme_label_ch', 'theme_label_en', 'date_info', 'created_time', 'updated_time']
engine.sqoop_raw_export(
hive_table='dws_st_theme',
import_table=f'{site_name}_st_theme_detail_{date_info.replace("-", "_")}',
partitions=partitions,
m=1,
cols=','.join(cols_list)
)
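# Example invocation (the script filename is an assumption):
#   python export_dws_st_theme.py us month 2026-01
# This exports the (us, month, 2026-01) partition of dws_st_theme into the
# PG table us_st_theme_detail_2026_01 using a single mapper (m=1).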
"""
author: 方星钧(ffman)
description: 清洗6大站点对应的 单周的zr,sp,sb,ac,bs,er,tr等7大类型数据表(计算zr,sp类型表的page_rank+合并7张表)
table_read_name: ods_st_rank_zr/sp/sb/ac/bs/er/tr
table_save_name: dim_st_asin_info
table_save_level: dim
version: 3.0
created_date: 2022-05-10
updated_date: 2022-11-07
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import IntegerType
class DimStAsinInfo(Templates):
def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.db_save = f'temp_me_asin'
self.spark = self.create_spark_object(
app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
self.df_date = self.get_year_week_tuple()
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=10)
def read_data(self):
sql = """SELECT DISTINCT asin
FROM (
SELECT asin
FROM ods_search_term_zr
WHERE site_name = 'us'
AND date_type = 'month_aba_me'
AND date_info = '2025-11'
UNION ALL
SELECT asin
FROM ods_search_term_sp
WHERE site_name = 'us'
AND date_type = 'month_aba_me'
AND date_info = '2025-11'
UNION ALL
SELECT asin
FROM ods_search_term_sb
WHERE site_name = 'us'
AND date_type = 'month_aba_me'
AND date_info = '2025-11'
UNION ALL
SELECT asin
FROM ods_search_term_ac
WHERE site_name = 'us'
AND date_type = 'month_aba_me'
AND date_info = '2025-11'
UNION ALL
SELECT asin
FROM ods_search_term_bs
WHERE site_name = 'us'
AND date_type = 'month_aba_me'
AND date_info = '2025-11'
) t;
"""
df = self.spark.sql(sqlQuery=sql)
df.show(10, truncate=False)
pdf = df.toPandas()
print(f"pdf.shape: {pdf.shape}")
pdf.to_csv("temp_me_asin.csv", index=False)
# self.df_save.show(n=10, truncate=False)
# print("self.df_save.count():", self.df_save.count())
if __name__ == '__main__':
    site_name = sys.argv[1]  # Arg 1: site
    date_type = sys.argv[2]  # Arg 2: type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # Arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
handle_obj.read_data()
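# Example invocation (filename assumed; note the SQL above is currently hard-coded
# to us / month_aba_me / 2025-11 regardless of these arguments):
#   spark-submit temp_me_asin.py us month_aba_me 2025-11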
# ========== New server-side endpoints ==========
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, text
from typing import Any, Dict, List
import pandas as pd
app = FastAPI()
db_dict = {
    # MySQL - Alibaba Cloud
"mysql": {
'host': 'rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com',
"driver": "pymysql",
'port': 3306,
'user': 'XP_Yswg2025',
'password': 'VPaaDe2R1MKqvzq2YSpcxu',
'database': 'selection',
'charset': 'utf8mb4',
},
    # PG crawler DB - internal network
"postgresql_14": {
'host': '192.168.10.223',
"driver": "psycopg2",
'port': 5433,
'user': 'postgres',
'password': 'F9kL2sXe81rZq',
'database': 'selection',
'charset': 'utf8mb4',
},
    # PG crawler DB - public network
"postgresql_14_outer": {
'host': '61.145.136.61',
"driver": "psycopg2",
'port': 54328,
'user': 'postgres',
'password': 'F9kL2sXe81rZq',
'database': 'selection',
'charset': 'utf8mb4',
},
    # PG production DB - internal network
"postgresql_15": {
'host': '192.168.10.224',
"driver": "psycopg2",
'port': 5433,
'user': 'postgres',
'password': 'F9kL2sXe81rZq',
'database': 'selection',
'charset': 'utf8mb4',
},
    # PG production DB - public network
"postgresql_15_outer": {
'host': '113.100.143.162',
"driver": "psycopg2",
'port': 5433,
'user': 'postgres',
'password': 'F9kL2sXe81rZq',
'database': 'selection',
'charset': 'utf8mb4',
},
    # PG cluster - internal network
"postgresql_cluster": {
'host': '192.168.10.155',
"driver": "psycopg2",
'port': 6432,
'user': 'postgres',
'password': 'G8m!Q2p9D#f%5x',
'database': 'selection',
'charset': 'utf8mb4',
},
    # PG cluster - public network
"postgresql_cluster_outer": {
'host': '113.100.143.162',
"driver": "psycopg2",
'port': 6432,
'user': 'postgres',
'password': 'G8m!Q2p9D#f%5x',
'database': 'selection',
'charset': 'utf8mb4',
},
    # Doris cluster - internal network
"doris": {
'host': '192.168.10.151',
"driver": "pymysql",
'port': 49030,
'user': 'fangxingjun',
'password': 'fangxingjun12345',
'database': 'selection',
'charset': 'utf8mb4',
},
}
db_engines: Dict[str, Any] = {
"main": create_engine("mysql+pymysql://user:password@host1/db_main", pool_size=20),
"analytics": create_engine("mysql+pymysql://user:password@host2/db_analytics", pool_size=15),
"legacy": create_engine("postgresql://user:password@host3/db_legacy", pool_size=10),
}
class QueryRequest(BaseModel):
db: str
sql: str
class InsertRequest(BaseModel):
db: str
table: str
if_exists: str = "append"
data: List[dict]
class TransactionRequest(BaseModel):
db: str
sql_list: List[str]
@app.post("/query")
def query_db(req: QueryRequest):
if req.db not in db_engines:
raise HTTPException(status_code=400, detail="Unknown database alias")
engine = db_engines[req.db]
try:
with engine.connect() as conn:
rows = conn.execute(text(req.sql)).fetchall()
return {"result": [dict(r) for r in rows]}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/insert")
def insert_data(req: InsertRequest):
if req.db not in db_engines:
raise HTTPException(status_code=400, detail="Unknown database alias")
engine = db_engines[req.db]
try:
df = pd.DataFrame(req.data)
df.to_sql(req.table, con=engine, if_exists=req.if_exists, index=False)
return {"success": True, "rows_written": len(df)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/transaction")
def handle_transaction(req: TransactionRequest):
if req.db not in db_engines:
raise HTTPException(status_code=400, detail="Unknown DB")
engine = db_engines[req.db]
try:
with engine.begin() as conn:
for sql in req.sql_list:
conn.execute(text(sql))
return {"success": True, "sql_executed": len(req.sql_list)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
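# A minimal client sketch for the endpoints above (base URL and alias are assumptions):
#
#   import requests
#   base = "http://127.0.0.1:8000"
#   print(requests.post(f"{base}/query",
#                       json={"db": "main", "sql": "select 1 as one"}).json())
#   requests.post(f"{base}/insert",
#                 json={"db": "main", "table": "demo_tbl", "if_exists": "append",
#                       "data": [{"id": 1, "name": "a"}]})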
\ No newline at end of file
from enum import Enum
class DbTypes(Enum):
"""
导出数据库类型
"""
mysql = "mysql" # 阿里云
postgresql_14 = "postgresql_14" # 爬虫库
postgresql_15 = "postgresql_15" # 正式库
postgresql_test = "postgresql_test" # 测试库
postgresql_cluster = "postgresql_cluster" # 集群
srs = "srs"
doris = "doris"
class DBUtil(object):
    """
    """
    def __init__(self, db_type='mysql', site_name='us'):
        self.db_type = db_type
        self.site_name = site_name