# test_udf.py 1.35 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.sql.types import StringType
from utils.templates import Templates
from google.cloud import translate_v2 as translate


class Test(Templates):
    """Translate a sample of search terms with a Spark UDF backed by Google Translate.

    ``read_data`` loads up to 20 ``search_term`` values from
    ``dwt_aba_last365``; ``handle_data`` appends a ``translate_text`` column
    produced by the registered ``translate_text`` UDF and prints the result.
    """

    def __init__(self):
        super().__init__()
        self.spark = self.create_spark_object(app_name="test")
        # Placeholder DataFrame so the attribute exists before read_data() runs.
        # No trailing ';' here: some Spark versions' SQL parser rejects it.
        self.df_st = self.spark.sql("select 1+1")
        # Driver-side client, used when translate_text() is called directly.
        self.translate_client = translate.Client()
        # Custom UDF. It is built from a standalone function instead of the
        # bound method self.translate_text: serializing a bound method pulls
        # the whole instance (SparkSession, DataFrame, API client) into the
        # pickle, and Spark refuses to serialize the SparkContext.
        self.u_translate_text = self.spark.udf.register(
            'translate_text', self._build_udf_function(), StringType()
        )

    @staticmethod
    def _build_udf_function():
        """Return a plain function, independent of ``self``, to register as the UDF.

        The returned function creates its own ``translate.Client`` on first use
        in whichever process executes it and reuses that client for later
        calls, so nothing unpicklable is captured when Spark serializes it.
        NOTE(review): this presumably requires Google credentials to be
        available on the executors -- confirm for the target cluster.
        """
        # Empty when the function is serialized; filled lazily where it runs.
        client_holder = {}

        def _translate(word, target_language='zh'):
            # Pass SQL NULLs through instead of sending them to the API.
            if word is None:
                return None
            client = client_holder.get('client')
            if client is None:
                client = translate.Client()
                client_holder['client'] = client
            result = client.translate(word, target_language=target_language)
            return result['translatedText']

        return _translate

    def translate_text(self, word: str, target_language='zh'):
        """Translate ``word`` with the driver-side client.

        Args:
            word: Text to translate. ``None`` is returned unchanged.
            target_language: Language code passed to the API; defaults to 'zh'.

        Returns:
            The ``translatedText`` field of the API response, or ``None`` when
            ``word`` is ``None``.
        """
        if word is None:
            return None
        result = self.translate_client.translate(word, target_language=target_language)
        return result['translatedText']

    def read_data(self):
        """Load up to 20 search terms into ``self.df_st`` and mark them cached."""
        # No trailing ';' in the statement: some Spark versions' SQL parser
        # rejects it.
        sql1 = """
            select 
                search_term 
            from dwt_aba_last365 
            where site_name = 'us' 
              and date_type = 'last365day' 
              and date_info = '2023-12'
        """
        self.df_st = self.spark.sql(sql1).limit(20).cache()

    def handle_data(self):
        """Add the ``translate_text`` column via the UDF and print the rows."""
        self.df_st = self.df_st.withColumn(
            'translate_text',
            self.u_translate_text(self.df_st['search_term'])
        )
        # Show up to 20 rows without truncating long cell values.
        self.df_st.show(20, False)


if __name__ == '__main__':
    # Script entry point: build the job and start it. run() is not defined in
    # this file; presumably it is inherited from Templates.
    job = Test()
    job.run()