# test_export_es.py — last committed by chenyuanjie.
"""Smoke test: sync a handful of rows from Hive into Elasticsearch.

Reads 10 rows of ``big_data_selection.dim_date_20_to_30`` through Spark SQL,
prints them and their schema, then appends them to the Elasticsearch resource
``dim_date_20_to_30/_doc`` using the options returned by
``build_es_option("us")``. Prints "success" once the write returns.
"""
import os
import sys

# Put the parent of this script's directory on sys.path BEFORE importing the
# project-local ``utils`` package: a sys.path entry only helps imports that
# run after it has been added.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_conf import build_es_option  # noqa: E402  (needs sys.path above)
from utils.templates import Templates  # noqa: E402  (needs sys.path above)

# Source query. Deliberately no trailing ';': spark.sql() takes a single
# statement, and older Spark SQL parsers reject the semicolon.
SQL = """
    select id,
           date,
           year,
           quarter,
           month,
           week,
           day,
           week_day,
           year_quarter
    from big_data_selection.dim_date_20_to_30
    limit 10
"""

# Target Elasticsearch resource, in es-hadoop's "<index>/<type>" form.
ES_RESOURCE = "dim_date_20_to_30/_doc"


def main():
    """Run the Hive -> Elasticsearch export once, then stop the Spark session."""
    templates = Templates()
    spark = templates.create_spark_object("test_df")
    try:
        df = spark.sql(SQL)
        df.show()
        df.printSchema()

        # Build a fresh options dict rather than mutating the mapping returned
        # by build_es_option(), in case that object is shared elsewhere.
        es_conf = {
            **build_es_option("us"),
            "es.resource": ES_RESOURCE,
            # Always tell es-hadoop which column is the document primary key
            # (_id); without it Elasticsearch auto-generates ids, so re-runs
            # would insert duplicates instead of replacing documents.
            "es.mapping.id": "id",
        }
        df.write.mode("append").format("es").options(**es_conf).save()
    finally:
        spark.stop()

    print("success")


if __name__ == "__main__":
    main()