import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from pyspark.sql.types import StringType
from utils.templates import Templates
from google.cloud import translate_v2 as translate


def _make_translate_udf(target_language='zh'):
    """Build a translation function that is safe to ship to Spark executors.

    The returned closure captures only picklable values (a string and an
    empty dict), so Spark can serialize it. The Google Translate client is
    created lazily inside the worker on first use and then reused through
    the captured dict, instead of being serialized from the driver.

    Args:
        target_language: Language code passed to the Translate API.

    Returns:
        A function mapping one string to its translation, or ``None`` to
        ``None``.
    """
    client_cache = {}

    def _translate(word):
        # NULL column values arrive as None; there is nothing to translate.
        if word is None:
            return None
        client = client_cache.get('client')
        if client is None:
            client = translate.Client()
            client_cache['client'] = client
        result = client.translate(word, target_language=target_language)
        return result['translatedText']

    return _translate


class Test(Templates):
    """Demo job that translates a sample of search terms with Google Translate.

    ``read_data`` loads up to 20 search terms from ``dwt_aba_last365`` and
    ``handle_data`` appends a ``translate_text`` column and prints the result.
    """

    def __init__(self):
        super().__init__()
        self.spark = self.create_spark_object(app_name="test")
        # Placeholder DataFrame; replaced by read_data().
        self.df_st = self.spark.sql("select 1+1;")
        # Driver-side client used by translate_text().
        self.translate_client = translate.Client()
        # Custom UDF. Registering the bound method self.translate_text would
        # force Spark to pickle this whole instance (SparkSession and API
        # client included), so a self-contained closure is registered instead.
        self.u_translate_text = self.spark.udf.register(
            'translate_text', _make_translate_udf(), StringType()
        )

    def translate_text(self, word: str, target_language='zh'):
        """Translate ``word`` on the driver using ``self.translate_client``.

        Args:
            word: Text to translate; ``None`` is passed through unchanged.
            target_language: Language code passed to the Translate API.

        Returns:
            The ``translatedText`` field of the API response, or ``None``
            when ``word`` is ``None``.
        """
        if word is None:
            return None
        result = self.translate_client.translate(word, target_language=target_language)
        return result['translatedText']

    def read_data(self):
        """Load at most 20 search terms into ``self.df_st`` and mark them cached."""
        sql1 = (
            " select search_term from dwt_aba_last365"
            " where site_name = 'us' and date_type = 'last365day'"
            " and date_info = '2023-12'; "
        )
        self.df_st = self.spark.sql(sql1).limit(20).cache()

    def handle_data(self):
        """Add the ``translate_text`` column and print up to 20 untruncated rows."""
        self.df_st = self.df_st.withColumn(
            'translate_text', self.u_translate_text(self.df_st['search_term'])
        )
        self.df_st.show(20, False)


if __name__ == '__main__':
    handle_obj = Test()
    # run() is not defined in this file; presumably inherited from Templates.
    handle_obj.run()