# dwt_aba_last365.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

if __name__ == '__main__':
    # Export driver: copies one site/date partition of the Hive table
    # "dwt_aba_last365" into PostgreSQL. The export writes into a temporary
    # "<table>_copy" staging table which is swapped into place at the end.
    #
    # CLI args: <site_name> <date_type> <date_info> [... 'test']
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Grab the last CLI argument; the literal 'test' routes output to the test DB.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")

    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        db_type = "postgresql"
        print("导出到PG库中")

    # Resolve the PG target table name and the Hive partition date to export.
    if date_type == DateTypes.year.name:
        year = date_info
        export_tb_before = f"{site_name}_aba_last_year_{year}"
        # A yearly export reads the December partition of that year.
        rel_date_info = f"{year}-12"

    elif date_type == DateTypes.last365day.name:
        export_tb_before = f"{site_name}_aba_last_365_day"
        rel_date_info = date_info
        pass
    else:
        raise Exception("DateType不合法")

    # Staging table name; data is exported here first, then swapped in.
    export_tb_rel = f"{export_tb_before}_copy"
    engine = DBUtil.get_db_engine(db_type, site_name)

    # (Re)create the staging table as a structural clone of the target
    # (columns + comments, via LIKE ... INCLUDING COMMENTS).
    with engine.connect() as connection:
        sql = f"""
         drop table if exists {export_tb_rel};
         create table if not exists {export_tb_rel} 
        (
            like {export_tb_before}  including comments
        );
                """
        print("================================执行sql================================")
        print(sql)
        connection.execute(sql)

    # Backward compatibility with the legacy "month_old" partition layout:
    # partitions on/before 2023-10 live under "last365day_month_old".
    # NOTE(review): lexicographic compare assumes zero-padded "YYYY-MM" — confirm.
    rel_date_type = DateTypes.last365day.name if rel_date_info > '2023-10' else f"{DateTypes.last365day.name}_{DateTypes.month_old.name}"

    # Build the shell command that exports the selected Hive partition into
    # the staging PG table, with the explicit column list below.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_aba_last365",
        export_tb=export_tb_rel,
        col=[
            "id",
            "search_term",
            "category_id",
            "rank",
            "top_rank",
            "st_num1",
            "st_num2",
            "st_num3",
            "st_num4",
            "st_num5",
            "st_num6",
            "st_num7",
            "st_num8",
            "st_num9",
            "st_num10",
            "st_num11",
            "st_num12",
            "total_st_num",
            "bsr_orders1",
            "bsr_orders2",
            "bsr_orders3",
            "bsr_orders4",
            "bsr_orders5",
            "bsr_orders6",
            "bsr_orders7",
            "bsr_orders8",
            "bsr_orders9",
            "bsr_orders10",
            "bsr_orders11",
            "bsr_orders12",
            "market_cycle_type1",
            "market_cycle_type2",
            "market_cycle_type3",
            "market_cycle_type4",
            "market_cycle_type5",
            "market_cycle_type6",
            "market_cycle_type7",
            "market_cycle_type8",
            "market_cycle_type9",
            "market_cycle_type10",
            "market_cycle_type11",
            "market_cycle_type12",
            "search_volume1",
            "search_volume2",
            "search_volume3",
            "search_volume4",
            "search_volume5",
            "search_volume6",
            "search_volume7",
            "search_volume8",
            "search_volume9",
            "search_volume10",
            "search_volume11",
            "search_volume12",
            "st_ao_avg",
            "st_ao_val_rate",
            "supply_demand",
            "price_avg",
            "total_comments_avg",
            "rating_avg",
            "weight_avg",
            "volume_avg",
            "aadd_proportion",
            "sp_proportion",
            "fbm_proportion",
            "cn_proportion",
            "amzon_proportion",
            "top3_seller_orders",
            "top3_seller_bsr_orders",
            "top3_brand_orders",
            "top3_brand_bsr_orders",
            "page3_brand_num",
            "page3_seller_num",
            "max_num",
            "most_avg_proportion",
            "new_asin_num_avg_monopoly",
            "new_asin_bsr_orders_avg_monopoly",
            "total_asin_num",
            "orders",
            "bsr_orders",
            "created_time",
            "updated_time",
            "max_num_asin",
            "is_self_max_num_asin",
            "date_info",
            "gross_profit_fee_sea",
            "gross_profit_fee_air",
            "category_current_id",
            "color_proportion",
            "brand_monopoly",
            "seller_monopoly",
            "orders1",
            "orders2",
            "orders3",
            "orders4",
            "orders5",
            "orders6",
            "orders7",
            "orders8",
            "orders9",
            "orders10",
            "orders11",
            "orders12",
            "max_orders_month",
            "max_bsr_orders_month",
            "multi_color_avg_proportion",
            "multi_size_avg_proportion",
            "q1_bsr_orders",
            "q2_bsr_orders",
            "q3_bsr_orders",
            "q4_bsr_orders",
            "q1_orders",
            "q2_orders",
            "q3_orders",
            "q4_orders",
            "is_new_market_segment",
            "is_first_text",
            "is_ascending_text",
            "is_search_text",
            "st_word_num",
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": rel_date_type,
            "date_info": rel_date_info
        }
    )

    # Run the generated export command on the remote host; fail on any error.
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()

    # Swap the staging table into place (copying indexes) so the target name
    # now points at the freshly exported data.
    DBUtil.exchange_tb(engine,
                       source_tb_name=export_tb_rel,
                       target_tb_name=export_tb_before,
                       cp_index_flag=True)

    if test_flag != 'test':
        pass
        # Production-only: update workflow_everyday (currently disabled).
        # engine = DBUtil.get_db_engine("mysql", site_name)
        # with engine.connect() as connection:
        #     sql = f"""
        #       insert into workflow_everyday (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark)
        #         values ('{site_name}', '{date_info}', '导出PG数据库', 14, 'us_bsr_asin_rank', 'day', 'BSR榜单', '是', 'BS榜单对应的TOP100ASIN');
        #             """
        #     print("================================更新workflow_everyday================================")
        #     print(sql)
        #     connection.execute(sql)

    print("success")