import random
import threading
from http import HTTPStatus

import dashscope
import openai
import tiktoken
from cachetools import cached

from app.util.common_util import CommonUtil

# Global request counter, used for round-robin key selection.
global_req_count = 0
# Current request queues.
global_req_queue_dict = {}
# Module-level lock so the counter increment is atomic across threads.
# (The original created a new Lock inside the function on every call,
# so it could never actually be contended.)
global_req_lock = threading.Lock()


def get_api_key(platform: str):
    """Pick an API key for the given platform by locked round-robin."""
    global global_req_count
    platform_key = {
        "gpt": [
            # cjy chat_gpt 3.5 keys
            # "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",
            # "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",
            # "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",
            # "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",
            # "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",
            # "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",
            # None of the keys are currently usable.
            "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"
        ],
        "qwen": [
            # wjc qianwen key
            "sk-ea265337fdc644f58822e13947500368"
        ]
    }
    key_pool = platform_key.get(platform)
    # Rotate through the key pool under the lock.
    with global_req_lock:
        global_req_count += 1
        return key_pool[global_req_count % len(key_pool)]


@cached(cache={})
def check_ip(host: str):
    """Return True if the host answers at least one TCP ping; results are cached."""
    from tcping import Ping
    ping = Ping(host=host, timeout=1)
    ping.ping(2)
    result = ping.result.rows[0]
    return result.successed > 0


def num_tokens_from_messages(messages, model="gpt-3.5-turbo"):
    """Returns the number of tokens used by a list of messages."""
    encoding = tiktoken.encoding_for_model(model)
    if model == "gpt-3.5-turbo":
        print("Warning: gpt-3.5-turbo may change over time. Returning num tokens assuming gpt-3.5-turbo-0301.")
        return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0301")
    elif model == "gpt-4":
        print("Warning: gpt-4 may change over time. Returning num tokens assuming gpt-4-0314.")
        return num_tokens_from_messages(messages, model="gpt-4-0314")
    elif model == "gpt-3.5-turbo-0301":
        tokens_per_message = 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
        tokens_per_name = -1  # if there's a name, the role is omitted
    elif model == "gpt-4-0314":
        tokens_per_message = 3
        tokens_per_name = 1
    else:
        raise NotImplementedError(
            f"""num_tokens_from_messages() is not implemented for model {model}.
See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens.""")
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 2  # every reply is primed with <im_start>assistant
    return num_tokens


def qwen_num_tokens_from_input(messages, model="qwen-max"):
    """Count the input tokens of the messages via DashScope's Tokenization API."""
    dashscope.api_key = get_api_key("qwen")
    response = dashscope.Tokenization.call(model=model, messages=messages)
    if response.status_code == HTTPStatus.OK:
        num_tokens = response.usage['input_tokens']
    else:
        raise NotImplementedError(
            'Failed request_id: %s, status_code: %s, code: %s, message: %s'
            % (response.request_id, response.status_code, response.code, response.message))
    return num_tokens
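
# --- Usage sketch (illustrative; added for clarity, not part of the original flow) ---
# A minimal sketch of calling num_tokens_from_messages() before a request, so a
# caller can compare the prompt size against a model's context limit the same
# way send_msg() does below. The messages here are made up for illustration.
def demo_count_tokens():
    messages = [
        {"role": "system", "content": "You are a research assistant."},
        {"role": "user", "content": "Summarize this paper in one sentence."},
    ]
    num = num_tokens_from_messages(messages, model="gpt-3.5-turbo")
    print(f"The prompt would consume about {num} tokens.")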
class GptChatSession:

    def __init__(self):
        self.messages = [
            # The system message comes first; it helps set the behavior of the assistant.
            {"role": "system", "content": "You are a research assistant."},
        ]

    def send_msg(self, msg: str):
        """Send a message to ChatGPT; returns (reply, cost_token, cost_time)."""
        assert msg is not None, "msg must not be empty!"
        assert check_ip("api.openai.com"), "AI network connection failed, please check!"
        self.messages.append({"role": "user", "content": msg})
        model = "gpt-3.5-turbo"
        num = num_tokens_from_messages(self.messages, model)
        # https://platform.openai.com/docs/models/gpt-3-5
        max_token_num = 4096
        assert num <= max_token_num, "Token count exceeds the model maximum and cannot be processed!"
        start = CommonUtil.current_time()
        try:
            openai.api_key = get_api_key("gpt")
            chat_completion = openai.ChatCompletion.create(
                model=model,
                messages=self.messages
            )
        except openai.error.OpenAIError as err:
            raise Exception(f"ChatGPT call failed: {err}. Please retry!")
        end = CommonUtil.current_time()
        cost_time = end - start
        reply = chat_completion.choices[0].message.content
        cost_token = chat_completion.usage.total_tokens
        self.messages.append({"role": "assistant", "content": reply})
        return reply, cost_token, cost_time

    def send_message_qwen(self, msg: str, model="qwen-max"):
        """Send a message to Tongyi Qianwen (Qwen); returns (reply, cost_token, cost_time)."""
        assert msg is not None, "msg must not be empty!"
        self.messages.append({"role": "user", "content": msg})
        num = qwen_num_tokens_from_input(messages=self.messages, model=model)
        print(f"The input messages consume {num} tokens.")
        max_token_num = 6000
        assert num <= max_token_num, "Token count exceeds the model maximum and cannot be processed!"
        start = CommonUtil.current_time()
        dashscope.api_key = get_api_key("qwen")
        chat_completion = dashscope.Generation.call(
            model=model,
            messages=self.messages,
            seed=random.randint(1, 10000),
            result_format='message'
        )
        if chat_completion.status_code == HTTPStatus.OK:
            end = CommonUtil.current_time()
            cost_time = end - start
            reply = chat_completion.output.choices[0].message.content
            cost_token = chat_completion.usage.total_tokens
            print("Time spent on the analysis request:", cost_time)
            # Record the assistant reply exactly once (the original appended it twice).
            self.messages.append({"role": "assistant", "content": reply})
        else:
            raise Exception(
                f"Qwen call failed, status_code: {chat_completion.status_code}, "
                f"code: {chat_completion.code}, message: {chat_completion.message}. Please retry!")
        return reply, cost_token, cost_time

    def send_message_qwen_stream(self, msg: str, model="qwen-max"):
        """
        Send a streaming message request to Tongyi Qianwen (Qwen).
        :param msg: user message
        :param model: Qwen model name
        :return: a generator that yields streamed responses
        """
        assert msg is not None, "msg must not be empty!"
        self.messages.append({"role": "user", "content": msg})
        num = qwen_num_tokens_from_input(messages=self.messages, model=model)
        print(f"The input messages consume {num} tokens.")
        max_token_num = 6000
        assert num <= max_token_num, "Token count exceeds the model maximum and cannot be processed!"
        dashscope.api_key = get_api_key("qwen")
        response_generator = dashscope.Generation.call(
            model=model,
            messages=self.messages,
            seed=random.randint(1, 10000),
            result_format='message',
            stream=True
        )
        return response_generator


if __name__ == '__main__':
    pass
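
# --- Usage sketch (illustrative; not wired into the original flow) ---
# A minimal sketch of how a caller might drain the generator returned by
# send_message_qwen_stream(). Assumption: with DashScope's default streaming
# settings (incremental_output is not enabled above), each chunk's
# message.content carries the full text accumulated so far, so the last chunk
# holds the complete reply; verify this against your SDK version. Note that
# send_message_qwen_stream() never appends the assistant reply to
# session.messages, so the caller must do it to keep the history consistent.
def demo_qwen_stream():
    session = GptChatSession()
    reply = ""
    for response in session.send_message_qwen_stream("Introduce Qwen in one sentence."):
        if response.status_code == HTTPStatus.OK:
            reply = response.output.choices[0].message.content  # full snapshot so far
        else:
            raise Exception(
                f"Qwen stream failed, status_code: {response.status_code}, "
                f"code: {response.code}, message: {response.message}")
    # Append the final reply so a follow-up message sees the whole conversation.
    session.messages.append({"role": "assistant", "content": reply})
    print(reply)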