# test_df.py 2.47 KB
import os
import sys

# Make the parent of sys.path[0] importable BEFORE the project import below.
# When run as a script, sys.path[0] is the script's own directory, so this adds
# its parent directory (presumably where the `utils` package lives -- confirm).
# One append is enough; repeating it only adds a duplicate sys.path entry.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.templates import Templates
from pyspark.sql import functions as F

def main():
    """Run an ad-hoc query on ``dim_cal_asin_history_detail`` and print the rows.

    Creates a Spark object through ``Templates.create_spark_object("test_df")``,
    runs a query that selects up to 30 rows (asin, category description and the
    two BSR category ids) for site ``us`` and one hard-coded category path,
    prints the result untruncated, then prints ``success``.

    Nothing is written to any table; the Spark object is stopped on exit,
    including when the query fails.
    """
    tmp = Templates()
    # Assumed to be a SparkSession (it is used through `.sql()` below) -- confirm
    # against Templates.create_spark_object.
    spark = tmp.create_spark_object("test_df")

    # Plain string on purpose: the query has no placeholders, so an f-string
    # prefix would only risk accidental `{}` interpolation later.
    # The leading `--` lines are a disabled aggregate query kept as SQL
    # comments; they are part of the text sent to Spark.
    sql = """
--         select asin_category_desc,
--                first_value(rel_val)               as first_value,
--                count(rel_val)                     as count,
--                sum(if(rel_val = first_val, 1, 0)) as eq_sum
--         from (
--                  select asin_category_desc,
--                         concat(bsr_cate_1_id, "|", bsr_cate_current_id) rel_val,
--                         (
--                             max(struct(asin_crawl_date, concat(bsr_cate_1_id, "|", bsr_cate_current_id)))
--                                 over (partition by asin_category_desc)
--                             )['col2'] as                                     first_val
--                  from dim_cal_asin_history_detail
--                  where 1 = 1
--                    and site_name = lower("us")
--                    and bsr_cate_1_id is not null
--                    and bsr_cate_current_id is not null
--                    and asin_category_desc != '无'
--              )
--         group by asin_category_desc

        select asin,asin_category_desc,bsr_cate_1_id,bsr_cate_current_id
        from dim_cal_asin_history_detail
        where 1 = 1
          and site_name = lower("us")
          and bsr_cate_1_id is not null
          and bsr_cate_current_id is not null
          and asin_category_desc != '无'
          and asin_category_desc = 'Clothing, Shoes & Jewelry›Novelty & More›Clothing›Novelty›Women›Tops & Tees›T-Shirts'
          limit 30
        

"""

    try:
        df_save = spark.sql(sql)
        # truncate=False so the long category path is printed in full.
        df_save.show(truncate=False)

        # NOTE: persisting the result is intentionally disabled in this script.
        # To save it, write df_save to the Hive table "asin_category_statistics"
        # (format='hive', mode='overwrite').
        print("success")
    finally:
        # Release the Spark resources even if the query or show() raised.
        spark.stop()


if __name__ == '__main__':
    main()