A reference implementation for performing offline inference using multiple GPUs, with each GPU hosting one instance of the model. Surprisingly, I couldn't find existing tools that easily support this. Therefore, I had to manually launch several server instances on different ports and use Ray's data set parallelization along with a server manager…
# The vLLM servers were launched using something like:
# CUDA_VISIBLE_DEVICES=2 vllm serve Qwen/Qwen2.5-32B-Instruct-GPTQ-Int4 --quantization gptq --max-model-len 4096 --port 8003
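# One instance per GPU, each on its own port. The GPU-to-port pairing below is
# illustrative; ports 8001-8003 match the actors created further down:
# CUDA_VISIBLE_DEVICES=0 vllm serve Qwen/Qwen2.5-32B-Instruct-GPTQ-Int4 --quantization gptq --max-model-len 4096 --port 8001
# CUDA_VISIBLE_DEVICES=1 vllm serve Qwen/Qwen2.5-32B-Instruct-GPTQ-Int4 --quantization gptq --max-model-len 4096 --port 8002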
from openai import OpenAI
import ray
from ray.util import ActorPool
from tqdm import tqdm


def read_file(file_path: str) -> list[str]:
    """
    Read lines from a file.

    Args:
        file_path (str): The path to the file to be read.

    Returns:
        list[str]: A list of lines from the file, without trailing newlines.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        # splitlines() drops the trailing '\n' from each line, so joining a
        # block with '\n' later does not double up the newlines.
        return file.read().splitlines()


def split_into_blocks(lines: list[str], max_words: int = 1000) -> list[str]:
    """
    Split lines into blocks based on a maximum word count.

    Args:
        lines (list[str]): The lines to be split.
        max_words (int): The maximum number of words per block.

    Returns:
        list[str]: A list of blocks, each containing up to `max_words` words.
    """
    blocks = []
    current_block = []
    word_count = 0
    for line in lines:
        words_in_line = len(line.split())
        if word_count + words_in_line > max_words and current_block:
            blocks.append('\n'.join(current_block))
            current_block = [line]
            word_count = words_in_line
        else:
            current_block.append(line)
            word_count += words_in_line
    if current_block:
        blocks.append('\n'.join(current_block))
    return blocks
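
# For illustration, with a hypothetical three-word budget:
#   split_into_blocks(["one two three", "four five"], max_words=3)
# returns ["one two three", "four five"]: appending the second line would
# push the running block past three words, so a new block is started.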


@ray.remote
class OpenAIPredictor:
    def __init__(self, port: int):
        """
        Initialize the OpenAIPredictor with the port of its vLLM server.

        Args:
            port (int): The port of the local vLLM server this actor queries.
        """
        self.port = port
        print(f'Init OpenAI Predictor on port {port}')

    def translate(self, block: str) -> str:
        """
        Translate a block of text using the vLLM server's OpenAI-compatible API.

        Args:
            block (str): The text block to be translated.

        Returns:
            str: The translated text.
        """
        # The prompt (in Chinese) says: "Translate the following English into
        # Chinese. Return only the translated text, no other text or notes.
        # Do not summarize, abbreviate, or embellish in any way; just give a
        # plain literal translation."
        messages = [
            {"role": "user", "content": f"把下面的英语翻译成中文。只返回翻译后的文本,不要其他文本或注释。不要做任何总结,缩略和演绎,就是单纯的字面翻译。\n```\n{block}\n```"}
        ]
        # Generate outputs
        client = OpenAI(
            base_url=f"http://localhost:{self.port}/v1",
            api_key="token-abc123",
        )
        completion = client.chat.completions.create(
            model="Qwen/Qwen2.5-32B-Instruct-GPTQ-Int4",
            messages=messages
        )
        return completion.choices[0].message.content


def translate_in_batches(blocks: list[str]) -> list[str]:
    """
    Translate blocks of text in parallel using the OpenAIPredictor actors.

    Args:
        blocks (list[str]): A list of text blocks to be translated.

    Returns:
        list[str]: A list of translated text blocks.
    """
    # One actor per vLLM server; the ports must match the launched instances.
    actors = [OpenAIPredictor.remote(port) for port in [8001, 8002, 8003]]
    pool = ActorPool(actors)
    # Wrap the result iterator (not the input list) in tqdm so the bar tracks
    # completed translations; list() materializes the lazy map.
    translated_blocks = list(tqdm(
        pool.map(lambda actor, block: actor.translate.remote(block), blocks),
        total=len(blocks)))
    return translated_blocks
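
# Note: ActorPool.map (unlike map_unordered) yields results in the same order
# as the inputs, so the output preserves the original block order even though
# the actors finish at different times.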


def main() -> None:
    """
    Main function to read, translate, and write the script file.
    """
    # The GPUs are held by the vLLM server processes; the Ray actors only
    # issue HTTP requests to them.
    ray.init(num_gpus=3)
    file_path = "meta_script.txt"  # Replace with your file path
    lines = read_file(file_path)
    blocks = split_into_blocks(lines, 1000)
    translated_blocks = translate_in_batches(blocks)
    output_file_path = 'translated_meta_script.txt'
    # Explicit utf-8 so the Chinese output is written correctly on platforms
    # where it is not the default encoding.
    with open(output_file_path, 'w', encoding='utf-8') as f:
        for block in translated_blocks:
            f.write(block + '\n')


if __name__ == "__main__":
    main()
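
# To run (the script name below is arbitrary): start the vLLM servers on
# ports 8001-8003 first, then:
#   python translate_script.py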