import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil

from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil

"""
对 asin图片本地存储地址进行存档,同时进行预处理
"""


class DimAsinImgPath(object):

    def __init__(self, site_name):
        self.site_name = site_name
        app_name = f"{self.__class__.__name__}:{site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dim_asin_img_path"

    def run(self):
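        # Pull archived image paths from ODS for this site, filtering out both
        # SQL NULLs and the literal string 'null' in asin_img_url.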
        sql = f"""
                select id,
                asin,
                asin_img_url,
                asin_img_path,
                created_at,
                bsr_cate_current_id
                from ods_asin_img_path 
                where site_name = '{self.site_name}'
                and asin_img_url is not null
                and asin_img_url != 'null'
                and asin_img_path is not null
"""
        print("======================查询sql如下======================")
        print(sql)
        df_save = self.spark.sql(sql)

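        # first() returns None on an empty DataFrame, letting us skip the write entirely.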
        if df_save.first() is None:
            print("============================ no data, skipping ===================================")
            return

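        # Look up the public category_id for each bsr_cate_current_id from the
        # us_bs_category dimension table in the "us" MySQL database.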
        path_sql = f"""
        select id as bsr_cate_current_id,
        category_id
        from us_bs_category
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
        id_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=path_sql
        )
        # Inner join maps each bsr_cate_current_id to its category_id;
        # rows without a matching category are dropped.
        df_save = df_save.join(id_df, on='bsr_cate_current_id').select(
            F.col("id"),
            F.col("asin"),
            F.col("asin_img_url"),
            F.col("asin_img_path"),
            F.col("created_at"),
            F.col("bsr_cate_current_id"),
            F.col("category_id"),
            F.lit(self.site_name).alias("site_name"),
        )

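        # Rebuild the site_name partition: delete its old HDFS files first, then
        # append the fresh rows, which amounts to a per-partition overwrite.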
        partition_dict = {
            "site_name": self.site_name
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        HdfsUtils.delete_hdfs_file(hdfs_path)
        partition_by = list(partition_dict.keys())

        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = DimAsinImgPath(site_name)
    obj.run()