1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Inlined from /metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained.py
# Prerequisites (install beforehand):
# pip install -U acryl-datahub
# pip install -U acryl-datahub[hive]
# pip install -U acryl-datahub[datahub-rest]
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage,
)
# Data platform identifier used when building dataset URNs.
platform = "hive"
# Hive database that every "table.column" reference in this script belongs to.
DEF_DB_NAME = "big_data_selection"
# DataHub GMS endpoint that metadata change proposals are emitted to.
DATAHUB_GMS_URL = "http://hadoop7:8085"
# NOTE(review): currently unused — DatahubRestEmitter below is constructed
# without a token; set and wire this in if GMS auth is enabled.
DATAHUB_GMS_TOKEN = None
def make_dataset_urn(tbl):
    """Return the DataHub dataset URN for table *tbl* on the configured platform."""
    dataset_urn = builder.make_dataset_urn(platform, tbl)
    return dataset_urn
def schema_field_urn(tbl, fld):
    """Return the schema-field URN for column *fld* of table *tbl*."""
    parent_dataset_urn = make_dataset_urn(tbl)
    return builder.make_schema_field_urn(parent_dataset_urn, fld)
def build_column_lineages(lineages: list):
    """
    Full (overwrite) update of column-level lineage in DataHub.

    Each entry maps one upstream column to one downstream column; both sides
    are "table.column" strings resolved inside ``DEF_DB_NAME``.

    :param lineages: non-empty list of dicts with keys ``'from'`` and ``'to'``,
        e.g. ``{'from': 'src_tbl.id', 'to': 'dst_tbl.id'}``.  All ``'to'``
        entries are expected to reference the SAME downstream table — the
        lineage aspect is emitted against the table of the last entry.
    :return: DataHub UI URL of the downstream dataset's lineage view.
    :raises ValueError: if *lineages* is empty.
    """
    # `assert` is stripped under `python -O`; validate with an explicit raise.
    if not lineages:
        raise ValueError("不能为空!")
    fineGrainedLineages = []
    check_upstreams = []
    # Track upstream tables already added so the UpstreamLineage aspect does
    # not contain duplicate Upstream entries when several columns come from
    # the same table (the original appended one per column mapping).
    seen_upstream_tables = set()
    to_tb = None
    for val in lineages:
        from_tb, from_col = val['from'].split(".")[:2]
        to_tb, to_col = val['to'].split(".")[:2]
        upstreams_urns = [schema_field_urn(f"{DEF_DB_NAME}.{from_tb}", from_col)]
        downstreams_urns = [schema_field_urn(f"{DEF_DB_NAME}.{to_tb}", to_col)]
        fineGrainedLineages.append(
            FineGrainedLineage(
                upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                upstreams=upstreams_urns,
                downstreamType=FineGrainedLineageDownstreamType.FIELD,
                downstreams=downstreams_urns,
            ),
        )
        if from_tb not in seen_upstream_tables:
            seen_upstream_tables.add(from_tb)
            check_upstreams.append(
                Upstream(
                    dataset=make_dataset_urn(f"{DEF_DB_NAME}.{from_tb}"),
                    type=DatasetLineageType.TRANSFORMED,
                ),
            )
    # Emit one MCP carrying both table-level upstreams and the
    # column-level (fine-grained) lineage for the downstream table.
    lineageMcp = MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn(f"{DEF_DB_NAME}.{to_tb}"),
        aspect=UpstreamLineage(
            upstreams=check_upstreams,
            fineGrainedLineages=fineGrainedLineages,
        ),
    )
    emitter = DatahubRestEmitter(gms_server=DATAHUB_GMS_URL)
    emitter.emit_mcp(lineageMcp)
    # Build the UI link from the same constants used above instead of
    # re-hard-coding "hive" / "big_data_selection" (output is unchanged).
    return (
        f"http://hadoop7:9002/dataset/urn:li:dataset:"
        f"(urn:li:dataPlatform:{platform},{DEF_DB_NAME}.{to_tb},PROD)"
        f"/?is_lineage_mode=true&separate_siblings=false&show_columns=true"
    )
if __name__ == '__main__':
    # Entry-point placeholder: nothing is emitted unless a call such as
    # build_column_lineages([{'from': 'src_tbl.col', 'to': 'dst_tbl.col'}])
    # is added here.
    pass