# export_title_tmp.py 2.05 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from yswg_utils.udf_util import UdfUtil
from pyspark.sql.types import ArrayType, IntegerType, StringType

"""
asin 标题分词
"""


class ExportTitleTmp(object):
    """Spark job that counts how often each word occurs in ASIN titles.

    For one site, titles are read from ``SOURCE_TABLE``, split into tokens
    with ``UdfUtil.word_tokenize``, stripped of whitespace/punctuation
    tokens, and the resulting ``(word, count)`` rows overwrite
    ``TARGET_TABLE``.
    """

    # Table the ASIN titles are read from.
    SOURCE_TABLE = "big_data_selection.dim_cal_asin_history_detail"
    # Only rows whose asin_crawl_date is strictly greater than this are used.
    MIN_CRAWL_DATE = "2022-06-16"
    # Table overwritten with the word counts (saved with format='hive').
    TARGET_TABLE = "word_count_tmp1"

    def __init__(self, site_name):
        """Create the Spark session and register the tokenizer UDF.

        Args:
            site_name: Value matched against the ``site_name`` column of the
                source table. It is validated in :meth:`run`, not here.
        """
        app_name = self.__class__.__name__
        self.site_name = site_name
        self.spark = SparkUtil.get_spark_session(app_name)
        self.udf_word_tokenize_reg = F.udf(self.udf_word_tokenize, ArrayType(StringType()))

    @staticmethod
    def udf_word_tokenize(title: str):
        """Tokenize a title and drop whitespace/punctuation tokens.

        Args:
            title: Raw title text, or ``None``.

        Returns:
            ``None`` when ``title`` is ``None``; otherwise the list of tokens
            produced by ``UdfUtil.word_tokenize`` with the noise tokens
            removed, in their original order.
        """
        if title is None:
            return None
        # Defined locally (not on the class) so this function stays
        # self-contained when Spark serializes it as a UDF.
        # NOTE(review): the previous list contained "," twice; the second
        # entry may have been meant to be a full-width comma - confirm.
        noise_tokens = {
            " ", "\t", "\r", "\n", "(", ")", ",", "[", "]", "、", "-", ":", "&", "|", "+", "``", "''",
        }
        tokens = UdfUtil.word_tokenize(title)
        return [token for token in tokens if token not in noise_tokens]

    def run(self):
        """Count title words for ``self.site_name`` and save them.

        Raises:
            ValueError: If ``self.site_name`` is ``None``.
        """
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if self.site_name is None:
            raise ValueError("站点不能为空!")

        # DataFrame filters instead of string-built SQL: site_name is passed
        # as a literal value and can never alter the query itself.
        title_df = (
            self.spark.table(self.SOURCE_TABLE)
            .where(F.col("site_name") == F.lit(self.site_name))
            .where(F.col("asin_title").isNotNull())
            .where(F.col("asin_crawl_date") > F.lit(self.MIN_CRAWL_DATE))
            .select("asin_title")
        )

        # One output row per token of every title.
        word_df = title_df.select(
            F.explode(self.udf_word_tokenize_reg(F.col("asin_title"))).alias("word")
        )

        count_df = word_df.groupby("word").agg(
            F.count(F.col("word")).alias("count")
        )

        count_df.write.saveAsTable(name=self.TARGET_TABLE, format='hive', mode='overwrite')


if __name__ == '__main__':
    # Script entry point: run the job for the site given as the first
    # command-line argument and report the outcome via CommonUtil.send_wx_msg.
    try:
        site_name = CommonUtil.get_sys_arg(1, None)
        obj = ExportTitleTmp(site_name)
        obj.run()
    except Exception:
        # ``Exception`` rather than a bare ``except`` so Ctrl-C and
        # SystemExit are not reported as job failures.
        import traceback

        traceback.print_exc()
        CommonUtil.send_wx_msg(['wujicang'], "提醒", "asin标题异常!")
        # Non-zero exit status so a scheduler can see that the job failed.
        sys.exit(1)
    else:
        # Sent outside the ``try`` so that a failure to deliver the success
        # message is not misreported as a failure of the job itself.
        CommonUtil.send_wx_msg(['wujicang'], "提醒", "asin标题count完成!")