"""
author: wangrui
description: Build per-ASIN monthly history lists (ZR/SP/SB counts, AO value,
    price, BSR orders and BSR sales) from the monthly rows of dwt_flow_asin,
    bucket each ASIN into an asin_type by its prefix, and save the result.
    The original header stated the result is exported to a PostgreSQL database;
    the "{...}" list formatting below appears intended for PostgreSQL array
    literals (confirm with the consumer).
table_read_name: dwt_flow_asin
table_save_name: dwt_zr_asin_info
table_save_level: dwt
version: 2.0
created_date: 2023-04-23
updated_date: 2023-04-23
"""
import os
import re
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so that `utils` is importable

# utils.templates is imported before pyspark, as in the original, in case it
# prepares the Spark environment on import.
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.storagelevel import StorageLevel


class DwtZrAsinInfo(Templates):
    """Spark job producing dwt_zr_asin_info for one site.

    The pipeline is presumably driven by ``Templates.run`` (defined in
    utils.templates, not visible here — confirm). Per this class it:
    reads monthly dwt_flow_asin rows (``read_data``), aggregates them per ASIN
    into time-ordered list strings (``get_asin_info_list``), and adds the
    asin_type / timestamp / site_name columns (``handle_data_group``), leaving
    the result in ``self.df_save``.
    """

    def __init__(self, site_name="us"):
        super().__init__()
        self.site_name = site_name
        self.db_save = "dwt_zr_asin_info"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}")
        self.df_date = self.get_year_week_tuple()
        self.get_date_info_tuple()
        # Placeholder DataFrames; replaced in read_data / get_asin_info_list.
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_info = self.spark.sql("select 1+1;")
        # Partition columns of the saved table (both are added in handle_data_group).
        self.partitions_by = ['site_name', 'asin_type']
        self.reset_partitions(30)
        self.u_get_zr_asin_type = self.spark.udf.register(
            "u_get_zr_asin_type", self.udf_get_zr_asin_type, StringType())
        self.u_change_data_type = self.spark.udf.register(
            "u_change_data_type", self.udf_change_data_type, StringType())

    @staticmethod
    def udf_get_zr_asin_type(asin):
        """Map an ASIN to its prefix bucket, used as the ``asin_type`` partition.

        The ASIN is stringified and upper-cased, then matched against the prefix
        ranges below in order; the first match wins and anything unmatched is
        'OTHER'.

        Returns:
            The bucket label, 'OTHER', or None when ``asin`` is None (the
            previous version raised UnboundLocalError for a None ASIN).
        """
        if asin is None:
            return None
        # (prefix regex, bucket label); re.match anchors at the start of the string.
        pattern_list = [
            (r"^B0[1-6][A-Z0-9]*", "B01_B06"),
            (r"^B07[0-9][A-Z0-9]*", "B070_B079"),
            (r"^B07[A-Z][A-Z0-9]*", "B07A_B07Z"),
            (r"^B08[0-9][A-Z0-9]*", "B080_B089"),
            (r"^B08[A-Z][A-Z0-9]*", "B08A_B08Z"),
            (r"^B09[0-9][A-Z0-9]*", "B090_B099"),
            (r"^B09[A-N][A-Z0-9]*", "B09A_B09N"),
            (r"^B09[O-Z][A-Z0-9]*", "B09O_B09Z"),
            (r"^B0B[0-9][A-Z0-9]*", "B0B0_B0B9"),
            (r"^B0B[A-G][A-Z0-9]*", "B0BA_B0BG"),
            (r"^B0B[H-K][A-Z0-9]*", "B0BH_B0BK"),
            (r"^B0B[L-N][A-Z0-9]*", "B0BL_B0BN"),
            (r"^B0B[O-R][A-Z0-9]*", "B0BO_B0BR"),
            (r"^B0B[S-V][A-Z0-9]*", "B0BS_B0BV"),
            (r"^B0B[W-Z][A-Z0-9]*", "B0BW_B0BZ"),
        ]
        asin = str(asin).upper()
        for pattern, type_value in pattern_list:
            if re.match(pattern, asin):
                return type_value
        return 'OTHER'

    @staticmethod
    def udf_change_data_type(column):
        """Render a collected array value as a brace-delimited string.

        ``str()`` yields the Python list repr (e.g. ``['a', 'b']``) and square
        brackets are swapped for braces, giving ``{'a', 'b'}``.

        NOTE(review): element reprs are used as-is, so string elements keep
        Python's single quotes (a PostgreSQL array literal would treat them as
        part of the value), and Decimal elements would render as
        ``Decimal('…')`` — confirm the column types and the consumer's format.
        """
        return str(column).replace("[", "{").replace("]", "}")

    def read_data(self):
        """Load this site's monthly rows from dwt_flow_asin into ``self.df_asin_info``.

        ``cur_time`` is ``date_info`` with '-15' appended (presumably turning a
        'YYYY-MM' month into a mid-month date — confirm date_info's format).
        Rows are de-duplicated on (asin, cur_time), repartitioned into 100
        partitions and persisted to disk.
        """
        print("1. 读取dwt_flow_asin表")
        sql = f"""select asin, asin_zr_counts, asin_sp_counts, asin_sb_counts,
        round(asin_ao_val,3) as asin_ao_val, asin_price,
        bsr_orders as asin_bsr_orders, round(sales, 2) as bsr_orders_sale,
        concat(date_info, '-15') as cur_time
        from dwt_flow_asin
        where site_name='{self.site_name}' and date_type='month'
        """
        print("sql:", sql)
        self.df_asin_info = self.spark.sql(sqlQuery=sql)
        # NOTE(review): if several rows share (asin, cur_time), which one is kept
        # is not deterministic.
        self.df_asin_info = self.df_asin_info.drop_duplicates(['asin', 'cur_time'])
        self.df_asin_info = self.df_asin_info.repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_info.show(10, truncate=False)

    def get_asin_info_list(self):
        """Aggregate ``self.df_asin_info`` into one row per ASIN of ordered list strings.

        Nulls in the metric columns are replaced by -1 / -1.0 first, so each
        list has one entry per month row. Resulting ``self.df_save`` columns, in
        order: asin, times_list, ao_list, zr_count_list, sp_count_list,
        sb_count_list, price_list, bsr_orders_list, bsr_orders_sale_list,
        orders_list (always null).
        """
        self.df_asin_info = self.df_asin_info.na.fill(
            {"asin_ao_val": -1.0, "asin_zr_counts": -1, "asin_sp_counts": -1, "asin_sb_counts": -1,
             "asin_bsr_orders": -1, "asin_price": -1.0, "bsr_orders_sale": -1})
        # cur_time is the first struct field, so sort_array orders each ASIN's
        # entries by cur_time (string order).
        df_asin_info_agg = self.df_asin_info.groupby(['asin']).agg(
            F.sort_array(
                F.collect_list(
                    F.struct("cur_time", "asin_ao_val", "asin_zr_counts", "asin_sp_counts",
                             "asin_sb_counts", "asin_price", "asin_bsr_orders", "bsr_orders_sale")
                )
            ).alias("values"))
        # (struct field, output column); this order is the output column order.
        list_columns = [
            ("cur_time", "times_list"),
            ("asin_ao_val", "ao_list"),
            ("asin_zr_counts", "zr_count_list"),
            ("asin_sp_counts", "sp_count_list"),
            ("asin_sb_counts", "sb_count_list"),
            ("asin_price", "price_list"),
            ("asin_bsr_orders", "bsr_orders_list"),
            ("bsr_orders_sale", "bsr_orders_sale_list"),
        ]
        df_asin_info_agg = df_asin_info_agg.select(
            "asin",
            *[self.u_change_data_type(F.expr(f"transform(values, x -> x.{field})")).alias(name)
              for field, name in list_columns])
        # Persist once, after every column is built. Persisting inside a
        # per-column loop cached each intermediate DataFrame and never released them.
        df_asin_info_agg = df_asin_info_agg.persist(StorageLevel.DISK_ONLY)
        df_asin_info_agg.show(10, truncate=False)
        # NOTE(review): F.lit(None) has NullType (void); some writers reject void
        # columns — confirm the save path casts it to the table's column type.
        self.df_save = df_asin_info_agg.withColumn("orders_list", F.lit(None))

    def handle_data_group(self):
        """Append asin_type, created_time, updated_time and site_name to ``self.df_save``."""
        self.df_save = self.df_save.withColumn("asin_type", self.u_get_zr_asin_type(self.df_save.asin))
        # 'ss' is second-of-minute; the previous 'SS' is fraction-of-second in
        # Spark's datetime patterns, so the seconds field was wrong.
        now_str = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = self.df_save.withColumn("created_time", now_str) \
            .withColumn("updated_time", now_str)
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))

    def handle_data(self):
        """Build ``self.df_save`` from the data loaded by ``read_data``."""
        self.get_asin_info_list()
        self.handle_data_group()


if __name__ == '__main__':
    site_name = sys.argv[1]  # argv[1]: site name
    handle_obj = DwtZrAsinInfo(site_name=site_name)
    handle_obj.run()