"""Ad-hoc PySpark job: pull a 30-row sample of ASIN category rows.

Queries ``dim_cal_asin_history_detail`` for one fixed category path on the
US site and prints the result. The earlier aggregation experiment is kept
inside the SQL string as ``--`` comments for reference.
"""
import os
import sys

# Make the project root importable so `utils.templates` resolves when this
# file is run directly as a script (must happen before the local import).
sys.path.append(os.path.dirname(sys.path[0]))

from utils.templates import Templates
from pyspark.sql import functions as F  # noqa: F401  -- kept for interactive tweaking

if __name__ == '__main__':
    tmp = Templates()
    spark = tmp.create_spark_object("test_df")

    # Plain string (no f-prefix): there are no placeholders, and an f-string
    # would blow up on any literal `{` added to the SQL later.
    sql = """
    -- select asin_category_desc,
    --        first_value(rel_val) as first_value,
    --        count(rel_val) as count,
    --        sum(if(rel_val = first_val, 1, 0)) as eq_sum
    -- from (
    --     select asin_category_desc,
    --            concat(bsr_cate_1_id, "|", bsr_cate_current_id) rel_val,
    --            (
    --                max(struct(asin_crawl_date, concat(bsr_cate_1_id, "|", bsr_cate_current_id)))
    --                over (partition by asin_category_desc)
    --            )['col2'] as first_val
    --     from dim_cal_asin_history_detail
    --     where 1 = 1
    --       and site_name = lower("us")
    --       and bsr_cate_1_id is not null
    --       and bsr_cate_current_id is not null
    --       and asin_category_desc != '无'
    -- )
    -- group by asin_category_desc
    select asin,asin_category_desc,bsr_cate_1_id,bsr_cate_current_id
    from dim_cal_asin_history_detail
    where 1 = 1
    and site_name = lower("us")
    and bsr_cate_1_id is not null
    and bsr_cate_current_id is not null
    and asin_category_desc != '无'
    and asin_category_desc = 'Clothing, Shoes & Jewelry›Novelty & More›Clothing›Novelty›Women›Tops & Tees›T-Shirts'
    limit 30
    """
    df_save = spark.sql(sql)
    df_save.show(truncate=False)
    # Persist when needed:
    # df_save.write.saveAsTable(name="asin_category_statistics", format='hive', mode='overwrite')
    # df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
    print("success")