1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import os
import sys

# sys.path[0] is the directory of the launched script; its parent directory is
# appended so that the project-local ``utils`` package (presumably located
# there -- confirm against the repo layout) can be imported. This must run
# BEFORE the ``utils`` imports below, otherwise they are resolved against the
# un-extended search path.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_conf import build_es_option  # noqa: E402
from utils.templates import Templates  # noqa: E402


def main():
    """Test syncing data into Elasticsearch.

    Reads 10 rows from ``big_data_selection.dim_date_20_to_30`` through Spark
    SQL, prints the rows and their schema, then appends them to the ES
    resource ``dim_date_20_to_30/_doc`` using the ``id`` column as the
    document id. Prints ``success`` once the write call has returned.
    """
    tmp = Templates()
    spark = tmp.create_spark_object("test_df")

    sql = """
    select id,
           date,
           year,
           quarter,
           month,
           week,
           day,
           week_day,
           year_quarter
    from big_data_selection.dim_date_20_to_30
    limit 10;
    """
    df = spark.sql(sql)

    # Show the sample rows and the inferred schema before writing.
    df.show()
    df.printSchema()

    # Base connector options for the "us" target; assumed to be a mutable
    # mapping of ES connector settings -- TODO confirm in utils.db_conf.
    es_conf = build_es_option("us")
    es_conf['es.resource'] = "dim_date_20_to_30/_doc"
    # NOTE: the field that serves as the primary key (document id) must be
    # set explicitly here.
    es_conf['es.mapping.id'] = "id"

    df.write.mode("append").format("es").options(**es_conf).save()
    print("success")


if __name__ == '__main__':
    main()