import os
import sys

# Make the project root importable before loading the utils package
sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil

if __name__ == '__main__':
    # Trailing semicolons are not accepted by spark.sql(), so the statement ends without one
    sql = """
    select col1, col2, col3, site_name from tmp_wjc_test
    """
    spark = SparkUtil.get_spark_session("tmp_wjc_test")
    # df_save = spark.sql(sql).cache()
    df_save = spark.sql(sql)
    df_save.show()

    # Rows to append to the existing partition
    rows = [
        {"col1": "1", "col2": "1", "col3": "1", "site_name": "us"},
        {"col1": "2", "col2": "2", "col3": "2", "site_name": "us"},
        {"col1": "3", "col2": "3", "col3": "3", "site_name": "us"},
        {"col1": "4", "col2": "4", "col3": "4", "site_name": "us"},
    ]
    append = spark.createDataFrame(rows)
    append.show()

    # The appended rows could also be written to a separate table here
    df_save = df_save.union(append).cache()
    df_save.show()

    # # Update a local backup table instead:
    # df_save.write.saveAsTable(name="tmp_wjc_test_back", format='hive', mode='overwrite', partitionBy=['site_name'])
    # rel_save = spark.sql("select * from tmp_wjc_test_back")

    # Clear the existing HDFS partition, then append the merged data back into the source table
    HdfsUtils.delete_file_in_folder("/home/big_data_selection/tmp/tmp_wjc_test/site_name=us")
    df_save.write.saveAsTable(name="tmp_wjc_test", format='hive', mode='append', partitionBy=['site_name'])
    print("success")
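
    # Sketch of an alternative (an assumption, not part of the original script): instead of
    # deleting the partition directory on HDFS and re-appending, Spark's dynamic partition
    # overwrite can replace only the partitions present in the DataFrame. Column order must
    # match the table schema when using insertInto.
    #
    # spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    # df_save.write.mode("overwrite").insertInto("tmp_wjc_test")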