test_export_es.py 923 Bytes
"""Smoke test: sync a sample of ``dim_date_20_to_30`` into Elasticsearch.

Reads 10 rows from ``big_data_selection.dim_date_20_to_30`` through Spark SQL,
prints them and their schema, then appends them to the Elasticsearch index
``dim_date_20_to_30`` via the ``es`` Spark data source.
"""
import os
import sys

# sys.path[0] is the directory of the running script; its parent is appended so
# that the project-local ``utils`` package (presumably located there) can be
# imported. This must happen BEFORE the ``utils`` imports below, otherwise it
# has no effect on them.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_conf import build_es_option  # noqa: E402
from utils.templates import Templates  # noqa: E402


def main():
    """Read a 10-row sample via Spark SQL and append it to Elasticsearch.

    Prints the sample rows and their schema, writes them with the ``es``
    data source using options from ``build_es_option("us")``, and prints
    ``success`` when the write returns.
    """
    tmp = Templates()
    spark = tmp.create_spark_object("test_df")

    # No trailing ';': spark.sql() expects a single statement, and some Spark
    # versions reject a terminating semicolon with a ParseException.
    sql = """
        select id,
               date,
               year,
               quarter,
               month,
               week,
               day,
               week_day,
               year_quarter
        from big_data_selection.dim_date_20_to_30
        limit 10
    """
    df = spark.sql(sql)

    # Show the sample and its schema for manual inspection.
    df.show()
    df.printSchema()

    es_conf = build_es_option("us")
    # Target index (and mapping type) for the write.
    # NOTE(review): the "/_doc" suffix is a mapping-type path; confirm it is
    # still required/accepted by the target Elasticsearch version.
    es_conf['es.resource'] = "dim_date_20_to_30/_doc"
    # Important: es.mapping.id must name the column to use as the document
    # primary key (_id). Presumably, with the connector's default "index"
    # write operation, re-running this script then overwrites documents with
    # the same id instead of creating duplicates — confirm build_es_option
    # does not override es.write.operation.
    es_conf['es.mapping.id'] = "id"

    df.write.mode("append").format("es").options(**es_conf).save()

    print("success")


if __name__ == '__main__':
    main()