import json import os import re import sys import time import traceback import zlib import pandas as pd import redis from datetime import datetime sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/") sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates from utils.templates_mysql import TemplatesMysql # from ..utils.templates_mysql import TemplatesMysql from pyspark.sql.types import IntegerType from pyspark.sql import functions as F from pyspark.sql.types import * from yswg_utils.common_udf import udf_rank_and_category # from ..yswg_utils.common_udf import udf_rank_and_category from yswg_utils.common_df import get_node_first_id_df from kafka import KafkaConsumer, TopicPartition from yswg_utils.common_udf import parse_weight_str # from ..yswg_utils.common_udf import parse_weight_str from utils.db_util import DbTypes, DBUtil from yswg_utils.common_udf import udf_parse_seller_json from yswg_utils.common_udf import udf_extract_weight_format from yswg_utils.common_udf import udf_extract_volume_format from utils.common_util import CommonUtil class DimStAsinInfo(Templates): def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='lastest', topic_name="us_asin_detail", batch_size=100000): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.consumer_type = consumer_type # 消费实时还是消费历史 self.topic_name = topic_name # 主题名字 self.batch_size = batch_size self.batch_size_history = int(batch_size / 10) # 连接到Redis服务器 self.redis_db = { "us": 0, "uk": 1, "de": 2, "es": 3, "fr": 4, "it": 5, } self.client = redis.Redis(host='192.168.10.224', port=6379, db=self.redis_db[self.site_name], password='yswg2023') self.db_save = f'kafka_asin_detail' self.app_name = self.get_app_name() self.spark = self.create_spark_object( app_name=f"{self.app_name}") self.get_date_info_tuple() self.df_save = self.spark.sql(f"select 1+1;") # self.df_st_asin = self.spark.sql(f"select 1+1;") self.df_bs_report = self.spark.sql(f"select 1+1;") self.df_asin_bs = self.spark.sql(f"select 1+1;") self.df_self_asin = self.spark.sql(f"select 1+1;") self.df_asin_sku = self.spark.sql(f"select 1+1;") self.df_fd_asin_info = self.spark.sql(f"select 1+1;") self.df_asin_measure = self.spark.sql(f"select 1+1;") self.df_asin_templates = self.spark.sql("select asin_zr_counts, asin_sp_counts, asin_sb1_counts,asin_sb2_counts,asin_sb3_counts,asin_ac_counts,asin_bs_counts,asin_er_counts,asin_tr_counts from dwd_asin_measure limit 0") self.df_asin_counts = self.spark.sql("select asin_zr_counts, asin_sp_counts, asin_sb1_counts,asin_sb2_counts,asin_sb3_counts,asin_ac_counts,asin_bs_counts,asin_er_counts,asin_tr_counts from dwd_asin_measure limit 0") self.schema = self.init_schema() # self.u_rank_and_category = self.spark.udf.register("u_rank_and_category", udf_rank_and_category, schema) schema = StructType([ StructField('asin_bs_cate_1_rank', StringType(), True), StructField('rank_and_category', StringType(), True), ]) self.u_rank_and_category = self.spark.udf.register("u_rank_and_category", self.udf_rank_and_category, schema) self.u_cal_crc32 = self.spark.udf.register("u_cal_crc32", self.udf_cal_crc32, IntegerType()) self.u_cal_bkdr = self.spark.udf.register("u_cal_bkdr", self.udf_cal_bkdr, IntegerType()) self.u_extract_dimensions = self.spark.udf.register("u_extract_dimensions", udf_extract_volume_format, StringType()) self.u_extract_weight = self.spark.udf.register("u_extract_weight", udf_extract_weight_format, StringType()) seller_schema = StructType([ StructField("buy_box_seller_type", IntegerType(), True), StructField("account_name", StringType(), True), StructField("account_id", StringType(), True) ]) self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema) self.pattern_1_rank_str = { "us": "(\d+).*?See Top 100 in ", "uk": "(\d+).*?See Top 100 in ", "de": "(\d+).*?Siehe Top 100 in ", "es": "(\d+).*?Ver el Top 100 en ", "fr": "(\d+).*?Voir les 100 premiers en ", "it": "(\d+).*?Visualizza i Top 100 nella categoria " } # 匹配一级分类的排名 self.pattern_str = { "us": "(\d+ in [\w&' ]+)", "uk": "(\d+ in [\w&' ]+)", "de": "Nr. (\d+ in [\w&' ]+)", "es": "nº(\d+ en [\w&' ]+)", "fr": "(\d+ en [\w&' ]+)", "it": "n. (\d+ in [\w&' ]+)", } # 匹配排名和分类 self.replace_str = { "us": "See Top 100 in ", "uk": "See Top 100 in ", "de": "Siehe Top 100 in ", "es": "Ver el Top 100 en ", "fr": "Voir les 100 premiers en ", "it": "Visualizza i Top 100 nella categoria ", } # 去掉top100匹配 # 连接mysql self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name=self.site_name) # self.beginning_offsets = 326_0000 if self.site_name == 'us' else 0 def get_connection(self): return TemplatesMysql(site_name=self.site_name).mysql_connect() def fetch_self_asin(self): while True: try: sql = f"""SELECT asin, 1 as isSelfAsin from {self.site_name}_self_asin""" df_self_asin = pd.read_sql(sql, con=self.engine_mysql) schema = StructType([ StructField("asin", StringType(), True), StructField("isSelfAsin", IntegerType(), True), ]) self.df_self_asin = self.spark.createDataFrame(df_self_asin, schema=schema).cache() self.df_self_asin.show(10, truncate=False) break except Exception as e: print(e, traceback.format_exc()) time.sleep(10) self.engine_mysql = self.get_connection() def fetch_asin_sku_count(self): while True: try: sql = f"""SELECT asin,count(id) as auctionsNum,count((case when sku!='' then sku else NULL end)) as skusNumCreat from selection.product_audit_asin_sku -- where asin in ('B085WYH539') GROUP BY asin """ df_asin_sku = pd.read_sql(sql, con=self.engine_mysql) schema = StructType([ StructField("asin", StringType(), True), StructField("auctionsNum", IntegerType(), True), StructField("skusNumCreat", IntegerType(), True), ]) self.df_asin_sku = self.spark.createDataFrame(df_asin_sku, schema=schema).cache() self.df_asin_sku.show(10, truncate=False) break except Exception as e: print(e, traceback.format_exc()) time.sleep(10) self.engine_mysql = self.get_connection() @staticmethod def init_schema(): schema = StructType([ StructField("asin", StringType(), True), StructField("week", StringType(), True), StructField("title", StringType(), True), StructField("img_url", StringType(), True), StructField("rating", StringType(), True), StructField("total_comments", StringType(), True), StructField("price", FloatType(), True), StructField("rank", StringType(), True), StructField("category", StringType(), True), StructField("launch_time", StringType(), True), StructField("volume", StringType(), True), StructField("weight", StringType(), True), StructField("page_inventory", IntegerType(), True), StructField("buy_box_seller_type", IntegerType(), True), StructField("asin_vartion_list", ArrayType(ArrayType(StringType()), True), True), StructField("title_len", IntegerType(), True), StructField("img_num", IntegerType(), True), StructField("img_type", StringType(), True), StructField("activity_type", StringType(), True), StructField("one_two_val", StringType(), True), StructField("three_four_val", StringType(), True), StructField("eight_val", StringType(), True), StructField("qa_num", IntegerType(), True), StructField("five_star", IntegerType(), True), StructField("four_star", IntegerType(), True), StructField("three_star", IntegerType(), True), StructField("two_star", IntegerType(), True), StructField("one_star", IntegerType(), True), StructField("low_star", IntegerType(), True), StructField("together_asin", StringType(), True), StructField("brand", StringType(), True), StructField("ac_name", StringType(), True), StructField("material", StringType(), True), StructField("node_id", StringType(), True), StructField("data_type", IntegerType(), True), StructField("sp_num", StringType(), True), StructField("describe", StringType(), True), StructField("date_info", StringType(), True), StructField("weight_str", StringType(), True), StructField("package_quantity", StringType(), True), StructField("pattern_name", StringType(), True), StructField("seller_id", StringType(), True), StructField("variat_num", IntegerType(), True), StructField("site_name", StringType(), True), StructField("best_sellers_rank", StringType(), True), StructField("best_sellers_herf", StringType(), True), StructField("account_url", StringType(), True), StructField("account_name", StringType(), True), StructField("parentAsin", StringType(), True), StructField("asinUpdateTime", StringType(), True), StructField("follow_sellers", StringType(), True), StructField("buy_sales", StringType(), True), StructField("seller_json", StringType(), True), ]) return schema @staticmethod def udf_cal_crc32(asin, key_size): # crc32算法 + 取余 # 获取asin字符串的字节表示形式 bytes_str = bytes(asin, 'utf-8') # 使用zlib计算CRC-32校验和 checksum = zlib.crc32(bytes_str) # 获取32位的二进制补码 checksum_signed = (checksum & 0xFFFFFFFF) - (1 << 32) if checksum & (1 << 31) else checksum def java_mod(x, y): # return x % y if x * y > 0 else x % y - y # 区分正负值 return abs(x) % y # 不区分正负值 # 取余 result = java_mod(checksum_signed, key_size) return result @staticmethod def udf_cal_bkdr(asin): # BKDR哈希算法 hash = 0 for c in asin: hash = (hash * 33 + ord(c)) % 65535 # 对哈希值取模65535,以避免溢出 return hash @staticmethod def udf_rank_and_category(best_sellers_rank, pattern_1_rank_str, pattern_str, replace_str): # best_sellers_rank = str(best_sellers_rank).replace(",", "") best_sellers_rank = str(best_sellers_rank).replace(".", "").replace(",", "") matches = re.findall(pattern_1_rank_str, best_sellers_rank) asin_bs_cate_1_rank = matches[0] if matches else None best_sellers_rank = best_sellers_rank.replace(replace_str, "") matches = re.findall(pattern_str, best_sellers_rank) rank_and_category = "&&&&".join([rank_cate.replace(",", "") for rank_cate in matches]) if matches else None return asin_bs_cate_1_rank, rank_and_category def read_data(self): # print("1.1 读取dim_st_asin_info表, 计算ao值") # sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'" # print("sql:", sql) # self.df_st_asin = self.spark.sql(sql) # self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin', 'data_type']).cache() # self.df_st_asin.show(10, truncate=False) print("1.2 读取ods_one_category_report表") if int(self.year) == 2022 and int(self.month) < 3: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as asin_bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='month' and date_info='2022-12';" else: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as asin_bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';" print("sql:", sql) self.df_bs_report = self.spark.sql(sqlQuery=sql).cache() self.df_bs_report.show(10, truncate=False) print("1.3 读取bsr一级分类表") self.df_asin_bs = get_node_first_id_df(self.site_name, self.spark) self.df_asin_bs = self.df_asin_bs.withColumnRenamed("category_first_id", "asin_bs_cate_1_id") self.df_asin_bs.show(10, truncate=False) print("1.4 读取内部asin表") # sql = f"select asin, 1 as isSelfAsin from ods_self_asin where site_name='{self.site_name}';" # print("sql:", sql) # self.df_self_asin = self.spark.sql(sqlQuery=sql).cache() # self.df_self_asin.show(10, truncate=False) self.fetch_self_asin() # 读取asin和sku计数关系 print("1.5 读取asin和sku计数关系") self.fetch_asin_sku_count() # print("1.6 读取dim_fd_asin_info表, 卖家所在地") # sql = f"select asin, fd_country_name as fdCountryName from dim_fd_asin_info where site_name='{self.site_name}';" # print("sql:", sql) # self.df_fd_asin_info = self.spark.sql(sql) # self.df_fd_asin_info = self.df_fd_asin_info.drop_duplicates(['asin']).cache() # self.df_fd_asin_info.show(10, truncate=False) print("1.7 读取dwd_asin_measure表") self.read_data_dwd_asin_measure() print("1.8 获取卖家相关信息-卖家所在地") sql = f""" select fd_unique as account_id, upper(fd_country_name) as fdCountryName from dim_fd_asin_info where site_name='{self.site_name}' and fd_unique is not null group by fd_unique, fd_country_name """ print("sql=", sql) self.df_fd_asin_info = self.spark.sql(sqlQuery=sql) self.df_fd_asin_info.show(10, truncate=False) def read_data_dwd_asin_measure(self): print("7. 读取dwd_asin_measure拿取ao及各类型数量") sql = f""" select asin, asin_zr_counts, asin_adv_counts, asin_st_counts, asin_amazon_orders, asin_zr_flow_proportion, asin_ao_val from dwd_asin_measure where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' """ print("sql=", sql) self.df_asin_measure = self.spark.sql(sqlQuery=sql) self.df_asin_measure = self.df_asin_measure.repartition(20).cache() self.df_asin_measure.show(10, truncate=False) def handle_data(self): # 计算asin的ao值 self.get_topic_name() # self.df_asin_counts = self.handle_st_asin_counts() # self.df_asin_counts = self.df_asin_counts.select("asin", "asin_ao").cache() def handle_asin_bs_category_rank(self, df): df = df.withColumn( 'bs_str', self.u_rank_and_category( 'best_sellers_rank', F.lit(self.pattern_1_rank_str[self.site_name]), F.lit(self.pattern_str[self.site_name]), F.lit(self.replace_str[self.site_name]) ) ) df = df.withColumn('asin_bs_cate_1_rank', df.bs_str.getField('asin_bs_cate_1_rank')) \ .withColumn('rank_and_category', df.bs_str.getField('rank_and_category')) \ .drop('bs_str', 'best_sellers_rank') df.show(10, truncate=False) return df # def handle_st_asin_counts(self): # self.df_st_asin = self.df_st_asin.withColumn( # f"asin_data_type", # F.concat(F.lit(f"asin_"), self.df_st_asin.data_type, F.lit(f"_counts")) # ) # df_asin_counts = self.df_st_asin.groupby([f'asin']). \ # pivot(f"asin_data_type").count() # # df_asin_counts = self.df_asin_templates.unionByName(df_asin_counts, allowMissingColumns=True) # 防止爬虫数据没有导致程序运行出错 # df_asin_counts = df_asin_counts.fillna(0) # # df.show(10, truncate=False) # df_asin_counts = df_asin_counts.withColumn( # f"asin_sb_counts", # df_asin_counts[f"asin_sb1_counts"] + df_asin_counts[f"asin_sb2_counts"] + df_asin_counts[f"asin_sb3_counts"] # ) # df_asin_counts = df_asin_counts.withColumn( # f"asin_adv_counts", # df_asin_counts[f"asin_sb_counts"] + df_asin_counts[f"asin_sp_counts"] # ) # df_asin_counts = df_asin_counts.withColumn( # f"asin_ao", # df_asin_counts[f"asin_adv_counts"] / df_asin_counts[f"asin_zr_counts"] # ) # 不要把null置为0, null值产生原因是zr类型没有搜到对应的搜索词 # df_asin_counts = df_asin_counts.withColumn("asin_ao", F.round(df_asin_counts["asin_ao"], 4)) # # df_asin_counts.show(10, truncate=False) # return df_asin_counts # 处理配送方式 def handle_asin_buy_box_seller_type(self, df): df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json)) df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn( "account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed", "seller_json") return df @staticmethod def clean_kafka_df(df): df = df.withColumnRenamed("seller_id", "account_id") # |asin_zr_flow_proportion|asin_ao_val|asin_amazon_orders|variant_info|matrix_flow_proportion|matrix_ao_val| df = df.select("asin", "parentAsin", "title", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating", "brand", "account_id", "account_name", "account_url", "buy_box_seller_type", "volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory", "asinUpdateTime", "site_name", "node_id", "buy_sales", 'asin_amazon_orders', 'asin_ao_val', 'matrix_ao_val', "asin_zr_flow_proportion", 'matrix_flow_proportion') return df def rename_cols(self, df): # 计算redis的key df = df.withColumn( 'key_outer', self.u_cal_crc32('asin', F.lit(self.batch_size)) ) df = df.withColumn( 'key_inner', self.u_cal_bkdr('asin') ) df.show(5, truncate=False) df = df.withColumnRenamed("variat_num", "asinVarNum") df = df.withColumnRenamed("asin_bs_cate_1_rank", "oneCategoryRank") df = df.withColumnRenamed("rank_and_category", "bestSellersRank") # 解析后的 df = df.withColumnRenamed("best_sellers_herf", "lastHerf") df = df.withColumnRenamed("asin_ao_val", "aoVal") df = df.withColumnRenamed("asin_bsr_orders", "bsrOrders") df = df.withColumnRenamed("asin_bsr_orders_sale", "bsrOrdersSale") df = df.withColumnRenamed("brand", "brandName") df = df.withColumnRenamed("account_id", "accountId") df = df.withColumnRenamed("account_name", "accountName") df = df.withColumnRenamed("account_url", "accountUrl") df = df.withColumnRenamed("buy_box_seller_type", "buyBoxSellerType") df = df.withColumnRenamed("launch_time", "launchTime") df = df.withColumnRenamed("total_comments", "totalComments") df = df.withColumnRenamed("page_inventory", "pageInventory") df = df.withColumnRenamed("buy_sales", "asinBoughtMonth") df = df.withColumnRenamed("asin_amazon_orders", "asinAmazonOrders") df = df.withColumnRenamed("asin_ao_val", "aoVal") df = df.withColumnRenamed("matrix_ao_val", "matrixAoVal") df = df.withColumnRenamed("asin_zr_flow_proportion", "asinZrFlowProportion") df = df.withColumnRenamed("matrix_flow_proportion", "asinZrFlowProportionMatrix") # df = df.withColumnRenamed("fd_country_name", "fdCountryName") df = df.select('asin', 'parentAsin', 'title', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'matrixAoVal', 'price', 'rating', 'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName', 'accountUrl', 'buyBoxSellerType', 'volume', 'weight', 'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime', 'asinBoughtMonth', "asinAmazonOrders", "fdCountryName", "key_outer", "key_inner", "volumeFormat", "weightFormat", "isSelfAsin", "auctionsNum", "skusNumCreat", "asinZrFlowProportion", "asinZrFlowProportionMatrix") return df def get_topic_name(self): if self.site_name in ["us", "uk", "de"] and self.date_type == "month": self.topic_name = f"{site_name}_asin_detail_{self.date_type}_{self.date_info.replace('-', '_')}" else: self.topic_name = f"{site_name}_asin_detail" def handle_kafka_stream(self, kafka_df, epoch_id): try: count = kafka_df.count() print("当前批次传输的数据量为df.count():", count) if count == 0: pass # 确保schema非空以避免NoneType错误 if not self.schema: raise ValueError("Schema is not defined") # df.show(5, truncate=False) print("df.columns:", kafka_df.columns) # df = df.select("asin", "launch_time", "volume", "weight", "weight_str", "node_id", "variat_num", "best_sellers_rank", "best_sellers_herf", "seller_id", "account_url", "account_name", "site_name") kafka_df = self.handle_asin_buy_box_seller_type(kafka_df) # 处理卖家类型 kafka_df = CommonUtil.get_asin_variant_attribute(df_asin_detail=kafka_df, df_asin_measure=self.df_asin_measure, partition_num=20, use_type=0) print("df.columns:", kafka_df.columns) df = self.clean_kafka_df(df=kafka_df) # df.show(5, truncate=False) # # 提取排名和分类 df_bs = self.handle_asin_bs_category_rank(df=df.select("asin", "best_sellers_rank")) # join df_save = df.join( df_bs, on='asin', how='left' ).join( self.df_asin_bs, on='node_id', how='left' ).join( self.df_self_asin, on='asin', how='left' ).join( self.df_asin_sku, on='asin', how='left' ).join( self.df_fd_asin_info, on='account_id', how='left' ) df_save = df_save.na.fill({"isSelfAsin": 0}) # 计算bsr效率 df_save = df_save.join( self.df_bs_report, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left' ) df_save = df_save.withColumn("volumeFormat", F.when(F.col("volume").isNotNull(), self.u_extract_dimensions("volume"))) df_save = df_save.withColumn("weightFormat", F.when(F.col("weight_str").isNotNull(), self.u_extract_weight("weight_str"))) df_save = df_save.withColumn("asin_bsr_orders_sale", df_save.price * df_save.asin_bsr_orders) df_save = self.rename_cols(df=df_save) df_save = df_save.fillna({"isSelfAsin": 0}) self.save_to_redis(df=df_save) except Exception as e: print(e, traceback.format_exc()) # # 与从Kafka读取的数据进行连接 # joined_df = df.join(self.df_asin_title, "asin", how='left') # # 执行你的转换和聚合逻辑 # result_df = joined_df.groupBy("asin").count() # result_df.show(10, truncate=False) print("epoch_id:", epoch_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S")) def handle_kafka_history(self, kafka_df): try: print("df.columns:", kafka_df.columns) # df = df.select("asin", "launch_time", "volume", "weight", "weight_str", "node_id", "variat_num", "best_sellers_rank", "best_sellers_herf", "seller_id", "account_url", "account_name", "site_name") # kafka_df.show(10, truncate=False) kafka_df.show(10) # 处理卖家类型 kafka_df = self.handle_asin_buy_box_seller_type(kafka_df) # kafka_df.show(10) kafka_df = CommonUtil.get_asin_variant_attribute(df_asin_detail=kafka_df, df_asin_measure=self.df_asin_measure, partition_num=20, use_type=0) # |asin_zr_counts|asin_adv_counts|asin_st_counts|asin_amazon_orders|asin_zr_flow_proportion|asin_ao_val|asin_amazon_orders|variant_info|matrix_flow_proportion|matrix_ao_val| # kafka_df.show(10) df = self.clean_kafka_df(df=kafka_df) # 选择需要的列 # df = df.withColumn("volumeFormat", F.when(F.col("volume").isNotNull(), self.u_extract_dimensions("volume"))) # df = df.withColumn("weightFormat", F.when(F.col("weight_str").isNotNull(), self.u_extract_weight("weight_str"))) # # df.select("asin", "volume", "volumeFormat", "weight_str", "weightFormat").show(20, truncate=False) # # df.show(5, truncate=False) # # 提取排名和分类 df_bs = self.handle_asin_bs_category_rank(df=df.select("asin", "best_sellers_rank")) # join df_save = df.join( df_bs, on='asin', how='left' ).join( self.df_asin_bs, on='node_id', how='left' ).join( self.df_self_asin, on='asin', how='left' ).join( self.df_asin_sku, on='asin', how='left' ).join( self.df_fd_asin_info, on='account_id', how='left' ) # 计算bsr效率 df_save = df_save.join( self.df_bs_report, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left' ) df_save = df_save.withColumn("volumeFormat", F.when(F.col("volume").isNotNull(), self.u_extract_dimensions("volume"))) df_save = df_save.withColumn("weightFormat", F.when(F.col("weight_str").isNotNull(), self.u_extract_weight("weight_str"))) # df_save.select("asin", "volume", "volumeFormat", "weight_str", "weightFormat").show(20, truncate=False) df_save = df_save.withColumn("asin_bsr_orders_sale", df_save.price * df_save.asin_bsr_orders) df_save = self.rename_cols(df=df_save) df_save = df_save.fillna({"isSelfAsin": 0}) # df_save.show(10) self.save_to_redis(df=df_save) except Exception as e: print(e, traceback.format_exc()) def save_to_redis(self, df): # 将Spark DataFrame转换为Pandas DataFrame pdf = df.toPandas() print(f"开始存储数据: {pdf.shape}") # 遍历Pandas DataFrame并将数据插入到Redis for index, row in pdf.iterrows(): # 创建一个复合键,或者根据你的需要选择适当的键 # 1. 外层key为10197, 内层可以为10197:15931 # redis_key = f"{row['key_outer']}:{row['key_inner']}" # # # 插入值到Redis - 在这里我仅仅存储了一个值,你可以存储一个字典来存储多个值 # self.client.set(redis_key, row['value']) # row_json = row.to_json(orient='split') # self.client.set(redis_key, row_json) # 2. 外层key为10197, 内层可以为15931 # redis_key = row['key_outer'] # redis_field = row['key_inner'] # row_json = row.to_json(orient='split') # self.client.hset(redis_key, redis_field, row_json) # 3. hashmap + 外层key为10197, 内层可以为15931 redis_key = row['key_outer'] # redis_field = row['key_inner'] redis_field = row['asin'] row_dict = row.to_dict() # row_dict = {k: str(v).lower().replace("none", "").replace("nan", "") for k, v in row_dict.items()} # 确保所有的值都是字符串 row_dict = {k: str(v).replace("None", "").replace("none", "").replace("NaN", "").replace("nan", "") for k, v in row_dict.items()} # 确保所有的值都是字符串 row_dict = {k: format(v, ".2f") if isinstance(v, (int, float)) else str(v).replace("None", "").replace( "nan", "") for k, v in row_dict.items()} del row_dict["key_outer"] del row_dict["key_inner"] row_json = json.dumps(row_dict) self.client.hset(redis_key, redis_field, row_json) if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter/day date_info = sys.argv[3] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1 consumer_type = sys.argv[4] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1 handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info, consumer_type=consumer_type, batch_size=100000) # handle_obj.run() handle_obj.run_kafka()