# test_overwrite_insert.py
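# Demonstrates "overwriting" a single Hive partition by hand: read the existing
# rows, union in a few new rows, delete the partition's files on HDFS, and then
# append the combined result back into the same table.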
import os
import sys

# Add the project's parent directory to the import path so the utils package
# can be resolved; this must run before the utils imports below.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil

if __name__ == '__main__':
    sql = """
            select col1,
                   col2,
                   col3,
                   site_name
            from tmp_wjc_test;
            """

    spark = SparkUtil.get_spark_session("tmp_wjc_test")
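    # SparkUtil.get_spark_session is this project's helper; it is assumed to
    # return a SparkSession with Hive support enabled, since the script reads
    # and writes Hive tables below.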

    # df_save = spark.sql(sql).cache()
    df_save = spark.sql(sql)
    df_save.show()

    rows = [
        {"col1": "1", "col2": "1", "col3": "1", "site_name": "us"},
        {"col1": "2", "col2": "2", "col3": "2", "site_name": "us"},
        {"col1": "3", "col2": "3", "col3": "3", "site_name": "us"},
        {"col1": "4", "col2": "4", "col3": "4", "site_name": "us"},
    ]
    append = spark.createDataFrame(rows)
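    # createDataFrame infers the schema from the dict keys (depending on the
    # Spark version the keys may be sorted alphabetically). union()/unionAll()
    # match columns by position, not by name, so the inferred column order must
    # line up with the table's column order; col1, col2, col3, site_name does here.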
    append.show()

    # At this point the result could also be written to another table.
    df_save = df_save.union(append).cache()

    df_save.show()
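    # show() is an action and populates at least part of the cache; a count()
    # would guarantee the full DataFrame is materialized before the source
    # partition files are deleted below, since the lazy plan would otherwise
    # try to re-read files that are about to be removed.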
    #
    # # Local table update: write to a backup table and read it back.
    # df_save.write.saveAsTable(name="tmp_wjc_test_back", format='hive', mode='overwrite', partitionBy=['site_name'])
    # rel_save = spark.sql("select * from tmp_wjc_test_back")
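    # Manual partition overwrite: remove the existing files under the
    # site_name=us partition directory first, then append the combined rows,
    # so the partition ends up containing old + new data exactly once.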
    HdfsUtils.delete_file_in_folder("/home/big_data_selection/tmp/tmp_wjc_test/site_name=us")
    df_save.write.saveAsTable(name="tmp_wjc_test", format='hive', mode='append', partitionBy=['site_name'])
    print("success")