import os
import sys

# Make the parent directory importable so `utils.templates` resolves when this
# file is run as a script from its own folder.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.templates import Templates
from pyspark.sql import functions as F


class DwtAbaKeywordAnalytics(Templates):
    """Spark job: join ABA search-term analytics with advertising keywords.

    Reads one partition of ``dwt_aba_st_analytics`` (selected by site/date),
    inner-joins it on ``search_term`` against the distinct
    ``keywordText``/``matchType`` pairs from ``ods_adv_keyword``, stamps the
    partition columns back on, and (via the inherited ``run()`` driver —
    presumably defined on ``Templates``; confirm there) writes the result to
    ``dwt_aba_keyword_analytics``.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2023-10'):
        """Set up the Spark session and partition metadata.

        :param site_name: marketplace site code, e.g. ``'us'``
        :param date_type: granularity of ``date_info``, e.g. ``'month'``
        :param date_info: partition value, e.g. ``'2023-10'``
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Target table name (was an f-string with no placeholders).
        self.db_save = 'dwt_aba_keyword_analytics'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=10)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # Placeholder DataFrames; replaced in read_data()/handle_data().
        self.df_aba = self.spark.sql("select 1+1;")
        self.df_keyword = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def read_data(self):
        """Load the ABA analytics partition and the distinct ad keywords.

        Both DataFrames are cached because each feeds the join (and the
        inherited save step) downstream.
        """
        sql1 = f"""
        select id, search_term, rank, category_id, orders, bsr_orders, search_volume,
               quantity_being_sold, st_ao_avg, st_ao_val_rate, new_bsr_orders_proportion,
               new_asin_proportion, page1_title_proportion, price_avg, total_comments_avg,
               rating_avg, weight_avg, volume_avg, title_length_avg, st_num, aadd_proportion,
               sp_proportion, fbm_proportion, cn_proportion, amzon_proportion, most_proportion,
               max_num, asin1, asin2, asin3, click_share1, click_share2, click_share3,
               total_click_share, conversion_share1, conversion_share2, conversion_share3,
               total_conversion_share, new_asin_num, total_asin_num, new_asin_orders,
               new_asin_bsr_orders, is_first_text, is_ascending_text, is_search_text,
               top3_seller_orders, top3_seller_bsr_orders, top3_brand_orders,
               top3_brand_bsr_orders, page3_brand_num, page3_seller_num, brand_monopoly,
               seller_monopoly, max_num_asin, is_self_max_num_asin, is_new_market_segment,
               created_time, updated_time, color_proportion, gross_profit_fee_air,
               gross_profit_fee_sea, re_string_field1, re_string_field2, re_string_field3,
               category_current_id, re_int_field2, re_int_field3, supply_demand,
               market_cycle_type, multi_color_proportion, multi_size_proportion, st_4_20_ao_avg,
               st_4_20_ao_rate, asin_aadd_count, asin_video_count, asin_fbm_count,
               asin_cn_count, asin_amazon_count, asin_color_count, asin_multi_color_count,
               asin_multi_size_count, st_word_num, re_string_field4, st_movie_label,
               st_brand_label, st_brand1, st_category1, st_brand2, st_category2, st_brand3,
               st_category3, st_bsr_cate_1_id_new, st_bsr_cate_current_id_new, st_crawl_date,
               is_high_return_text, st_zr_page123_title_appear_rate,
               st_sp_page123_title_appear_rate, st_competition_level
        from dwt_aba_st_analytics
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}';
        """
        print(sql1)
        self.df_aba = self.spark.sql(sqlQuery=sql1).cache()
        # NOTE(review): keywordText is selected twice (once aliased as
        # search_term for the join key); the group by de-duplicates
        # (keywordText, matchType) pairs, so a keyword with several match
        # types yields several joined rows per search term.
        sql2 = f"""
        select keywordText as search_term, keywordText, matchType
        from ods_adv_keyword
        where site_name = '{self.site_name}'
        group by keywordText, matchType;
        """
        print(sql2)
        self.df_keyword = self.spark.sql(sqlQuery=sql2).cache()

    def handle_data(self):
        """Inner-join ABA rows with ad keywords and stamp partition columns."""
        self.df_save = self.df_aba.join(self.df_keyword, 'search_term', 'inner')
        print(self.site_name, self.date_type, self.date_info)
        # Fill the partition columns so the writer can partition by them.
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save.show(10)


if __name__ == '__main__':
    # Fail fast with a usage message instead of an opaque IndexError.
    if len(sys.argv) < 4:
        sys.exit(f"usage: {sys.argv[0]} <site_name> <date_type> <date_info>")
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    handle_obj = DwtAbaKeywordAnalytics(site_name=site_name, date_type=date_type,
                                        date_info=date_info)
    handle_obj.run()