import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from yswg_utils.udf_util import UdfUtil


class ExportTitleTmp(object):
    """
    Tokenize ASIN titles and export per-word frequency counts.

    Usage: pass the site name as the first command-line argument.
    """

    def __init__(self, site_name):
        app_name = f"{self.__class__.__name__}"
        self.site_name = site_name
        self.spark = SparkUtil.get_spark_session(app_name)
        # Register the tokenizer as a Spark UDF returning an array of words.
        self.udf_word_tokenize_reg = F.udf(self.udf_word_tokenize, ArrayType(StringType()))

    @staticmethod
    def udf_word_tokenize(title: str):
        if title is None:
            return None
        result = UdfUtil.word_tokenize(title)
        # Drop whitespace and punctuation tokens; the list covers both ASCII
        # and full-width variants, plus the `` / '' tokens the tokenizer
        # emits for quotation marks.
        filter_arr = [
            " ", "\t", "\r", "\n",
            "(", ")", ",", ",", "[", "]", "、", "-", ":", "&", "|", "+",
            "``", "''",
        ]
        return [word for word in result if word not in filter_arr]

    def run(self):
        assert self.site_name is not None, "site_name must not be empty!"
        # Only non-null titles crawled after 2022-06-16 are considered.
        sql = f"""
            select asin_title
            from big_data_selection.dim_cal_asin_history_detail
            where site_name = '{self.site_name}'
              and asin_title is not null
              and asin_crawl_date > '2022-06-16'
        """
        save_df = self.spark.sql(sql)
        # Tokenize each title and flatten the word arrays into one row per word.
        word_df = save_df.select(
            F.explode(self.udf_word_tokenize_reg(F.col("asin_title"))).alias("word")
        )
        # Count occurrences of each word across all titles.
        df_save = word_df.groupby("word").agg(
            F.count(F.col("word")).alias("count")
        )
        df_save.write.saveAsTable(name="word_count_tmp1", format='hive', mode='overwrite')


if __name__ == '__main__':
    try:
        site_name = CommonUtil.get_sys_arg(1, None)
        obj = ExportTitleTmp(site_name)
        obj.run()
        CommonUtil.send_wx_msg(['wujicang'], "Reminder", "ASIN title word count finished!")
    except Exception:
        import traceback

        traceback.print_exc()
        CommonUtil.send_wx_msg(['wujicang'], "Reminder", "ASIN title word count failed!")