Commit 252e1e6b by chenyuanjie

使用Spark-Doris-connector批量读取Doris表数据

parent 906830e1
......@@ -59,6 +59,27 @@ class DorisHelper(object):
raise e
"""
通过 Doris Spark Connector 并行读取 Doris 表,适合大数据量场景
:param session: spark对象
:param table_identifier: doris表标识,格式为 db.table
:param read_fields: 读取的字段,逗号分隔,为空则读取全部字段
:param use_type: 连接类型
"""
@classmethod
def spark_import_with_connector(cls, session, table_identifier, read_fields=None, use_type='selection'):
print(f"执行读取Doris数据任务,表名:{table_identifier},字段:{read_fields}")
connection_info = DorisHelper.get_connection_info(use_type)
reader = session.read.format("doris") \
.option("doris.fenodes", f"{connection_info['ip']}:{connection_info['http_port']}") \
.option("user", connection_info['user']) \
.option("password", connection_info['pwd']) \
.option("doris.table.identifier", table_identifier)
if read_fields:
reader = reader.option("doris.read.field", read_fields)
return reader.load()
"""
通过spark通过jdbc方式读取doris生成dataframe
:param session: spark对象
:param query: 查询sql
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment