diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 36d5a202f6..a72bae3400 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -2,6 +2,7 @@ import os import asyncio import logging +import pickle import subprocess import time @@ -157,8 +158,16 @@ async def close(self, term_timeout=2.0): async def _send(self, obj, cancellable=True): assert self.io_lock.locked() - line = pyon.encode(obj) - self.ipc.write((line + "\n").encode()) + + data = pickle.dumps(obj) + size_str = (str(len(data)) + "\n").encode() + + # Write the size of the data, delimited by \n. Then, write the data + # itself. This might contain \n chars, but the receiving reader knows + # how many bytes it's expecting so it doesn't need delimiters + self.ipc.write(size_str) + self.ipc.write(data) + ifs = [self.ipc.drain()] if cancellable: ifs.append(self.closed.wait()) @@ -193,11 +202,25 @@ async def _recv(self, timeout): raise WorkerError( "Worker ended while attempting to receive data (RID {})". format(self.rid)) + + try: + data_size = int(line.decode()) + except Exception as e: + raise WorkerError( + "Worker sent invalid data_size (RID {}): %s".format(self.rid, line) + ) from e + + data = b"" + while len(data) < data_size: + data += await self.ipc.read(data_size - len(data)) + try: - obj = pyon.decode(line.decode()) + obj = pickle.loads(data) except: - raise WorkerError("Worker sent invalid PYON data (RID {})".format( - self.rid)) + raise WorkerError( + "Worker sent invalid picked data (RID {})".format(self.rid) + ) + return obj async def _handle_worker_requests(self): diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 0467a8096a..a024892f49 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -6,6 +6,7 @@ process via IPC. """ +import pickle import sys import time import os @@ -37,13 +38,23 @@ def get_object(): - line = ipc.readline().decode() - return pyon.decode(line) + line = ipc.readline() + data_size = int(line.decode()) + + data = b'' + while len(data) < data_size: + data += ipc.read(data_size - len(data)) + + return pickle.loads(data) def put_object(obj): - ds = pyon.encode(obj) - ipc.write((ds + "\n").encode()) + # ds = pyon.encode(obj) + + data = pickle.dumps(obj) + size_str = (str(len(data)) + '\n').encode() + ipc.write(size_str) + ipc.write(data) def make_parent_action(action):