"""Repeatedly run the ``pictures_dwd_id_index.py`` Spark job until it is done.

Usage: python <this script> SITE_NAME PROCESS_NUM

Starts PROCESS_NUM worker processes. Each one re-runs the job via
``spark-submit`` in a loop and stops once the job's output contains
``DONE_MARKER``. This script treats that marker as the signal that every
block id already has its index mapping.
"""
import multiprocessing
import os
import subprocess
import sys
import time
import traceback

SPARK_SUBMIT = "/opt/module/spark/bin/spark-submit"
JOB_SCRIPT = "/opt/module/spark/demo/py_demo/image_search/pictures_dwd_id_index.py"
# Error text the job emits once there is nothing left to index. Seeing it
# means the work is finished, not that a transient failure occurred.
DONE_MARKER = "ValueError: Length mismatch: Expected axis has 0 elements"
# Seconds to wait before retrying after a failed run.
RETRY_DELAY_SECONDS = 20


def build_command(site_name):
    """Return the spark-submit argv that runs the job for *site_name*.

    An argument list (not a shell string) is used so that *site_name*, which
    comes from the command line, cannot inject shell syntax.
    """
    return [
        SPARK_SUBMIT,
        "--master", "yarn",
        "--driver-memory", "1g",
        "--executor-memory", "4g",
        "--executor-cores", "1",
        "--num-executors", "1",
        "--queue", "spark",
        JOB_SCRIPT,
        site_name,
    ]


def run_job(command, marker=DONE_MARKER):
    """Run *command*, echo its combined output, and look for *marker*.

    Args:
        command: argv list to execute (no shell).
        marker: substring whose presence in any output line is reported.

    Returns:
        ``(returncode, marker_seen)``.

    Raises:
        OSError: if the executable cannot be started.
    """
    marker_seen = False
    # stderr is merged into stdout so that the driver's Python traceback is
    # scanned too. NOTE(review): this relies on the driver running locally
    # (spark-submit's default client deploy mode). In cluster mode the
    # traceback would only be in the YARN logs, so confirm the deploy mode.
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors="replace",
    ) as proc:
        for line in proc.stdout:
            # Keep the job's output visible, as os.system did before.
            print(line, end="", flush=True)
            if marker in line:
                marker_seen = True
    # Popen's context manager waits for the child, so returncode is set here.
    return proc.returncode, marker_seen


def main(site_name='us'):
    """Re-run the Spark job for *site_name* until it reports no work is left.

    A run that succeeds is started again immediately. A run that exits
    non-zero, or cannot be launched, is retried after RETRY_DELAY_SECONDS.
    Returns once a run's output contains DONE_MARKER. When used as a
    ``multiprocessing.Process`` target, returning ends that worker with
    exit code 0.
    """
    command = build_command(site_name)
    while True:
        try:
            returncode, finished = run_job(command)
        except (OSError, subprocess.SubprocessError) as e:
            # e.g. spark-submit missing or not executable: log, back off, retry.
            print(e, traceback.format_exc())
            time.sleep(RETRY_DELAY_SECONDS)
            continue
        if finished:
            # "All index relations for every block id have been processed; exiting."
            print("当前已经跑完所有block块id对应的index关系,退出")
            return
        if returncode != 0:
            print(f"spark-submit exited with status {returncode}; "
                  f"retrying in {RETRY_DELAY_SECONDS}s")
            time.sleep(RETRY_DELAY_SECONDS)


if __name__ == "__main__":
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} SITE_NAME PROCESS_NUM")
    site_name = sys.argv[1]
    process_num = int(sys.argv[2])  # argv[2]: number of worker processes
    processes = []
    # Each worker loops over the job on its own until it sees DONE_MARKER.
    for _ in range(process_num):
        process = multiprocessing.Process(target=main, args=(site_name,))
        process.start()
        processes.append(process)
    # Wait for all worker processes to finish.
    for process in processes:
        process.join()