test_overwrite_insert.py
import os
import sys

# Make the project root importable before importing project modules.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil

if __name__ == '__main__':
    # Read the current contents of the partitioned source table.
    sql = """
    select col1,
           col2,
           col3,
           site_name
    from tmp_wjc_test
    """
    spark = SparkUtil.get_spark_session("tmp_wjc_test")
    # df_save = spark.sql(sql).cache()
    df_save = spark.sql(sql)
    df_save.show()

    # New rows to add to the site_name=us partition.
    rows = [
        {"col1": "1", "col2": "1", "col3": "1", "site_name": "us"},
        {"col1": "2", "col2": "2", "col3": "2", "site_name": "us"},
        {"col1": "3", "col2": "3", "col3": "3", "site_name": "us"},
        {"col1": "4", "col2": "4", "col3": "4", "site_name": "us"},
    ]
    append = spark.createDataFrame(rows)
    append.show()

    # The combined result could also be written to a different table here.
    df_save = df_save.unionAll(append).cache()
    df_save.show()

    # Alternative: refresh a local backup table instead.
    # df_save.write.saveAsTable(name="tmp_wjc_test_back", format='hive', mode='overwrite', partitionBy=['site_name'])
    # rel_save = spark.sql("select * from tmp_wjc_test_back")

    # Delete the existing site_name=us partition directory on HDFS, then append the
    # combined data back, which effectively overwrites that partition in place.
    HdfsUtils.delete_file_in_folder("/home/big_data_selection/tmp/tmp_wjc_test/site_name=us")
    df_save.write.saveAsTable(name="tmp_wjc_test", format='hive', mode='append', partitionBy=['site_name'])
    print("success")
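
    # --- Alternative sketch (an assumption, not part of the original run): on Spark 2.3+,
    # --- dynamic partition overwrite can replace the manual HDFS delete above. Note that
    # --- insertInto() matches columns by position, so select them in the table's order first.
    # spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    # df_save.select("col1", "col2", "col3", "site_name").write.insertInto("tmp_wjc_test", overwrite=True)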