es_flow_asin.py 9.77 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
import sys
import os

sys.path.append(os.path.dirname(sys.path[0]))
from utils.es_util import EsUtils
from utils.templates_mysql import TemplatesMysql
import pandas as pd
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from utils.db_util import DbTypes, DBUtil


class EsStDetail(TemplatesMysql):
    """Synchronise one partition of ``dwt_flow_asin`` from Hive into Elasticsearch.

    The partition is selected by ``site_name`` / ``date_type`` / ``date_info``.
    For ``date_type == '4_week'`` the rows go into a "last 30 days" index and the
    ``alias_name`` alias is pointed at it during :meth:`es_prepare`. For other date
    types the rows go into a per-period index; for formal ``'month'`` runs the alias
    is switched to that index, and a completion row is written to
    ``workflow_everyday``, only after the sync (:meth:`modify_mission_record_status`).

    Args:
        site_name: site code, e.g. ``'us'``.
        date_type: partition key of the source table (``'4_week'``, ``'month'``, ...).
        date_info: partition value. For ``'4_week'`` it is looked up as a
            ``year_week`` value in ``dim_date_20_to_30``.
        result_type: ``'formal'`` targets the production index/alias names; any
            other value targets the ``_test``-suffixed ones.
    """

    def __init__(self, site_name='us', date_type="week", date_info='2023-01', result_type='formal'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.result_type = result_type
        self.df_workflow = pd.DataFrame()
        self.table_name = "dwt_flow_asin"
        # Spark session used to read the source table
        self.spark = SparkUtil.get_spark_sessionV4(
            app_name=f"flow_asin_synchronize: {self.site_name}-{self.date_type}-{self.date_info}",
            use_db="big_data_selection")
        # Placeholder DataFrame; replaced by read_data()
        self.df_synchronize = self.spark.sql("select 1+1")
        # 4_week partitions are keyed by year_week; resolve that to a concrete date
        if self.date_type == '4_week':
            self.cur_date = self.get_date_from_week()
        else:
            self.cur_date = self.date_info
        self.year_month = str(self.cur_date).replace("-", "_")
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")

        # Elasticsearch index name and alias; non-formal runs use "_test"-suffixed names
        is_formal = self.result_type == 'formal'
        env_suffix = '' if is_formal else '_test'
        # NOTE(review): the alias and the month index use the "st_detail" prefix while the
        # 4_week index uses "flow_asin"; kept as-is, confirm this naming is intentional.
        self.alias_name = f'{self.site_name}_st_detail_last_4_week{env_suffix}'
        if self.date_type == '4_week':
            print(f"往{'正式' if is_formal else '测试'}环境上同步最近三十天数据")
            self.es_index_name = f"{self.site_name}_flow_asin_last30day_{self.cur_date}{env_suffix}"
        else:
            print(f"往{'正式' if is_formal else '测试'}环境上同步月数据")
            self.es_index_name = f"{self.site_name}_st_detail_{self.date_type}_{self.year_month}{env_suffix}"

        # Workflow table, task type and table-name value written on completion
        self.record_table = 'workflow_everyday'
        self.record_type = 'month' if self.date_type == 'month' else '30_day'
        self.record_table_name_field = f'{self.site_name}_flow_asin_last_month' if self.date_type == 'month' else f'{self.site_name}_flow_asin_last30day'

        # Elasticsearch client, connector options and index body
        self.client = EsUtils.get_es_client()
        self.es_options = EsUtils.get_es_options(self.es_index_name)
        self.es_body = EsUtils.get_es_body()

        # Formal exports must be registered in the export record table
        if result_type == 'formal':
            CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                           principal='wangrui4', priority=3, export_tools_type=2,
                                           belonging_to_process='流量选品')

    def get_date_from_week(self):
        """Return, as ``str``, the ``date`` of the row in ``dim_date_20_to_30`` whose
        ``year_week`` equals ``self.date_info`` and whose ``week_day`` is 7.

        Raises:
            ValueError: if no such row exists (e.g. an invalid or not-yet-loaded week).
        """
        df_date = self.spark.sql("select * from dim_date_20_to_30")
        df = df_date.toPandas()
        df_loc = df.loc[(df.year_week == f'{self.date_info}') & (df.week_day == 7)]
        if df_loc.empty:
            raise ValueError(
                f"dim_date_20_to_30 has no week_day=7 row for year_week={self.date_info!r}")
        cur_date = list(df_loc.date)[0]
        print(cur_date)
        return str(cur_date)

    def read_data(self):
        """Load the selected partition of ``self.table_name`` into ``self.df_synchronize``.

        Columns are renamed to the Elasticsearch field names. The result is
        repartitioned to 40 partitions and cached, because it is counted and then
        written in :meth:`save_data`.
        """
        sql = f"""
            select asin, asin_ao_val as ao_val, asin_zr_counts as zr_counts, asin_sp_counts as sp_counts, 
            asin_sb_counts as sb_counts, asin_vi_counts as vi_counts, asin_bs_counts as bs_counts, 
            asin_ac_counts as ac_counts, asin_tr_counts as tr_counts, asin_er_counts as er_counts, 
            bsr_orders, sales as bsr_orders_sale, asin_title as title, asin_title_len as title_len, asin_price as price, 
            asin_rating as rating, asin_total_comments as total_comments, asin_buy_box_seller_type as buy_box_seller_type, 
            asin_page_inventory as page_inventory, asin_volume as volume, asin_weight as weight, asin_color as color, 
            asin_size as size, asin_style as style, asin_material as material, asin_launch_time as launch_time, 
            asin_img_num as img_num, parent_asin, asin_img_type as img_type, asin_img_url as img_url, 
            asin_activity_type as activity_type, act_one_two_val as one_two_val, act_three_four_val as three_four_val, 
            act_five_six_val as five_six_val, act_eight_val as eight_val, asin_brand_name as brand, variation_num, 
            one_star, two_star, three_star, four_star, five_star, low_star, together_asin, account_name, account_id, 
            seller_country_name as site_name, asin_rank_rise as rank_rise, asin_rank_change as rank_change, 
            asin_ao_rise as ao_rise, asin_ao_change as ao_change, asin_price_rise as price_rise, 
            asin_price_change as price_change, asin_rating_rise as rating_rise, asin_rating_change as rating_change, 
            asin_comments_rise as comments_rise, asin_comments_change as comments_change, 
            asin_bsr_orders_rise as bsr_orders_rise, asin_bsr_orders_change as bsr_orders_change, 
            asin_sales_rise as sales_rise, asin_sales_change as sales_change, asin_variation_rise as variation_rise, 
            asin_variation_change as variation_change, asin_size_type as size_type, asin_rating_type as rating_type, 
            asin_site_name_type as site_name_type, asin_weight_type as weight_type, asin_launch_time_type as launch_time_type, 
            asin_ao_val_type as ao_val_type, asin_rank_type as rank_type, asin_price_type as price_type, bsr_type, 
            bsr_best_orders_type, asin_quantity_variation_type as quantity_variation_type, package_quantity, is_movie_label, 
            is_brand_label, is_alarm_brand, asin_type, asin_crawl_date, category_first_id, category_id, first_category_rank, 
            current_category_rank, asin_weight_ratio, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail, 
            title_matching_degree, asin_lob_info, is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion, 
            matrix_flow_proportion, matrix_ao_val, customer_reviews_json as product_features, img_info, 
            coalesce(parent_asin, asin) as collapse_asin, follow_sellers_count 
            from {self.table_name} where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
        """
        print("sql:", sql)
        self.df_synchronize = self.spark.sql(sqlQuery=sql)
        self.df_synchronize = self.df_synchronize.repartition(40).cache()
        self.df_synchronize.show(10, truncate=False)

    def es_prepare(self):
        """Create the target index and, for non-month runs, point ``alias_name`` at it.

        Month runs leave the alias alone here. It is switched only after a successful
        formal sync, in :meth:`modify_mission_record_status`.
        """
        print("当前链接的es节点信息为:" + str(EsUtils.__es_ip__))
        EsUtils.create_index(self.es_index_name, self.client, self.es_body)
        if self.date_type == 'month':
            return
        if not EsUtils.exist_index_alias(self.alias_name, self.client):
            EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)
            return
        index_name_list = EsUtils.get_index_names_associated_alias(self.alias_name, self.client)
        if self.es_index_name not in index_name_list:
            # Re-point the alias from whatever it currently references to the new index
            EsUtils.delete_index_alias(self.alias_name, self.client)
            EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)

    def save_data(self):
        """Write ``self.df_synchronize`` to ``self.es_index_name``.

        The write uses the elasticsearch-spark connector in append mode, after
        repartitioning to 60 partitions.
        """
        print(f"当前同步的数据为:{self.table_name}: {self.site_name}-{self.date_type}-{self.date_info}; 数据量为: {self.df_synchronize.count()}")
        self.df_synchronize = self.df_synchronize.repartition(60)
        self.df_synchronize.write.format('org.elasticsearch.spark.sql').options(**self.es_options).mode('append').save()

    def modify_mission_record_status(self):
        """After a formal month sync, point ``alias_name`` at the month index and record completion.

        This does nothing for non-month or non-formal runs. It also does nothing when a
        finished record (``status_val=14``, ``is_end='是'``) already exists for this
        site/month.

        The alias is switched *before* the completion record is written. If the
        switch fails, no record exists, so a re-run retries it instead of skipping it.
        """
        if not (self.date_type == 'month' and self.result_type == 'formal'):
            return
        # Look up an already-finished record; use record_type so the lookup matches the row written below
        select_sql = f"select id from {self.record_table} where site_name='{self.site_name}' and date_type='{self.record_type}' and report_date='{self.cur_date}' and page='流量选品' and status_val=14 and is_end='是'"
        df_is_finished = pd.read_sql(select_sql, self.engine_mysql)
        if not df_is_finished.empty:
            return

        # Switch the "last 30 days" alias over to the month index
        if EsUtils.exist_index_alias(self.alias_name, self.client):
            print("切换最近三十天索引别名链接到月数据")
            EsUtils.delete_index_alias(self.alias_name, self.client)
        EsUtils.create_index_alias(self.es_index_name, self.alias_name, self.client)

        replace_sql = f"""
            replace into {self.record_table} (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
            VALUES ('{self.site_name}', '{self.cur_date}', '流量选品计算完毕', 14,  '{self.record_table_name_field}', '{self.record_type}', '流量选品', '是', '流量选品计算完毕', 'elasticsearch')
        """
        DBUtil.exec_sql('mysql', 'us', replace_sql)

    def run(self):
        """Run the full pipeline: read, prepare ES, write, then update workflow status."""
        self.read_data()
        self.es_prepare()
        self.save_data()
        self.modify_mission_record_status()


def parse_cli_args(argv):
    """Parse ``<site_name> <date_type> <date_info> [result_type]`` command-line arguments.

    Args:
        argv: the arguments after the program name (i.e. ``sys.argv[1:]``).

    Returns:
        Tuple ``(site_name, date_type, date_info, result_type)``. ``result_type``
        defaults to ``'formal'`` when it is not supplied.

    Raises:
        SystemExit: if fewer than three arguments are supplied.
    """
    if len(argv) < 3:
        raise SystemExit("usage: <site_name> <date_type> <date_info> [result_type]")
    site_name = argv[0]  # site code
    date_type = argv[1]  # partition type, e.g. 4_week / month
    date_info = argv[2]  # partition value
    # Use ">= 4" so an explicit result_type is honoured even with trailing extra
    # arguments; otherwise a test run could silently be written to the formal indices.
    result_type = argv[3] if len(argv) >= 4 else 'formal'
    return site_name, date_type, date_info, result_type


if __name__ == '__main__':
    site_name, date_type, date_info, result_type = parse_cli_args(sys.argv[1:])
    handle_obj = EsStDetail(site_name=site_name, date_type=date_type, date_info=date_info, result_type=result_type)
    handle_obj.run()