# dwt_aba_keyword_analytics.py
import os
import sys


sys.path.append(os.path.dirname(sys.path[0]))  # add parent directory so the utils package is importable
from utils.templates import Templates
from pyspark.sql import functions as F

class DwtAbaKeywordAnalytics(Templates):
    """Build the dwt_aba_keyword_analytics wide table.

    Reads the ABA search-term analytics table (dwt_aba_st_analytics) for one
    site/date partition, inner-joins it with the advertising keyword table
    (ods_adv_keyword) on search_term, and prepares the result for writing,
    partitioned by site_name / date_type / date_info (driven by the inherited
    Templates.run() pipeline).
    """

    def __init__(self, site_name='us', date_type="month", date_info='2023-10'):
        """
        :param site_name: marketplace code, e.g. 'us'
        :param date_type: partition granularity, e.g. 'month'
        :param date_info: partition value matching date_type, e.g. '2023-10'
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Target table name — plain string, no interpolation needed (was an
        # f-string with no placeholders).
        self.db_save = 'dwt_aba_keyword_analytics'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.reset_partitions(partitions_num=10)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # Placeholder DataFrames; populated by read_data() / handle_data().
        self.df_aba = self.spark.sql("select 1+1;")
        self.df_keyword = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def read_data(self):
        """Load the source DataFrames and cache them.

        Populates:
          - self.df_aba: one partition of dwt_aba_st_analytics (all analytics columns).
          - self.df_keyword: distinct (keywordText, matchType) pairs from
            ods_adv_keyword, with keywordText aliased to search_term as the join key.

        NOTE(review): site_name/date_type/date_info are interpolated directly
        into the SQL text. They originate from sys.argv, so this is effectively
        string-built SQL — acceptable only for trusted, operator-supplied
        arguments; consider validating them against a whitelist upstream.
        """
        sql1 = f"""
        select 
            id,
            search_term,
            rank,
            category_id,
            orders,
            bsr_orders,
            search_volume,
            quantity_being_sold,
            st_ao_avg,
            st_ao_val_rate,
            new_bsr_orders_proportion,
            new_asin_proportion,
            page1_title_proportion,
            price_avg,
            total_comments_avg,
            rating_avg,
            weight_avg,
            volume_avg,
            title_length_avg,
            st_num,
            aadd_proportion,
            sp_proportion,
            fbm_proportion,
            cn_proportion,
            amzon_proportion,
            most_proportion,
            max_num,
            asin1,
            asin2,
            asin3,
            click_share1,
            click_share2,
            click_share3,
            total_click_share,
            conversion_share1,
            conversion_share2,
            conversion_share3,
            total_conversion_share,
            new_asin_num,
            total_asin_num,
            new_asin_orders,
            new_asin_bsr_orders,
            is_first_text,
            is_ascending_text,
            is_search_text,
            top3_seller_orders,
            top3_seller_bsr_orders,
            top3_brand_orders,
            top3_brand_bsr_orders,
            page3_brand_num,
            page3_seller_num,
            brand_monopoly,
            seller_monopoly,
            max_num_asin,
            is_self_max_num_asin,
            is_new_market_segment,
            created_time,
            updated_time,
            color_proportion,
            gross_profit_fee_air,
            gross_profit_fee_sea,
            re_string_field1,
            re_string_field2,
            re_string_field3,
            category_current_id,
            re_int_field2,
            re_int_field3,
            supply_demand,
            market_cycle_type,
            multi_color_proportion,
            multi_size_proportion,
            st_4_20_ao_avg,
            st_4_20_ao_rate,
            asin_aadd_count,
            asin_video_count,
            asin_fbm_count,
            asin_cn_count,
            asin_amazon_count,
            asin_color_count,
            asin_multi_color_count,
            asin_multi_size_count,
            st_word_num,
            re_string_field4,
            st_movie_label,
            st_brand_label,
            st_brand1,
            st_category1,
            st_brand2,
            st_category2,
            st_brand3,
            st_category3,
            st_bsr_cate_1_id_new,
            st_bsr_cate_current_id_new,
            st_crawl_date,
            is_high_return_text,
            st_zr_page123_title_appear_rate,
            st_sp_page123_title_appear_rate,
            st_competition_level
        from 
            dwt_aba_st_analytics
        where
            site_name = '{self.site_name}' 
        and date_type = '{self.date_type}' 
        and date_info = '{self.date_info}';
        """
        print(sql1)
        self.df_aba = self.spark.sql(sqlQuery=sql1).cache()

        sql2 = f"""
        select 
            keywordText as search_term,
            keywordText,
            matchType
        from 
            ods_adv_keyword
        where
            site_name = '{self.site_name}'
        group by keywordText, matchType;
        """
        print(sql2)
        self.df_keyword = self.spark.sql(sqlQuery=sql2).cache()

    def handle_data(self):
        """Join ABA analytics with advertised keywords and add partition columns.

        Inner join keeps only search terms that appear in ods_adv_keyword.
        """
        self.df_save = self.df_aba.join(self.df_keyword, 'search_term', 'inner')
        print(self.site_name, self.date_type, self.date_info)
        # Fill the partition columns expected by self.partitions_by.
        self.df_save = self.df_save \
            .withColumn("site_name", F.lit(self.site_name)) \
            .withColumn("date_type", F.lit(self.date_type)) \
            .withColumn("date_info", F.lit(self.date_info))
        self.df_save.show(10)


if __name__ == '__main__':
    # Fail fast with a usage message instead of a bare IndexError when
    # arguments are missing.
    if len(sys.argv) < 4:
        sys.exit("Usage: dwt_aba_keyword_analytics.py <site_name> <date_type> <date_info>")
    site_name = sys.argv[1]   # e.g. 'us'
    date_type = sys.argv[2]   # e.g. 'month'
    date_info = sys.argv[3]   # e.g. '2023-10'
    handle_obj = DwtAbaKeywordAnalytics(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()