# datahub_util.py
# Inlined from /metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained.py
#  Setup (run these installs beforehand):
#  pip install -U acryl-datahub
#  pip install -U acryl-datahub[hive]
#  pip install -U acryl-datahub[datahub-rest]
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)

# Data platform identifier used in every dataset URN this module builds.
platform = "hive"
# Default Hive database; all table names passed to the helpers below are
# qualified as f"{DEF_DB_NAME}.{table}".
DEF_DB_NAME = "big_data_selection"
# DataHub GMS REST endpoint that metadata change proposals are posted to.
DATAHUB_GMS_URL = "http://hadoop7:8085"
# Auth token for GMS; None means unauthenticated.
# NOTE(review): verify whether the emitter should be constructed with this
# token — as written it is declared but not obviously consumed.
DATAHUB_GMS_TOKEN = None


def make_dataset_urn(tbl):
    """Return the DataHub dataset URN for table *tbl* on this module's platform."""
    return builder.make_dataset_urn(platform=platform, name=tbl)


def schema_field_urn(tbl, fld):
    """Return the schemaField URN for column *fld* of table *tbl*."""
    dataset_urn = make_dataset_urn(tbl)
    return builder.make_schema_field_urn(dataset_urn, fld)


def build_column_lineages(lineages: list):
    """Emit fine-grained (column-level) lineage to DataHub over REST.

    This is a full replacement: the downstream dataset's entire
    ``UpstreamLineage`` aspect is upserted with the lineage described here.

    :param lineages: non-empty list of dicts of the form
        ``{"from": "<src_table>.<src_column>", "to": "<dst_table>.<dst_column>"}``.
        Every ``"to"`` entry must reference the same downstream table, because a
        single aspect is emitted against exactly one dataset URN.
    :return: browse URL of the downstream dataset's lineage view.
    :raises ValueError: if *lineages* is empty, a field is not of the form
        ``table.column``, or entries target more than one downstream table.
    """
    # Raise instead of assert: asserts are stripped under `python -O`.
    if not lineages:
        raise ValueError("lineages must not be empty")

    fine_grained_lineages = []
    upstream_tables = []      # insertion-ordered, de-duplicated upstream table names
    downstream_tables = set() # must end up with exactly one element

    for entry in lineages:
        from_tb, from_col = _split_field(entry["from"])
        to_tb, to_col = _split_field(entry["to"])
        downstream_tables.add(to_tb)

        fine_grained_lineages.append(
            FineGrainedLineage(
                upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                upstreams=[schema_field_urn(f"{DEF_DB_NAME}.{from_tb}", from_col)],
                downstreamType=FineGrainedLineageDownstreamType.FIELD,
                downstreams=[schema_field_urn(f"{DEF_DB_NAME}.{to_tb}", to_col)],
            ),
        )
        # De-duplicate: the original appended one Upstream per column mapping,
        # producing repeated table-level upstream entries.
        if from_tb not in upstream_tables:
            upstream_tables.append(from_tb)

    # The original silently used whatever `to_tb` the last iteration left
    # behind; mixed downstream tables would emit wrong lineage. Fail loudly.
    if len(downstream_tables) != 1:
        raise ValueError(
            f"all 'to' fields must target the same table, got: {sorted(downstream_tables)}"
        )
    (to_tb,) = downstream_tables

    upstreams = [
        Upstream(
            dataset=make_dataset_urn(f"{DEF_DB_NAME}.{tb}"),
            type=DatasetLineageType.TRANSFORMED,
        )
        for tb in upstream_tables
    ]

    lineage_mcp = MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn(f"{DEF_DB_NAME}.{to_tb}"),
        aspect=UpstreamLineage(
            upstreams=upstreams,
            fineGrainedLineages=fine_grained_lineages,
        ),
    )

    # Pass the configured token (None = unauthenticated, same as before).
    emitter = DatahubRestEmitter(gms_server=DATAHUB_GMS_URL, token=DATAHUB_GMS_TOKEN)
    emitter.emit_mcp(lineage_mcp)

    # Build the browse URL from the module constants instead of hard-coding
    # the platform/database (original hard-coded "big_data_selection").
    return (
        f"http://hadoop7:9002/dataset/urn:li:dataset:"
        f"(urn:li:dataPlatform:{platform},{DEF_DB_NAME}.{to_tb},PROD)"
        "/?is_lineage_mode=true&separate_siblings=false&show_columns=true"
    )


def _split_field(qualified: str):
    """Split ``'table.column'`` into ``(table, column)``; raise ValueError if malformed."""
    table, sep, column = qualified.partition(".")
    if not sep or not table or not column:
        raise ValueError(f"expected '<table>.<column>', got: {qualified!r}")
    return table, column


if __name__ == '__main__':
    # Intentionally a no-op: this module is meant to be imported and its
    # helpers (e.g. build_column_lineages) called from elsewhere.
    pass