import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the module search path

from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates


class OdsBrandAnalytics(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.date_info2 = date_info
        # restored: self.db_save is referenced in the Spark app name below,
        # so leaving this assignment commented out would raise AttributeError
        self.db_save = 'ods_brand_analytics'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")

    def read_data(self):
        # search-term/ASIN pairs for a fixed sample of search terms, used for spot-checking
        sql = f"select search_term, asin from dwd_st_asin_measure where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info='{self.date_info}' " \
              f"and search_term in ('christmas decorations', 'my orders', 'ugly christmas sweater men')"
        print("sql:", sql)
        df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        df_st_asin.show(20, truncate=False)

        # AO value per search term for the same sample
        sql = f"select search_term, st_ao_val from dwd_st_measure where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info='{self.date_info}' " \
              f"and search_term in ('christmas decorations', 'my orders', 'ugly christmas sweater men')"
        print("sql:", sql)
        df_st = self.spark.sql(sqlQuery=sql).cache()
        df_st.show(20, truncate=False)

        # per-ASIN measures: AO value and SP/SB/ZR counts
        sql = f"select asin, asin_ao_val, asin_sp_counts, asin_sb_counts, asin_zr_counts " \
              f"from dwd_asin_measure where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info='{self.date_info}'"
        print("sql:", sql)
        df_asin = self.spark.sql(sqlQuery=sql).cache()
        df_asin.show(20, truncate=False)
        df_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)

        df_save = df_st_asin.join(
            df_st, on='search_term', how='left'
        ).join(
            df_asin, on='asin', how='left'
        )
        # df_save.show(100, truncate=False)
        print(df_save.count())
        df_save = df_save.toPandas()
        df_save.to_csv("/opt/module/spark/demo/py_demo/temp/st_1_10_100_new.csv", index=False)
        sys.exit(0)  # stop after dumping the CSV; everything below is an earlier debug path

        # --- unreachable: kept from an earlier debugging session ---
        sql = f"select search_term, st_rank, asin, st_ao_val from dwt_st_asin_info where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info='{self.date_info}' and st_rank in (1, 10, 100)"
        print("sql:", sql)
        df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        df_st_asin.show(20, truncate=False)
        df_st_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)

        sql = f"select asin, asin_ao_val, asin_sp_counts, asin_sb_counts, asin_zr_counts " \
              f"from dwd_asin_counts where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info='{self.date_info}'"
        print("sql:", sql)
        df_asin = self.spark.sql(sqlQuery=sql).cache()
        df_asin.show(20, truncate=False)
        df_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)

        df_save = df_st_asin.join(
            df_asin, on='asin', how='left'
        )
        # df_save.show(100, truncate=False)
        print(df_save.count())
        df_save = df_save.toPandas()
        df_save.to_csv("/opt/module/spark/demo/py_demo/temp/st_1_10_100.csv", index=False)
        sys.exit(0)

    def handle_data(self):
        pass


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: granularity: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-month-day / year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = OdsBrandAnalytics(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
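
# Usage note (a sketch, not part of the original script): the entry point expects three
# positional arguments and is presumably launched with spark-submit. The file name below
# is an assumption inferred from the class/table name; cluster flags are omitted.
#
#   spark-submit ods_brand_analytics.py us month 2022-01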