From 3b1ee803e88e3cddc5f63739033291d98f1df891 Mon Sep 17 00:00:00 2001 From: MacX19 Date: Thu, 7 Nov 2024 00:24:12 +0300 Subject: [PATCH] Input/OutputStream support --- src/one/nio/serial/DataStream.java | 39 +++-- .../nio/serial/DeserializeInputStream.java | 133 ++++++++++++++++++ src/one/nio/serial/PersistOutputStream.java | 95 +++++++++++++ src/one/nio/serial/PersistStream.java | 2 +- test/one/nio/serial/StreamsTest.java | 38 +++++ 5 files changed, 293 insertions(+), 14 deletions(-) create mode 100644 src/one/nio/serial/DeserializeInputStream.java create mode 100644 src/one/nio/serial/PersistOutputStream.java diff --git a/src/one/nio/serial/DataStream.java b/src/one/nio/serial/DataStream.java index a74f75e..4a2a6a6 100755 --- a/src/one/nio/serial/DataStream.java +++ b/src/one/nio/serial/DataStream.java @@ -183,7 +183,8 @@ public void writeFrom(long address, int len) throws IOException { } public int read() throws IOException { - return unsafe.getByte(array, alloc(1)); + long offset = alloc(1); + return unsafe.getByte(array, offset); } public int read(byte[] b) throws IOException { @@ -197,11 +198,13 @@ public int read(byte[] b, int off, int len) throws IOException { } public void readFully(byte[] b) throws IOException { - unsafe.copyMemory(array, alloc(b.length), b, byteArrayOffset, b.length); + long offset = alloc(b.length); + unsafe.copyMemory(array, offset, b, byteArrayOffset, b.length); } public void readFully(byte[] b, int off, int len) throws IOException { - unsafe.copyMemory(array, alloc(len), b, byteArrayOffset + off, len); + long offset = alloc(len); + unsafe.copyMemory(array, offset, b, byteArrayOffset + off, len); } public long skip(long n) throws IOException { @@ -215,35 +218,43 @@ public int skipBytes(int n) throws IOException { } public boolean readBoolean() throws IOException { - return unsafe.getBoolean(array, alloc(1)); + long offset = alloc(1); + return unsafe.getBoolean(array, offset); } public byte readByte() throws IOException { - return unsafe.getByte(array, alloc(1)); + long offset = alloc(1); + return unsafe.getByte(array, offset); } public int readUnsignedByte() throws IOException { - return unsafe.getByte(array, alloc(1)) & 0xff; + long offset = alloc(1); + return unsafe.getByte(array, offset) & 0xff; } public short readShort() throws IOException { - return Short.reverseBytes(unsafe.getShort(array, alloc(2))); + long offset = alloc(2); + return Short.reverseBytes(unsafe.getShort(array, offset)); } public int readUnsignedShort() throws IOException { - return Short.reverseBytes(unsafe.getShort(array, alloc(2))) & 0xffff; + long offset = alloc(2); + return Short.reverseBytes(unsafe.getShort(array, offset)) & 0xffff; } public char readChar() throws IOException { - return Character.reverseBytes(unsafe.getChar(array, alloc(2))); + long offset = alloc(2); + return Character.reverseBytes(unsafe.getChar(array, offset)); } public int readInt() throws IOException { - return Integer.reverseBytes(unsafe.getInt(array, alloc(4))); + long offset = alloc(4); + return Integer.reverseBytes(unsafe.getInt(array, offset)); } public long readLong() throws IOException { - return Long.reverseBytes(unsafe.getLong(array, alloc(8))); + long offset = alloc(8); + return Long.reverseBytes(unsafe.getLong(array, offset)); } public float readFloat() throws IOException { @@ -276,7 +287,8 @@ public String readUTF() throws IOException { if (length > 0x7fff) { length = (length & 0x7fff) << 16 | readUnsignedShort(); } - return Utf8.read(array, alloc(length), length); + long offset = alloc(length); + return Utf8.read(array, offset, length); } public Object readObject() throws IOException, ClassNotFoundException { @@ -309,7 +321,8 @@ public void read(ByteBuffer dst) throws IOException { } public void readTo(long address, int len) throws IOException { - unsafe.copyMemory(array, alloc(len), null, address, len); + long offset = alloc(len); + unsafe.copyMemory(array, offset, null, address, len); } public ByteBuffer byteBuffer(int len) throws IOException { diff --git a/src/one/nio/serial/DeserializeInputStream.java b/src/one/nio/serial/DeserializeInputStream.java new file mode 100644 index 0000000..7d22ec1 --- /dev/null +++ b/src/one/nio/serial/DeserializeInputStream.java @@ -0,0 +1,133 @@ +package one.nio.serial; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class DeserializeInputStream extends InputStream { + private final InternalDeserializeStream stream; + private final InputStream in; + + public DeserializeInputStream(InputStream in) { + this.stream = new InternalDeserializeStream(); + this.in = in; + } + + public DeserializeInputStream(InputStream in, int initBufferSize) { + this.stream = new InternalDeserializeStream(initBufferSize); + this.in = in; + } + + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return in.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + + public Object readObject() throws IOException, ClassNotFoundException { + stream.ensureOpen(); + return stream.readObject(); + } + + @Override + public long skip(long n) throws IOException { + return in.skip(n); + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + try { + stream.close(); + } finally { + in.close(); + } + } + + private class InternalDeserializeStream extends DeserializeStream { + + public InternalDeserializeStream() { + super(new byte[400]); + limit = offset; + } + + public InternalDeserializeStream(int capacity) { + super(new byte[capacity]); + limit = offset; + } + + @Override + public ByteBuffer byteBuffer(int len) throws IOException { + ByteBuffer byteBuffer = ByteBuffer.allocate(len); + read(byteBuffer); + return byteBuffer; + } + + @Override + protected long alloc(int size) throws IOException { + if (size > array.length) { + int newSize = Math.max(size, array.length * 2); + byte[] newArray = new byte[newSize]; + if (offset < limit) { + int position = (int) (offset - address); + int len = (int) (limit - offset); + System.arraycopy(array, position, newArray, 0, len); + limit = limit - position; + } else { + limit = address; + } + array = newArray; + offset = address; + } + if (offset + size > limit) fillArray(size); + long currentOffset = offset; + if ((offset = currentOffset + size) > limit) throw new IndexOutOfBoundsException(); + return currentOffset; + } + + private void fillArray(int minSize) throws IOException { + if (offset - address + minSize > array.length) shiftArray(); + int size = (int) (limit - offset); + while (size < minSize) { + int position = (int) (limit - address); + int bytes = in.read(array, position, array.length - position); + if (bytes > 0) { + size += bytes; + limit += bytes; + } else if (bytes == -1) throw new EOFException("Unexpected end of input stream"); + } + } + + private void shiftArray() { + int position = (int) (offset - address); + int len = (int) (limit - offset); + if (len > 0) System.arraycopy(array, position, array, 0, len); + offset = address; + limit = limit - position; + } + + private void ensureOpen() throws IOException { + if (array == null) throw new IOException("Stream closed"); + } + + @Override + public void close() { + super.close(); + array = null; + } + } +} diff --git a/src/one/nio/serial/PersistOutputStream.java b/src/one/nio/serial/PersistOutputStream.java new file mode 100644 index 0000000..b137580 --- /dev/null +++ b/src/one/nio/serial/PersistOutputStream.java @@ -0,0 +1,95 @@ +package one.nio.serial; + +import java.io.IOException; +import java.io.OutputStream; + +public class PersistOutputStream extends OutputStream { + private final InternalPersistStream stream; + private final OutputStream out; + + public PersistOutputStream(OutputStream out) { + this.stream = new InternalPersistStream(); + this.out = out; + } + + public PersistOutputStream(OutputStream out, int initBufferSize) { + this.stream = new InternalPersistStream(initBufferSize); + this.out = out; + } + + @Override + public void write(int val) throws IOException { + out.write(val); + } + + @Override + public void write(byte[] b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] buf, int off, int len) throws IOException { + out.write(buf, off, len); + } + + public void writeObject(Object obj) throws IOException { + stream.ensureOpen(); + stream.writeObject(obj); + stream.flushArray(); + } + + @Override + public void flush() throws IOException { + stream.flushArray(); + out.flush(); + } + + @Override + public void close() throws IOException { + try (OutputStream out = this.out; InternalPersistStream stream = this.stream) { + stream.flushArray(); + } + } + + private class InternalPersistStream extends PersistStream { + + public InternalPersistStream() { + } + + public InternalPersistStream(int capacity) { + super(capacity); + } + + @Override + protected long alloc(int size) throws IOException { + if (offset + size > limit) { + if (flushArray()) { + return alloc(size); + } else { + return super.alloc(size); + } + } + long currentOffset = offset; + offset = currentOffset + size; + return currentOffset; + } + + private boolean flushArray() throws IOException { + if (array == null || offset == address) return false; + int position = (int) (offset - address); + out.write(array, 0, position); + offset = address; + return true; + } + + private void ensureOpen() throws IOException { + if (array == null) throw new IOException("Stream closed"); + } + + @Override + public void close() { + super.close(); + array = null; + } + } +} diff --git a/src/one/nio/serial/PersistStream.java b/src/one/nio/serial/PersistStream.java index a5bf714..b476d6c 100755 --- a/src/one/nio/serial/PersistStream.java +++ b/src/one/nio/serial/PersistStream.java @@ -69,7 +69,7 @@ public void writeObject(Object obj) throws IOException { } @Override - protected long alloc(int size) { + protected long alloc(int size) throws IOException { long currentOffset = offset; if ((offset = currentOffset + size) > limit) { limit = Math.max(offset, limit * 2); diff --git a/test/one/nio/serial/StreamsTest.java b/test/one/nio/serial/StreamsTest.java index 31e2e67..64b4af5 100755 --- a/test/one/nio/serial/StreamsTest.java +++ b/test/one/nio/serial/StreamsTest.java @@ -16,11 +16,19 @@ package one.nio.serial; +import one.nio.serial.sample.Chat; +import one.nio.serial.sample.Sample; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class StreamsTest { @@ -86,4 +94,34 @@ public int hashCode() { return result; } } + + @Test + public void testInputOutputStreams() throws IOException, ClassNotFoundException { + Place place = new Place("name", "altName", "intName"); + Chat chat = Sample.createChat(); + String str = "თავდაცვის"; + BigDecimal bigDecimal = new BigDecimal("999.999999999"); + byte[] bytes = new byte[1000]; + Arrays.fill(bytes, 0, 500, (byte) 2); + Arrays.fill(bytes, 500, 1000, (byte) 1); + List list = Arrays.asList(1, 2, 3, 4, 5); + byte[] data; + try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); PersistOutputStream out = new PersistOutputStream(byteStream)) { + out.writeObject(place); + out.writeObject(chat); + out.writeObject(str); + out.writeObject(bigDecimal); + out.writeObject(bytes); + out.writeObject(list); + data = byteStream.toByteArray(); + } + try (ByteArrayInputStream byteStream = new ByteArrayInputStream(data); DeserializeInputStream in = new DeserializeInputStream(byteStream)) { + assertEquals(place, in.readObject()); + assertEquals(chat, in.readObject()); + assertEquals(str, in.readObject()); + assertEquals(bigDecimal, in.readObject()); + assertArrayEquals(bytes, (byte[]) in.readObject()); + assertEquals(list, in.readObject()); + } + } }