1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from pyspark.sql.types import StringType
from utils.templates import Templates
from google.cloud import translate_v2 as translate
class Test(Templates):
    """Sample job that translates search terms with Google Cloud Translation.

    Reads up to 20 ``search_term`` values from ``dwt_aba_last365`` and adds a
    ``translate_text`` column produced by a Spark UDF that calls the
    Translation API (v2 client).
    """

    def __init__(self):
        super().__init__()
        self.spark = self.create_spark_object(app_name="test")
        # Placeholder frame; overwritten when read_data() runs.
        self.df_st = self.spark.sql("select 1+1;")
        # Driver-side client used by translate_text().
        self.translate_client = translate.Client()
        # Custom UDF.  The registered callable must not reference ``self``:
        # Spark pickles the UDF to ship it to the executors, and an instance
        # holding a SparkSession and a translate client cannot be pickled.
        self.u_translate_text = self.spark.udf.register(
            'translate_text', self._build_translate_udf(), StringType()
        )

    @staticmethod
    def _build_translate_udf():
        """Return a picklable translate function that builds its own client lazily."""
        # Starts empty, so it pickles trivially; each deserialized copy of the
        # function creates its client on first use and then reuses it.
        holder = {}

        def _translate(word, target_language='zh'):
            # NULL / empty column values are passed through untouched.
            if not word:
                return word
            client = holder.get('client')
            if client is None:
                client = holder['client'] = translate.Client()
            result = client.translate(word, target_language=target_language)
            return result['translatedText']

        return _translate

    def translate_text(self, word: str, target_language='zh'):
        """Translate ``word`` on the driver using ``self.translate_client``.

        Args:
            word: Text to translate.  ``None`` and ``''`` are returned as-is
                without calling the API.
            target_language: Target language code, ``'zh'`` by default.

        Returns:
            The ``'translatedText'`` field of the API response.
        """
        if not word:
            return word
        # NOTE(review): the API may return HTML-escaped entities unless a
        # plain-text format is requested -- confirm whether that matters here.
        result = self.translate_client.translate(word, target_language=target_language)
        return result['translatedText']

    def read_data(self):
        """Load up to 20 search terms into ``self.df_st`` and cache them."""
        # The SQL text is kept flush-left so the query string stays unchanged.
        sql1 = """
select
search_term
from dwt_aba_last365
where site_name = 'us'
and date_type = 'last365day'
and date_info = '2023-12';
"""
        self.df_st = self.spark.sql(sql1).limit(20).cache()

    def handle_data(self):
        """Add the ``translate_text`` column to ``self.df_st`` and print it."""
        self.df_st = self.df_st.withColumn(
            'translate_text',
            self.u_translate_text(self.df_st['search_term']),
        )
        # Show up to 20 rows without truncating long cell values.
        self.df_st.show(20, False)
if __name__ == '__main__':
    # Script entry point: build the job and hand control to its run() method.
    job = Test()
    job.run()