"""Export the dim_asin_variation_info Hive table into a PostgreSQL (Citus)
cluster via sqoop, using a copy-table + rename swap so readers of
{site_name}_asin_variation never see a partially loaded table.

CLI args: <site_name> <date_type> <date_info> [... last_flag]
"""
import os
import sys
import time
from datetime import datetime

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Last positional argument (flag contract shared with sibling jobs).
    # NOTE(review): not read below — kept for CLI-shape parity; confirm before removing.
    last_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")

    db_type = 'postgresql_cluster'
    # Gate execution to allowed hours / register this export with the scheduler.
    CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
                                   principal='fangxingjun', priority=4, export_tools_type=1,
                                   belonging_to_process='反查搜索词')

    # Database connection for the DDL phase.
    engine = DBUtil.get_db_engine(db_type, site_name)

    export_cols = [
        "asin",
        "parent_asin",
        "color",
        "size",
        "style",
        "state",
        "column_2",
        "column_1",
        "created_time",
        "updated_time",
        "created_date",
        "mapped_asin",
    ]

    export_tb_target = f"{site_name}_asin_variation"
    export_tb_copy = f"{site_name}_asin_variation_copy1"
    export_table = export_tb_copy

    # 20 range partitions covering [0, 1_000_000_000) in 50M steps,
    # generated instead of 20 hand-written DDL statements.
    part_step = 50_000_000
    partition_ddl = "\n".join(
        f"CREATE TABLE public.{export_tb_copy}_part{i + 1} "
        f"PARTITION OF public.{export_tb_copy} "
        f"FOR VALUES FROM ({i * part_step}) TO ({(i + 1) * part_step});"
        for i in range(20)
    )

    # Rebuild the copy table from the target's definition, distribute it
    # across the Citus cluster on asin, then attach the range partitions.
    sql = f"""
        drop table if exists {export_tb_copy};
        create table if not exists {export_tb_copy}
        (
            like {export_tb_target} including comments including defaults
        );
        truncate table {export_tb_copy};
        SELECT create_distributed_table('{export_tb_copy}', 'asin');
        {partition_ddl}
    """
    DBUtil.engine_exec_sql(engine, sql)
    print("导出的字段:", export_cols)

    partition_dict = {
        "site_name": site_name,
    }

    # Build and run the sqoop export shell command over SSH.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dim_asin_variation_info",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict
    )
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()

    # The sqoop run may be long; take a fresh connection for the swap phase.
    engine = DBUtil.get_db_engine(db_type, site_name)
    while True:
        try:
            with engine.begin() as conn:
                # Index the freshly loaded copy table.
                sql_index_create = f"""
                        CREATE INDEX {export_tb_copy}_asin_btree ON public.{export_tb_copy} USING btree (asin);
                        CREATE INDEX {export_tb_copy}_parent_asin_btree ON public.{export_tb_copy} USING btree (parent_asin);"""
                conn.execute(sql_index_create)
                print(f"sql_index_create: {sql_index_create}")

                # Swap table names: target -> back, copy -> target, back -> copy.
                sql_1 = f"""alter table {export_tb_target} rename to {export_tb_target}_back;"""
                conn.execute(sql_1)
                sql_2 = f"""alter table {export_tb_copy} rename to {export_tb_target};"""
                conn.execute(sql_2)
                sql_3 = f"""alter table {export_tb_target}_back rename to {export_tb_copy};"""
                conn.execute(sql_3)

                # Swap index names so they track their (renamed) tables.
                sql_index_exchange = f"""
                        ALTER INDEX {export_tb_target}_asin_btree RENAME TO {export_tb_target}_asin_btree_temp;
                        ALTER INDEX {export_tb_copy}_asin_btree RENAME TO {export_tb_target}_asin_btree;
                        ALTER INDEX {export_tb_target}_asin_btree_temp RENAME TO {export_tb_copy}_asin_btree;
                        """
                print(f"sql_index_exchange: {sql_index_exchange}")
                conn.execute(sql_index_exchange)
            break
        except Exception as e:
            # Log first, then back off — the original slept before logging,
            # delaying diagnosis by a minute. Retries indefinitely (deliberate:
            # the swap must eventually land once locks/conflicts clear).
            print("创建索引或者交换表名失败:", e)
            time.sleep(60)
            engine = DBUtil.get_db_engine(db_type, site_name)