Commit

formatting / linting
AlexCheema committed Jul 28, 2024
1 parent f1bd5fe commit ce76103
Showing 42 changed files with 3,828 additions and 2,435 deletions.
472 changes: 472 additions & 0 deletions .pylintrc

Large diffs are not rendered by default.

568 changes: 318 additions & 250 deletions exo/api/chatgpt_api.py

Large diffs are not rendered by default.

212 changes: 111 additions & 101 deletions exo/helpers.py
@@ -18,124 +18,134 @@
"""

def get_system_info():
-    if psutil.MACOS:
-        if platform.machine() == 'arm64':
-            return "Apple Silicon Mac"
-        elif platform.machine() in ['x86_64', 'i386']:
-            return "Intel Mac"
-        else:
-            return "Unknown Mac architecture"
-    elif psutil.LINUX:
-        return "Linux"
-    else:
-        return "Non-Mac, non-Linux system"
+  if psutil.MACOS:
+    if platform.machine() == "arm64":
+      return "Apple Silicon Mac"
+    elif platform.machine() in ["x86_64", "i386"]:
+      return "Intel Mac"
+    else:
+      return "Unknown Mac architecture"
+  elif psutil.LINUX:
+    return "Linux"
+  else:
+    return "Non-Mac, non-Linux system"


def get_inference_engine(inference_engine_name):
-    if inference_engine_name == "mlx":
-        from exo.inference.mlx.sharded_inference_engine import MLXDynamicShardInferenceEngine
-        return MLXDynamicShardInferenceEngine()
-    elif inference_engine_name == "tinygrad":
-        from exo.inference.tinygrad.inference import TinygradDynamicShardInferenceEngine
-        return TinygradDynamicShardInferenceEngine()
-    else:
-        raise ValueError(f"Inference engine {inference_engine_name} not supported")
-
-def find_available_port(host: str = '', min_port: int = 49152, max_port: int = 65535) -> int:
-    used_ports_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.exo_used_ports')
-
-    def read_used_ports():
-        if os.path.exists(used_ports_file):
-            with open(used_ports_file, 'r') as f:
-                return [int(line.strip()) for line in f if line.strip().isdigit()]
-        return []
-
-    def write_used_port(port, used_ports):
-        with open(used_ports_file, 'w') as f:
-            print(used_ports[-19:])
-            for p in used_ports[-19:] + [port]:
-                f.write(f"{p}\n")
-
-    used_ports = read_used_ports()
-    available_ports = set(range(min_port, max_port + 1)) - set(used_ports)
-
-    while available_ports:
-        port = random.choice(list(available_ports))
-        if DEBUG >= 2: print(f"Trying to find available port {port=}")
-        try:
-            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-                s.bind((host, port))
-            write_used_port(port, used_ports)
-            return port
-        except socket.error:
-            available_ports.remove(port)
-
-    raise RuntimeError("No available ports in the specified range")
+  if inference_engine_name == "mlx":
+    from exo.inference.mlx.sharded_inference_engine import MLXDynamicShardInferenceEngine
+
+    return MLXDynamicShardInferenceEngine()
+  elif inference_engine_name == "tinygrad":
+    from exo.inference.tinygrad.inference import TinygradDynamicShardInferenceEngine
+
+    return TinygradDynamicShardInferenceEngine()
+  else:
+    raise ValueError(f"Inference engine {inference_engine_name} not supported")
+
+
+def find_available_port(host: str = "", min_port: int = 49152, max_port: int = 65535) -> int:
+  used_ports_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".exo_used_ports")
+
+  def read_used_ports():
+    if os.path.exists(used_ports_file):
+      with open(used_ports_file, "r") as f:
+        return [int(line.strip()) for line in f if line.strip().isdigit()]
+    return []
+
+  def write_used_port(port, used_ports):
+    with open(used_ports_file, "w") as f:
+      print(used_ports[-19:])
+      for p in used_ports[-19:] + [port]:
+        f.write(f"{p}\n")
+
+  used_ports = read_used_ports()
+  available_ports = set(range(min_port, max_port + 1)) - set(used_ports)
+
+  while available_ports:
+    port = random.choice(list(available_ports))
+    if DEBUG >= 2: print(f"Trying to find available port {port=}")
+    try:
+      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind((host, port))
+      write_used_port(port, used_ports)
+      return port
+    except socket.error:
+      available_ports.remove(port)
+
+  raise RuntimeError("No available ports in the specified range")
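
An aside, not part of the diff itself: a minimal sketch of how the reformatted find_available_port above might be called when bringing up a local node. The host value and the surrounding context are illustrative assumptions, not taken from this commit.

from exo.helpers import find_available_port

# Binds a throwaway socket to confirm the port is free and records it in .exo_used_ports
# (as implemented in the hunk above), so repeated runs avoid recently used ports.
port = find_available_port("127.0.0.1")
print(f"starting local node on 127.0.0.1:{port}")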


def print_exo():
-    print(exo_text)
+  print(exo_text)


def print_yellow_exo():
-    yellow = "\033[93m" # ANSI escape code for yellow
-    reset = "\033[0m" # ANSI escape code to reset color
-    print(f"{yellow}{exo_text}{reset}")
+  yellow = "\033[93m" # ANSI escape code for yellow
+  reset = "\033[0m" # ANSI escape code to reset color
+  print(f"{yellow}{exo_text}{reset}")


def terminal_link(uri, label=None):
-    if label is None:
-        label = uri
-    parameters = ''
-
-    # OSC 8 ; params ; URI ST <name> OSC 8 ;; ST
-    escape_mask = '\033]8;{};{}\033\\{}\033]8;;\033\\'
-
-    return escape_mask.format(parameters, uri, label)
-
-T = TypeVar('T')
-K = TypeVar('K')
+  if label is None:
+    label = uri
+  parameters = ""
+
+  # OSC 8 ; params ; URI ST <name> OSC 8 ;; ST
+  escape_mask = "\033]8;{};{}\033\\{}\033]8;;\033\\"
+
+  return escape_mask.format(parameters, uri, label)
+
+
+T = TypeVar("T")
+K = TypeVar("K")

class AsyncCallback(Generic[T]):
-    def __init__(self) -> None:
-        self.condition: asyncio.Condition = asyncio.Condition()
-        self.result: Optional[Tuple[T, ...]] = None
-        self.observers: list[Callable[..., None]] = []
-
-    async def wait(self,
-                   check_condition: Callable[..., bool],
-                   timeout: Optional[float] = None) -> Tuple[T, ...]:
-        async with self.condition:
-            await asyncio.wait_for(self.condition.wait_for(lambda: self.result is not None and check_condition(*self.result)), timeout)
-            assert self.result is not None # for type checking
-            return self.result
-
-    def on_next(self, callback: Callable[..., None]) -> None:
-        self.observers.append(callback)
-
-    def set(self, *args: T) -> None:
-        self.result = args
-        for observer in self.observers:
-            observer(*args)
-        asyncio.create_task(self.notify())
-
-    async def notify(self) -> None:
-        async with self.condition:
-            self.condition.notify_all()
+  def __init__(self) -> None:
+    self.condition: asyncio.Condition = asyncio.Condition()
+    self.result: Optional[Tuple[T, ...]] = None
+    self.observers: list[Callable[..., None]] = []
+
+  async def wait(self, check_condition: Callable[..., bool], timeout: Optional[float] = None) -> Tuple[T, ...]:
+    async with self.condition:
+      await asyncio.wait_for(
+        self.condition.wait_for(lambda: self.result is not None and check_condition(*self.result)), timeout
+      )
+      assert self.result is not None # for type checking
+      return self.result
+
+  def on_next(self, callback: Callable[..., None]) -> None:
+    self.observers.append(callback)
+
+  def set(self, *args: T) -> None:
+    self.result = args
+    for observer in self.observers:
+      observer(*args)
+    asyncio.create_task(self.notify())
+
+  async def notify(self) -> None:
+    async with self.condition:
+      self.condition.notify_all()


class AsyncCallbackSystem(Generic[K, T]):
-    def __init__(self) -> None:
-        self.callbacks: Dict[K, AsyncCallback[T]] = {}
-
-    def register(self, name: K) -> AsyncCallback[T]:
-        if name not in self.callbacks:
-            self.callbacks[name] = AsyncCallback[T]()
-        return self.callbacks[name]
-
-    def deregister(self, name: K) -> None:
-        if name in self.callbacks:
-            del self.callbacks[name]
-
-    def trigger(self, name: K, *args: T) -> None:
-        if name in self.callbacks:
-            self.callbacks[name].set(*args)
-
-    def trigger_all(self, *args: T) -> None:
-        for callback in self.callbacks.values():
-            callback.set(*args)
+  def __init__(self) -> None:
+    self.callbacks: Dict[K, AsyncCallback[T]] = {}
+
+  def register(self, name: K) -> AsyncCallback[T]:
+    if name not in self.callbacks:
+      self.callbacks[name] = AsyncCallback[T]()
+    return self.callbacks[name]
+
+  def deregister(self, name: K) -> None:
+    if name in self.callbacks:
+      del self.callbacks[name]
+
+  def trigger(self, name: K, *args: T) -> None:
+    if name in self.callbacks:
+      self.callbacks[name].set(*args)
+
+  def trigger_all(self, *args: T) -> None:
+    for callback in self.callbacks.values():
+      callback.set(*args)
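
The AsyncCallback / AsyncCallbackSystem pair reformatted above is a small register-trigger-wait utility. Below is a self-contained sketch of how it can be exercised; the callback name "on_token" and the integer payload are invented for illustration and are not part of this commit.

import asyncio

from exo.helpers import AsyncCallbackSystem


async def main():
  system: AsyncCallbackSystem[str, int] = AsyncCallbackSystem()
  callback = system.register("on_token")  # hypothetical callback name
  callback.on_next(lambda n: print(f"observer saw {n}"))

  # trigger() stores the payload and schedules notify(), which wakes waiters whose predicate passes.
  asyncio.get_running_loop().call_later(0.1, system.trigger, "on_token", 42)
  result = await callback.wait(lambda n: n == 42, timeout=1)
  print(result)  # (42,)


asyncio.run(main())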
72 changes: 51 additions & 21 deletions exo/inference/debug_inference_engine.py
@@ -5,34 +5,64 @@
import asyncio
import numpy as np


# An inference engine should work the same for any number of Shards, as long as the Shards are continuous.
-async def test_inference_engine(inference_engine_1: InferenceEngine, inference_engine_2: InferenceEngine, model_id: str):
-    from exo.inference.tinygrad.inference import Tokenizer
-    from pathlib import Path
-    _tokenizer = Tokenizer(str(Path(model_id) / "tokenizer.model"))
+async def test_inference_engine(
+  inference_engine_1: InferenceEngine, inference_engine_2: InferenceEngine, model_id: str
+):
+  from exo.inference.tinygrad.inference import Tokenizer
+  from pathlib import Path
+
+  _tokenizer = Tokenizer(str(Path(model_id) / "tokenizer.model"))

-    prompt = "In a single word only, what is the last name of the president of the United States? "
-    resp_full, inference_state_full, _ = await inference_engine_1.infer_prompt("A", shard=Shard(model_id=model_id, start_layer=0, end_layer=31, n_layers=32), prompt=prompt)
-    next_resp_full, next_inference_state_full, _ = await inference_engine_1.infer_tensor("A", shard=Shard(model_id=model_id, start_layer=0, end_layer=31, n_layers=32), input_data=resp_full, inference_state=inference_state_full)
+  prompt = "In a single word only, what is the last name of the president of the United States? "
+  resp_full, inference_state_full, _ = await inference_engine_1.infer_prompt(
+    "A", shard=Shard(model_id=model_id, start_layer=0, end_layer=31, n_layers=32), prompt=prompt
+  )
+  next_resp_full, next_inference_state_full, _ = await inference_engine_1.infer_tensor(
+    "A",
+    shard=Shard(model_id=model_id, start_layer=0, end_layer=31, n_layers=32),
+    input_data=resp_full,
+    inference_state=inference_state_full,
+  )

-    resp1, inference_state_1, _ = await inference_engine_1.infer_prompt("B", shard=Shard(model_id=model_id, start_layer=0, end_layer=30, n_layers=32), prompt=prompt)
-    resp2, inference_state_2, _ = await inference_engine_2.infer_tensor("B", shard=Shard(model_id=model_id, start_layer=31, end_layer=31, n_layers=32), input_data=resp1, inference_state=inference_state_1)
-    resp3, inference_state_3, _ = await inference_engine_1.infer_tensor("B", shard=Shard(model_id=model_id, start_layer=0, end_layer=30, n_layers=32), input_data=resp2, inference_state=inference_state_2)
-    resp4, inference_state_4, _ = await inference_engine_2.infer_tensor("B", shard=Shard(model_id=model_id, start_layer=31, end_layer=31, n_layers=32), input_data=resp3, inference_state=inference_state_3)
+  resp1, inference_state_1, _ = await inference_engine_1.infer_prompt(
+    "B", shard=Shard(model_id=model_id, start_layer=0, end_layer=30, n_layers=32), prompt=prompt
+  )
+  resp2, inference_state_2, _ = await inference_engine_2.infer_tensor(
+    "B",
+    shard=Shard(model_id=model_id, start_layer=31, end_layer=31, n_layers=32),
+    input_data=resp1,
+    inference_state=inference_state_1,
+  )
+  resp3, inference_state_3, _ = await inference_engine_1.infer_tensor(
+    "B",
+    shard=Shard(model_id=model_id, start_layer=0, end_layer=30, n_layers=32),
+    input_data=resp2,
+    inference_state=inference_state_2,
+  )
+  resp4, inference_state_4, _ = await inference_engine_2.infer_tensor(
+    "B",
+    shard=Shard(model_id=model_id, start_layer=31, end_layer=31, n_layers=32),
+    input_data=resp3,
+    inference_state=inference_state_3,
+  )

-    print(f"{resp2=}")
-    print(f"full: {_tokenizer.decode(resp_full)}")
-    print(f"next full: {_tokenizer.decode(next_resp_full)}")
-    print(f"resp2: {_tokenizer.decode(resp2)}")
-    print(f"{resp4=}")
-    print(f"resp4: {_tokenizer.decode(resp4)}")
+  print(f"{resp2=}")
+  print(f"full: {_tokenizer.decode(resp_full)}")
+  print(f"next full: {_tokenizer.decode(next_resp_full)}")
+  print(f"resp2: {_tokenizer.decode(resp2)}")
+  print(f"{resp4=}")
+  print(f"resp4: {_tokenizer.decode(resp4)}")

-    assert np.array_equal(resp_full, resp2)
-    assert np.array_equal(next_resp_full, resp4)
+  assert np.array_equal(resp_full, resp2)
+  assert np.array_equal(next_resp_full, resp4)


-asyncio.run(test_inference_engine(
+asyncio.run(
+  test_inference_engine(
     TinygradDynamicShardInferenceEngine(),
     TinygradDynamicShardInferenceEngine(),
     "llama3-8b-sfr",
-))
+  )
+)
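
The comment at the top of this file states that an engine should produce the same output for any continuous split of the layers, which is what the A/B comparison above checks. Here is a small illustrative sketch of what "continuous" means for the 32-layer model used in the test; the split point is arbitrary and the assertions are not part of the commit.

from exo.inference.shard import Shard

n_layers = 32
split = 31  # the test splits after layer 30: shards cover [0, 30] and [31, 31]
first = Shard(model_id="llama3-8b-sfr", start_layer=0, end_layer=split - 1, n_layers=n_layers)
second = Shard(model_id="llama3-8b-sfr", start_layer=split, end_layer=n_layers - 1, n_layers=n_layers)

# Continuous means the second shard starts exactly where the first one ends,
# and together they cover every layer exactly once.
assert second.start_layer == first.end_layer + 1
assert first.start_layer == 0 and second.end_layer == n_layers - 1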
17 changes: 11 additions & 6 deletions exo/inference/inference_engine.py
@@ -4,11 +4,16 @@
from abc import ABC, abstractmethod
from .shard import Shard


class InferenceEngine(ABC):
-    @abstractmethod
-    async def infer_tensor(self, request_id: str, shard: Shard, input_data: np.ndarray, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
-        pass
-
-    @abstractmethod
-    async def infer_prompt(self, request_id: str, shard: Shard, prompt: str, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
-        pass
+  @abstractmethod
+  async def infer_tensor(
+    self, request_id: str, shard: Shard, input_data: np.ndarray, inference_state: Optional[str] = None
+  ) -> Tuple[np.ndarray, str, bool]:
+    pass
+
+  @abstractmethod
+  async def infer_prompt(
+    self, request_id: str, shard: Shard, prompt: str, inference_state: Optional[str] = None
+  ) -> Tuple[np.ndarray, str, bool]:
+    pass
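
For readers new to this interface, a deliberately trivial sketch of a class that satisfies the two abstract methods above. The echo behaviour is invented for illustration and is not an engine that ships with exo.

from typing import Optional, Tuple

import numpy as np

from exo.inference.inference_engine import InferenceEngine
from exo.inference.shard import Shard


class EchoInferenceEngine(InferenceEngine):
  """Toy engine that returns fixed arrays so orchestration code can be exercised without a model."""

  async def infer_prompt(self, request_id: str, shard: Shard, prompt: str, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
    # Pretend the prompt produced a single token id; is_finished=False invites a follow-up infer_tensor call.
    return np.array([len(prompt)]), inference_state or "", False

  async def infer_tensor(self, request_id: str, shard: Shard, input_data: np.ndarray, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
    # Echo the input straight back and report completion.
    return input_data, inference_state or "", True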