"""Query ASIN titles and selling points (descriptions).

Reads ASIN detail rows from PostgreSQL, keeps those whose title contains one
of a fixed list of two-letter site codes, removes ASINs that appear in the
"unlaunched" frame, and overwrites the Hive table ``export_asin_desc_tmp``.

Usage: ``<script> <site_name> [run|export]``
"""
import os
import re
import sys
from typing import Optional

# The project-local imports below rely on this path entry, so it must run
# before them.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.db_util import DBUtil, DbTypes
from utils.spark_util import SparkUtil
from yswg_utils.common_df import get_asin_unlanuch_df
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType


class ExportAsinDesc(object):
    """Build the ``export_asin_desc_tmp`` Hive table of site-code-bearing titles."""

    def __init__(self, site_name):
        """Create the Spark session and register the keyword UDF.

        :param site_name: site identifier; used for the PostgreSQL connection
            lookup and for fetching the unlaunched-ASIN frame.
        """
        app_name = self.__class__.__name__
        self.site_name = site_name
        self.spark = SparkUtil.get_spark_session(app_name)
        # Registered by name so that it can be called from SQL expressions.
        self.udf_has_keyword_reg = self.spark.udf.register(
            'udf_has_keyword_reg', self.udf_has_keyword, BooleanType()
        )

    @staticmethod
    def udf_has_keyword(text: Optional[str]) -> bool:
        """Return True when ``text`` contains one of the site codes as a word.

        The codes are US, UK, FR, DE, IT, ES, MX and CA, matched
        case-insensitively and delimited by whitespace or a word boundary.

        :param text: the string to inspect; ``None`` yields ``False``.
        :return: whether at least one code was found.
        """
        if text is None:
            return False
        # NOTE(review): with IGNORECASE, ordinary lowercase words such as
        # "us", "it", "de" or "ca" also match - confirm this is intended.
        found = re.search(
            r"(\s|\b)(US|UK|FR|DE|IT|ES|MX|CA)(\s|\b)",
            text,
            flags=re.IGNORECASE,
        )
        return found is not None

    def run(self):
        """Read, filter and persist the ASIN rows.

        Steps: query PostgreSQL for rows with a non-null title, keep rows
        whose title passes ``udf_has_keyword_reg``, anti-join against the
        unlaunched-ASIN frame, then overwrite ``export_asin_desc_tmp``.
        """
        # NOTE(review): the table name is fixed to the "us" table of
        # 2023-08-14 regardless of self.site_name - confirm this is intended.
        sql = (
            " select asin, title, describe, created_at, site as site_name"
            " from us_self_asin_detail_2023_08_14"
            " where title is not null "
        )
        print(sql)
        conn_info = DBUtil.get_connection_info(
            db_type=DbTypes.postgresql.name, site_name=self.site_name
        )
        save_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=sql
        ).cache()
        save_df = save_df.where(F.expr("udf_has_keyword_reg(title) = 1"))

        unlanuch_df = get_asin_unlanuch_df(self.site_name, spark_session=self.spark)
        # Exclude delisted ASINs: keep only rows with no match in unlanuch_df.
        save_df = save_df.join(unlanuch_df, on=['asin'], how='left_anti')

        save_df = save_df.select(
            F.col("asin"),
            F.col("title"),
            F.col("describe"),
            F.col("site_name"),
            F.col("created_at"),
        )
        save_df.write.saveAsTable(name="export_asin_desc_tmp", format='hive', mode='overwrite')

    def export(self):
        """Export the result.

        Currently a no-op.

        :return: None
        """
        # TODO: not implemented yet; presumably meant to export via
        # CommonUtil.df_export_csv - confirm the intended arguments.


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    method = CommonUtil.get_sys_arg(2, "run")
    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if site_name is None:
        raise ValueError("站点不能为空!")
    if method is None:
        raise ValueError("method不能为空!")
    # Reject unknown methods before a Spark session is created; previously
    # they were silently ignored.
    if method not in ('run', 'export'):
        raise ValueError(f"unsupported method: {method!r}")

    obj = ExportAsinDesc(site_name)
    if method == 'run':
        obj.run()
    elif method == 'export':
        obj.export()