# export_asin_desc.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.db_util import DBUtil, DbTypes
from utils.spark_util import SparkUtil
from yswg_utils.common_df import get_asin_unlanuch_df
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
import re

"""
asin 标题 卖点 查询
"""


class ExportAsinDesc(object):

    def __init__(self, site_name):
        app_name = f"{self.__class__.__name__}"
        self.site_name = site_name
        self.spark = SparkUtil.get_spark_session(app_name)
        # register the UDF under a name so it can be referenced inside SQL expressions
        self.udf_has_keyword_reg = self.spark.udf.register('udf_has_keyword_reg', self.udf_has_keyword, BooleanType())

    @staticmethod
    def udf_has_keyword(text: str):
        """Return True if text contains a standalone site code such as US or UK."""
        if text is None:
            return False
        # the \s/\b guards keep codes from matching inside longer words (e.g. "USB");
        # renamed from `all` to avoid shadowing the builtin
        matches = re.findall(r"(\s|\b)(US|UK|FR|DE|IT|ES|MX|CA)(\s|\b)", text, flags=re.IGNORECASE)
        return len(matches) > 0
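
    # e.g. udf_has_keyword("Phone Case US Version") -> True
    #      udf_has_keyword("USB charging cable")    -> False (no standalone code)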

    def run(self):
        # NOTE: the source table is a hard-coded us-site snapshot; site_name
        # only selects which PostgreSQL connection is used below.
        sql = """
        select asin,
               title,
               describe,
               created_at,
               site as site_name
        from us_self_asin_detail_2023_08_14
        where title is not null
        """
        print(sql)
        conn_info = DBUtil.get_connection_info(db_type=DbTypes.postgresql.name, site_name=self.site_name)
        # cache() so the filter and join below don't re-read the table over JDBC
        save_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()
        # keep only rows whose title contains a standalone site code
        # (the UDF already returns a boolean, so no "= 1" comparison is needed)
        save_df = save_df.where(F.expr("udf_has_keyword_reg(title)"))
        unlanuch_df = get_asin_unlanuch_df(self.site_name, spark_session=self.spark)
        # exclude delisted ASINs: left_anti keeps only rows with no match in unlanuch_df
        save_df = save_df.join(unlanuch_df, on=['asin'], how='left_anti')

        save_df = save_df.select(
            F.col("asin"),
            F.col("title"),
            F.col("describe"),
            F.col("site_name"),
            F.col("created_at"),
        )

        save_df.write.saveAsTable(name="export_asin_desc_tmp", format='hive', mode='overwrite')

    def export(self):
        """
        Export the result table (not implemented yet).
        :return:
        """
        # CommonUtil.df_export_csv(self.spark, )
        pass
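
        # A minimal sketch of what export() could do, assuming a plain Spark
        # CSV writer (CommonUtil.df_export_csv's signature is not shown in
        # this file, so its call above is left untouched; the output path
        # below is hypothetical):
        #
        #   df = self.spark.table("export_asin_desc_tmp")
        #   df.coalesce(1).write.mode("overwrite") \
        #       .option("header", True).csv("/tmp/export_asin_desc")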


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    method = CommonUtil.get_sys_arg(2, "run")
    assert site_name is not None, "site_name must not be empty!"
    assert method is not None, "method must not be empty!"
    obj = ExportAsinDesc(site_name)
    if method == 'run':
        obj.run()
    elif method == 'export':
        obj.export()
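    else:
        # guard added for illustration; the original script silently ignored
        # unknown method values
        raise ValueError(f"unknown method: {method}")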