import os
import sys

# Make the project root importable so that utils.templates resolves when
# this script is run directly.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.templates import Templates
from pyspark.sql import functions as F  # used by the commented-out DataFrame examples below
if __name__ == '__main__':
    # Build the SparkSession through the project's Templates helper.
    tmp = Templates()
    spark = tmp.create_spark_object("test_df")
    sql = """
    -- Consistency check (kept for reference): per asin_category_desc, take the
    -- bsr_cate_1_id|bsr_cate_current_id value from the latest asin_crawl_date
    -- and count how many rows agree with it.
    -- select asin_category_desc,
    --        first_value(rel_val) as first_value,
    --        count(rel_val) as count,
    --        sum(if(rel_val = first_val, 1, 0)) as eq_sum
    -- from (
    --     select asin_category_desc,
    --            concat(bsr_cate_1_id, "|", bsr_cate_current_id) rel_val,
    --            (
    --                max(struct(asin_crawl_date, concat(bsr_cate_1_id, "|", bsr_cate_current_id)))
    --                over (partition by asin_category_desc)
    --            )['col2'] as first_val
    --     from dim_cal_asin_history_detail
    --     where 1 = 1
    --       and site_name = lower("us")
    --       and bsr_cate_1_id is not null
    --       and bsr_cate_current_id is not null
    --       and asin_category_desc != '无'
    -- )
    -- group by asin_category_desc
    select asin, asin_category_desc, bsr_cate_1_id, bsr_cate_current_id
    from dim_cal_asin_history_detail
    where 1 = 1
      and site_name = lower("us")
      and bsr_cate_1_id is not null
      and bsr_cate_current_id is not null
      and asin_category_desc != '无'  -- '无' means "none" in the source data
      and asin_category_desc = 'Clothing, Shoes & Jewelry›Novelty & More›Clothing›Novelty›Women›Tops & Tees›T-Shirts'
    limit 30
    """
    df_save = spark.sql(sql)

    # Example (kept for reference): split the category path on "›" to get the
    # top level, the depth, and the leaf level of asin_category_desc.
    # df = spark.createDataFrame([
    #     ("Tools & Home Improvement›Power & Hand Tools›Hand Tools›Magnetic Sweepers", 1)
    # ],
    #     ('label', "age")
    # )
    #
    # df = df.withColumn("arr", F.split(F.col("label"), "›"))
    # df = df.withColumn("first", F.col("arr").getItem(0))
    # df = df.withColumn("level", F.size("arr"))
    # df = df.withColumn("last", F.col("arr").getItem(F.col("level") - 1))

    # Preview the sampled rows; the Hive table writes stay commented out for manual runs.
    df_save.show(truncate=False)
    # df_save.write.saveAsTable(name="asin_category_statistics", format='hive', mode='overwrite')
    print("success")
    # df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)