"""
Export the Hive table dim_asin_variation_info to the PostgreSQL (Citus) cluster.

Flow:
  1. Rebuild a range-partitioned copy table ({site_name}_asin_variation_copy1).
  2. Export the Hive data into the copy table via a Sqoop shell job.
  3. Create hash indexes on the copy table, then swap table, index and
     partition names with the live table ({site_name}_asin_variation).

Typical invocation (script name and argument values are illustrative only):
    python dim_asin_variation_info.py us month 2024-01
"""
import os
import sys
import time
from datetime import datetime

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Last command-line argument
    last_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    cur_date = datetime.now().date()
    sql_date_type = date_type
    print(f"Execution arguments: {sys.argv}")
    db_type = 'postgresql_cluster'
    # Wait for an allowed export window; the process name '变体表' means "variation table"
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                   principal='fangxingjun', priority=4, export_tools_type=1,
                                   belonging_to_process='变体表')
    # CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
    #                                principal='fangxingjun', priority=3, export_tools_type=1,
    #                                belonging_to_process='反查搜索词')

    # Get a database connection
    engine = DBUtil.get_db_engine(db_type, site_name)
    suffix = str(date_info).replace("-", "_")
    export_cols = [
        "asin",
        "parent_asin",
        "color",
        "size",
        "style",
        "state",
        "column_2",
        "column_1",
        "created_time",
        "updated_time",
        "created_date",
        "mapped_asin",
    ]
    export_tb_target = f"{site_name}_asin_variation"
    export_tb_copy = f"{site_name}_asin_variation_copy1"
    export_table = export_tb_copy

    # Recreate the copy table as a Citus-distributed table, range-partitioned on
    # mapped_asin into 20 partitions of 50,000,000 values each (0 .. 1,000,000,000).
    partition_ddl = "\n    ".join(
        f"CREATE TABLE public.{export_tb_copy}_part{i} PARTITION OF public.{export_tb_copy} "
        f"FOR VALUES FROM ({(i - 1) * 50000000}) TO ({i * 50000000});"
        for i in range(1, 21)
    )
    sql = f"""
    drop table if exists {export_tb_copy};
    create table if not exists {export_tb_copy}
    (
        like {export_tb_target} including comments including defaults
    ) PARTITION BY RANGE (mapped_asin);
    truncate table {export_tb_copy};
    SELECT create_distributed_table('{export_tb_copy}', 'asin');
    {partition_ddl}
    """
    print(f"sql: {sql}")
    # Execute the DDL
    DBUtil.engine_exec_sql(engine, sql)
    DBUtil.engine_exec_sql(engine, f"ALTER TABLE {export_tb_copy} DROP COLUMN IF EXISTS id;")
    print("Exported columns:", export_cols)

    partition_dict = {
        "site_name": site_name,
    }
    # Build the shell command that runs the Sqoop export, then run it over SSH
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_asin_variation_info",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()

    # Re-acquire the database connection after the export
    engine = DBUtil.get_db_engine(db_type, site_name)
    while True:
        try:
            with engine.begin() as conn:
                # Create hash indexes on the freshly loaded copy table
                sql_index_create1 = f"CREATE INDEX {export_tb_copy}_asin_hash ON public.{export_tb_copy} USING hash (asin);"
                print(f"sql_index_create1: {sql_index_create1}")
                conn.execute(sql_index_create1)
                sql_index_create2 = f"CREATE INDEX {export_tb_copy}_parent_asin_hash ON public.{export_tb_copy} USING hash (parent_asin);"
                print(f"sql_index_create2: {sql_index_create2}")
                conn.execute(sql_index_create2)

                # Swap table names: target -> back, copy -> target, back -> copy
                sql_table_rename1 = f"alter table {export_tb_target} rename to {export_tb_target}_back;"
                print(f"sql_table_rename1: {sql_table_rename1}")
                conn.execute(sql_table_rename1)
                sql_table_rename2 = f"alter table {export_tb_copy} rename to {export_tb_target};"
                print(f"sql_table_rename2: {sql_table_rename2}")
                conn.execute(sql_table_rename2)
                sql_table_rename3 = f"alter table {export_tb_target}_back rename to {export_tb_copy};"
                print(f"sql_table_rename3: {sql_table_rename3}")
                conn.execute(sql_table_rename3)

                # Swap index names: asin_hash
                sql_index_exchange1 = f"ALTER INDEX {export_tb_target}_asin_hash RENAME TO {export_tb_target}_asin_hash_temp;"
                print(f"sql_index_exchange1: {sql_index_exchange1}")
                conn.execute(sql_index_exchange1)
                sql_index_exchange2 = f"ALTER INDEX {export_tb_copy}_asin_hash RENAME TO {export_tb_target}_asin_hash;"
                print(f"sql_index_exchange2: {sql_index_exchange2}")
                conn.execute(sql_index_exchange2)
                sql_index_exchange3 = f"ALTER INDEX {export_tb_target}_asin_hash_temp RENAME TO {export_tb_copy}_asin_hash;"
                print(f"sql_index_exchange3: {sql_index_exchange3}")
                conn.execute(sql_index_exchange3)

                # Swap index names: parent_asin_hash
                sql_index_exchange1 = f"ALTER INDEX {export_tb_target}_parent_asin_hash RENAME TO {export_tb_target}_parent_asin_hash_temp;"
                print(f"sql_index_exchange1: {sql_index_exchange1}")
                conn.execute(sql_index_exchange1)
                sql_index_exchange2 = f"ALTER INDEX {export_tb_copy}_parent_asin_hash RENAME TO {export_tb_target}_parent_asin_hash;"
                print(f"sql_index_exchange2: {sql_index_exchange2}")
                conn.execute(sql_index_exchange2)
                sql_index_exchange3 = f"ALTER INDEX {export_tb_target}_parent_asin_hash_temp RENAME TO {export_tb_copy}_parent_asin_hash;"
                print(f"sql_index_exchange3: {sql_index_exchange3}")
                conn.execute(sql_index_exchange3)

                # Swap partition names
                for part_num in range(1, 21):
                    sql_1 = f"alter table {export_tb_target}_part{part_num} rename to {export_tb_target}_part{part_num}_back;"
                    conn.execute(sql_1)
                    sql_2 = f"alter table {export_tb_copy}_part{part_num} rename to {export_tb_target}_part{part_num};"
                    conn.execute(sql_2)
                    sql_3 = f"alter table {export_tb_target}_part{part_num}_back rename to {export_tb_copy}_part{part_num};"
                    conn.execute(sql_3)
                    print(sql_1)
                    print(sql_2)
                    print(sql_3)
            break
        except Exception as e:
            print("Index creation or table-name swap failed; retrying in 60s:", e)
            time.sleep(60)
            engine = DBUtil.get_db_engine(db_type, site_name)