diff --git a/Pyspark_job/my_kafka/kafka_asin_detail.py b/Pyspark_job/my_kafka/kafka_asin_detail.py index 35851e0..18b1dc7 100644 --- a/Pyspark_job/my_kafka/kafka_asin_detail.py +++ b/Pyspark_job/my_kafka/kafka_asin_detail.py @@ -95,12 +95,12 @@ class DimStAsinInfo(Templates): "it": "(\d+).*?Visualizza i Top 100 nella categoria " } # 匹配一级分类的排名 self.pattern_str = { - "us": "(\d+ in [\w&' ]+)", - "uk": "(\d+ in [\w&' ]+)", - "de": "Nr. (\d+ in [\w&' ]+)", - "es": "nº(\d+ en [\w&' ]+)", - "fr": "(\d+ en [\w&' ]+)", - "it": "n. (\d+ in [\w&' ]+)", + "us": "(\d+ in [\w&' -]+)", + "uk": "(\d+ in [\w&' -]+)", + "de": "Nr. (\d+ in [\w&' -]+)", + "es": "nº(\d+ en [\w&' -]+)", + "fr": "(\d+ en [\w&' -]+)", + "it": "n. (\d+ in [\w&' -]+)", } # 匹配排名和分类 self.replace_str = { "us": "See Top 100 in ", @@ -373,7 +373,7 @@ class DimStAsinInfo(Templates): def clean_kafka_df(df): df = df.withColumnRenamed("seller_id", "account_id") # |asin_zr_flow_proportion|asin_ao_val|asin_amazon_orders|variant_info|matrix_flow_proportion|matrix_ao_val| - df = df.select("asin", "parentAsin", "title", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating", + df = df.select("asin", "parentAsin", "title", "img_url", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating", "brand", "account_id", "account_name", "account_url", "buy_box_seller_type", "volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory", "asinUpdateTime", "site_name", "node_id", "buy_sales", 'asin_amazon_orders', 'asin_ao_val', 'matrix_ao_val', "asin_zr_flow_proportion", 'matrix_flow_proportion') @@ -389,6 +389,7 @@ class DimStAsinInfo(Templates): ) df.show(5, truncate=False) + df = df.withColumnRenamed("img_url", "imgUrl") df = df.withColumnRenamed("variat_num", "asinVarNum") df = df.withColumnRenamed("asin_bs_cate_1_rank", "oneCategoryRank") df = df.withColumnRenamed("rank_and_category", "bestSellersRank") # 解析后的 @@ -411,7 +412,7 @@ class DimStAsinInfo(Templates): df = df.withColumnRenamed("asin_zr_flow_proportion", "asinZrFlowProportion") df = df.withColumnRenamed("matrix_flow_proportion", "asinZrFlowProportionMatrix") # df = df.withColumnRenamed("fd_country_name", "fdCountryName") - df = df.select('asin', 'parentAsin', 'title', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'matrixAoVal', 'price', 'rating', + df = df.select('asin', 'parentAsin', 'title', "imgUrl", 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'matrixAoVal', 'price', 'rating', 'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName', 'accountUrl', 'buyBoxSellerType', 'volume', 'weight', 'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime', 'asinBoughtMonth', "asinAmazonOrders", "fdCountryName", "key_outer", "key_inner", "volumeFormat", "weightFormat", "isSelfAsin", "auctionsNum", "skusNumCreat", "asinZrFlowProportion", "asinZrFlowProportionMatrix")