Commit 6d492287 by chenyuanjie

Doris读取方式

parent d913491e
......@@ -164,7 +164,7 @@ class EsAsinProfitRate(object):
)
# 从Doris获取asin_crawl_date(用于利润率主索引写入)
df_crawl_date = DorisHelper.spark_import_with_connector(
df_crawl_date = DorisHelper.spark_import_with_flight(
session=self.spark,
table_identifier=f"selection.{self.site_name}_asin_latest_detail",
read_fields="asin,asin_crawl_date"
......
......@@ -18,6 +18,7 @@ class DorisHelper(object):
"ip": "192.168.10.151",
"http_port": 48030,
"jdbc_port": 49030,
"flight_port": 48071,
"user": "spark",
"pwd": "yswg123"
}
......@@ -80,6 +81,61 @@ class DorisHelper(object):
return reader.load()
"""
通过 Arrow Flight SQL 读取 Doris 表(彻底规避 Doris 3.1+ is_nullable SchemaUtils 问题)
format("doris") 读取方式在 Doris 3.1 下因 SchemaUtils 解析 is_nullable 字段失败,
改为 Arrow Flight SQL 绕开 Doris Connector,Arrow 原生类型映射,无需手动 cast。
:param session: spark 对象
:param table_identifier: doris 表标识,格式为 db.table
:param read_fields: 读取的字段,逗号分隔,为空则读取全部字段
:param use_type: 连接类型
"""
@classmethod
def spark_import_with_flight(cls, session, table_identifier, read_fields=None, use_type='selection'):
    """Read a Doris table into a Spark DataFrame via Arrow Flight SQL.

    Bypasses the Doris Spark connector (``format("doris")``), which fails on
    Doris 3.1+ because SchemaUtils cannot parse the ``is_nullable`` field.
    Arrow's native type mapping is used, so no manual casts are required.

    :param session: SparkSession used to build the resulting DataFrame
    :param table_identifier: Doris table identifier in ``db.table`` form
    :param read_fields: comma-separated column list; None/empty selects all columns
    :param use_type: connection profile key forwarded to ``get_connection_info``
    :return: Spark DataFrame containing the query result
    """
    import base64
    import adbc_driver_flightsql.dbapi as adbc
    from adbc_driver_flightsql import DatabaseOptions
    print(f"执行 Arrow Flight SQL 读取 Doris 数据任务,表名:{table_identifier},字段:{read_fields}")
    # Use cls so subclasses overriding get_connection_info are honored.
    connection_info = cls.get_connection_info(use_type)
    db, table = table_identifier.split(".", 1)
    fields = read_fields if read_fields else "*"
    sql = f"SELECT {fields} FROM {db}.{table}"
    # Doris Flight SQL expects HTTP basic auth carried in a gRPC call header.
    auth = "Basic " + base64.b64encode(
        f"{connection_info['user']}:{connection_info['pwd']}".encode()
    ).decode()
    conn = adbc.connect(
        uri=f"grpc://{connection_info['ip']}:{connection_info['flight_port']}",
        db_kwargs={
            DatabaseOptions.RPC_CALL_HEADER_PREFIX.value + 'authorization': auth,
            # 256 MB max gRPC message size, for large result batches.
            DatabaseOptions.WITH_MAX_MSG_SIZE.value: '268435456',
        }
    )
    # try/finally guarantees cursor/connection cleanup even when execute()
    # or fetch_arrow_table() raises (the original leaked both on error).
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            arrow_table = cursor.fetch_arrow_table()
        finally:
            # Older pyarrow RecordBatchReader lacks close(); fetch_arrow_table()
            # has already materialized the data, so a failed close is harmless.
            try:
                cursor.close()
            except (AttributeError, RuntimeError):
                pass
    finally:
        try:
            conn.close()
        except (AttributeError, RuntimeError):
            pass
    print("Arrow Flight SQL 数据读取任务完毕")
    # pandas 2.0 removed DataFrame.iteritems(), which PySpark 3.1.x still
    # calls internally; alias it once to the equivalent items().
    import pandas as pd
    if not hasattr(pd.DataFrame, 'iteritems'):
        pd.DataFrame.iteritems = pd.DataFrame.items
    return session.createDataFrame(arrow_table.to_pandas())
"""
通过spark通过jdbc方式读取doris生成dataframe
:param session: spark对象
:param query: 查询sql
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment