dim_asin_title_parse.py 1.97 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from yswg_utils.common_udf import udf_title_number_parse_reg
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F

"""
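解析asin标题 获取标题中的信息
全量解析当前所有asin
Parse ASIN titles and extract the information they contain.
Performs a full (non-incremental) parse of all current ASINs of a site.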
"""


class DimAsinTitleParse(object):
    """Parse all ASIN titles of one site and store the parsed result in Hive.

    Reads every non-empty ``asin_title`` of ``site_name`` from
    ``dim_cal_asin_history_detail`` and applies ``udf_title_number_parse_reg``
    to each title. Rows whose parse result is non-null are written into the
    ``site_name`` partition of ``dim_asin_title_parse``. The partition's HDFS
    directory is deleted right before the append, so each run fully replaces
    that site's data.
    """

    # Number of Spark partitions (and therefore output files) written per run.
    NUM_OUTPUT_PARTITIONS = 6

    def __init__(self, site_name):
        """
        :param site_name: site whose titles are parsed; used both as the
            source-table filter and as the target Hive partition value.
        """
        self.site_name = site_name
        app_name = f"{self.__class__.__name__}:{site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.udf_title_number_parse_reg = udf_title_number_parse_reg()

        self.hive_tb = "dim_asin_title_parse"
        # HDFS directory backing this site's partition of the target table.
        # It is only cleared in run(), once the query has been built, so that
        # merely constructing this object no longer destroys existing data.
        self.hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dim/{self.hive_tb}/site_name={self.site_name}"

    def run(self):
        """Build the parsed-title DataFrame, clear the old partition, then append the new data."""
        sql = f"""
                select asin,
                asin_title
                from dim_cal_asin_history_detail
                where site_name = '{self.site_name}'
                and asin_title is not null  and  asin_title != ''
"""
        print("======================查询sql如下======================")
        print(sql)
        df_save = self.spark.sql(sql)

        # Output columns: asin, the UDF's parse result (named title_num_json;
        # presumably a JSON string, confirm against the UDF), and the partition
        # column. Titles the UDF cannot parse (null result) are dropped.
        df_save = df_save.select(
            "asin",
            self.udf_title_number_parse_reg("asin_title").alias("title_num_json"),
            F.lit(self.site_name).alias("site_name"),
        ).where("title_num_json is not null")

        # Delete the old partition only after the query has been built.
        # Spark resolves tables and columns when the DataFrame is created, so
        # such errors now surface before any data is removed.
        print(f"清除hdfs目录中.....{self.hdfs_path}")
        HdfsUtils.delete_hdfs_file(self.hdfs_path)

        # Control the number of output files written for the partition.
        df_save = df_save.repartition(self.NUM_OUTPUT_PARTITIONS)
        partition_by = ["site_name"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}")
        # 'append' combined with the directory deletion above effectively
        # overwrites only this site's partition, leaving other sites intact.
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    # Site to process, taken from the first command-line argument
    # (get_sys_arg presumably returns the default None when it is absent).
    site_name = CommonUtil.get_sys_arg(1, None)
    if not site_name:
        # Without a site the job would delete and write a bogus
        # "site_name=None" partition, so refuse to run.
        sys.exit(f"usage: {sys.argv[0]} <site_name>")
    obj = DimAsinTitleParse(site_name)
    obj.run()