import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil


class SqlExportCsv(object):
    """Run an arbitrary Spark SQL query and export the result set to a CSV file.

    Thin driver: builds a Spark session named after the class, executes the
    given SQL, and hands the resulting DataFrame to
    ``CommonUtil.df_export_csv`` under the given CSV name.
    """

    def __init__(self, sql: str, csv_name: str):
        """
        :param sql: the Spark SQL statement to execute; must not be None
        :param csv_name: target CSV file name (without path); must not be None
        """
        self.sql = sql
        self.csv_name = csv_name
        # NOTE: asserts are stripped under `python -O`; kept as-is to preserve
        # the original exception type (AssertionError) for existing callers.
        assert sql is not None, "sql 不能为空"
        assert csv_name is not None, "csv_name 不能为空"
        app_name = f"{self.__class__.__name__}"
        self.spark = SparkUtil.get_spark_session(app_name)

    def run(self):
        """Execute the configured SQL and export the full result to CSV."""
        print("======================查询sql如下======================")
        print(self.sql)
        df_all = self.spark.sql(self.sql)
        # BUGFIX: was the bare global `csv_name` (only defined inside the
        # __main__ block), which broke any other caller; use the instance
        # attribute stored in __init__ instead.
        CommonUtil.df_export_csv(self.spark, df_all, self.csv_name)


if __name__ == '__main__':
    sql = """
select t1.search_term, st_rank, first_match_brand, st_brand_label
from (select search_term, first_match_brand, st_brand_label
      from dws_st_brand_info
      where site_name = 'us'
        and date_type = 'month'
        and date_info = '2023-03') t1
         left join
     (select search_term, st_rank
      from dim_st_detail
      where site_name = 'us'
        and date_type = 'month'
        and date_info = '2023-03') t2
     on t1.search_term = t2.search_term
"""
    csv_name = "dws_st_brand_info_202303"
    obj = SqlExportCsv(sql, csv_name)
    obj.run()