import os import sys from utils.db_conf import build_es_option from utils.templates import Templates sys.path.append(os.path.dirname(sys.path[0])) if __name__ == '__main__': """ 测试同步数据到到 es 中 """ tmp = Templates() spark = tmp.create_spark_object("test_df") sql = """ select id, date, year, quarter, month, week, day, week_day, year_quarter from big_data_selection.dim_date_20_to_30 limit 10; """ df = spark.sql(sql) df.show() df.printSchema() es_conf = build_es_option("us") es_conf['es.resource'] = "dim_date_20_to_30/_doc" # 注意次数一定要设置主键为哪个filed es_conf['es.mapping.id'] = "id" df.write.mode("append").format("es").options(**es_conf).save() print("success")