Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Input/OutputStream support #86

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions src/one/nio/serial/DataStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void writeFrom(long address, int len) throws IOException {
}

public int read() throws IOException {
return unsafe.getByte(array, alloc(1));
long offset = alloc(1);
return unsafe.getByte(array, offset);
}

public int read(byte[] b) throws IOException {
Expand All @@ -197,11 +198,13 @@ public int read(byte[] b, int off, int len) throws IOException {
}

public void readFully(byte[] b) throws IOException {
unsafe.copyMemory(array, alloc(b.length), b, byteArrayOffset, b.length);
long offset = alloc(b.length);
unsafe.copyMemory(array, offset, b, byteArrayOffset, b.length);
}

public void readFully(byte[] b, int off, int len) throws IOException {
unsafe.copyMemory(array, alloc(len), b, byteArrayOffset + off, len);
long offset = alloc(len);
unsafe.copyMemory(array, offset, b, byteArrayOffset + off, len);
}

public long skip(long n) throws IOException {
Expand All @@ -215,35 +218,43 @@ public int skipBytes(int n) throws IOException {
}

public boolean readBoolean() throws IOException {
return unsafe.getBoolean(array, alloc(1));
long offset = alloc(1);
return unsafe.getBoolean(array, offset);
}

public byte readByte() throws IOException {
return unsafe.getByte(array, alloc(1));
long offset = alloc(1);
return unsafe.getByte(array, offset);
}

public int readUnsignedByte() throws IOException {
return unsafe.getByte(array, alloc(1)) & 0xff;
long offset = alloc(1);
return unsafe.getByte(array, offset) & 0xff;
}

public short readShort() throws IOException {
return Short.reverseBytes(unsafe.getShort(array, alloc(2)));
long offset = alloc(2);
return Short.reverseBytes(unsafe.getShort(array, offset));
}

public int readUnsignedShort() throws IOException {
return Short.reverseBytes(unsafe.getShort(array, alloc(2))) & 0xffff;
long offset = alloc(2);
return Short.reverseBytes(unsafe.getShort(array, offset)) & 0xffff;
}

public char readChar() throws IOException {
return Character.reverseBytes(unsafe.getChar(array, alloc(2)));
long offset = alloc(2);
return Character.reverseBytes(unsafe.getChar(array, offset));
}

public int readInt() throws IOException {
return Integer.reverseBytes(unsafe.getInt(array, alloc(4)));
long offset = alloc(4);
return Integer.reverseBytes(unsafe.getInt(array, offset));
}

public long readLong() throws IOException {
return Long.reverseBytes(unsafe.getLong(array, alloc(8)));
long offset = alloc(8);
return Long.reverseBytes(unsafe.getLong(array, offset));
}

public float readFloat() throws IOException {
Expand Down Expand Up @@ -276,7 +287,8 @@ public String readUTF() throws IOException {
if (length > 0x7fff) {
length = (length & 0x7fff) << 16 | readUnsignedShort();
}
return Utf8.read(array, alloc(length), length);
long offset = alloc(length);
return Utf8.read(array, offset, length);
}

public Object readObject() throws IOException, ClassNotFoundException {
Expand Down Expand Up @@ -309,7 +321,8 @@ public void read(ByteBuffer dst) throws IOException {
}

public void readTo(long address, int len) throws IOException {
unsafe.copyMemory(array, alloc(len), null, address, len);
long offset = alloc(len);
unsafe.copyMemory(array, offset, null, address, len);
}

public ByteBuffer byteBuffer(int len) throws IOException {
Expand Down
133 changes: 133 additions & 0 deletions src/one/nio/serial/DeserializeInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package one.nio.serial;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class DeserializeInputStream extends InputStream {
private final InternalDeserializeStream stream;
private final InputStream in;

public DeserializeInputStream(InputStream in) {
this.stream = new InternalDeserializeStream();
this.in = in;
}

public DeserializeInputStream(InputStream in, int initBufferSize) {
this.stream = new InternalDeserializeStream(initBufferSize);
this.in = in;
}

@Override
public int read() throws IOException {
return in.read();
}

@Override
public int read(byte[] b) throws IOException {
return in.read(b);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}

public Object readObject() throws IOException, ClassNotFoundException {
stream.ensureOpen();
return stream.readObject();
}

@Override
public long skip(long n) throws IOException {
return in.skip(n);
}

@Override
public int available() throws IOException {
return in.available();
}

@Override
public void close() throws IOException {
try {
stream.close();
} finally {
in.close();
}
}

private class InternalDeserializeStream extends DeserializeStream {

public InternalDeserializeStream() {
super(new byte[400]);
limit = offset;
}

public InternalDeserializeStream(int capacity) {
super(new byte[capacity]);
limit = offset;
}

@Override
public ByteBuffer byteBuffer(int len) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(len);
read(byteBuffer);
return byteBuffer;
}

@Override
protected long alloc(int size) throws IOException {
if (size > array.length) {
int newSize = Math.max(size, array.length * 2);
byte[] newArray = new byte[newSize];
if (offset < limit) {
int position = (int) (offset - address);
int len = (int) (limit - offset);
System.arraycopy(array, position, newArray, 0, len);
limit = limit - position;
} else {
limit = address;
}
array = newArray;
offset = address;
}
if (offset + size > limit) fillArray(size);
long currentOffset = offset;
if ((offset = currentOffset + size) > limit) throw new IndexOutOfBoundsException();
return currentOffset;
}

private void fillArray(int minSize) throws IOException {
if (offset - address + minSize > array.length) shiftArray();
int size = (int) (limit - offset);
while (size < minSize) {
int position = (int) (limit - address);
int bytes = in.read(array, position, array.length - position);
if (bytes > 0) {
size += bytes;
limit += bytes;
} else if (bytes == -1) throw new EOFException("Unexpected end of input stream");
}
}

private void shiftArray() {
int position = (int) (offset - address);
int len = (int) (limit - offset);
if (len > 0) System.arraycopy(array, position, array, 0, len);
offset = address;
limit = limit - position;
}

private void ensureOpen() throws IOException {
if (array == null) throw new IOException("Stream closed");
}

@Override
public void close() {
super.close();
array = null;
}
}
}
95 changes: 95 additions & 0 deletions src/one/nio/serial/PersistOutputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package one.nio.serial;

import java.io.IOException;
import java.io.OutputStream;

public class PersistOutputStream extends OutputStream {
private final InternalPersistStream stream;
private final OutputStream out;

public PersistOutputStream(OutputStream out) {
this.stream = new InternalPersistStream();
this.out = out;
}

public PersistOutputStream(OutputStream out, int initBufferSize) {
this.stream = new InternalPersistStream(initBufferSize);
this.out = out;
}

@Override
public void write(int val) throws IOException {
out.write(val);
}

@Override
public void write(byte[] b) throws IOException {
out.write(b);
}

@Override
public void write(byte[] buf, int off, int len) throws IOException {
out.write(buf, off, len);
}

public void writeObject(Object obj) throws IOException {
stream.ensureOpen();
stream.writeObject(obj);
stream.flushArray();
}

@Override
public void flush() throws IOException {
stream.flushArray();
out.flush();
}

@Override
public void close() throws IOException {
try (OutputStream out = this.out; InternalPersistStream stream = this.stream) {
stream.flushArray();
}
}

private class InternalPersistStream extends PersistStream {

public InternalPersistStream() {
}

public InternalPersistStream(int capacity) {
super(capacity);
}

@Override
protected long alloc(int size) throws IOException {
if (offset + size > limit) {
if (flushArray()) {
return alloc(size);
} else {
return super.alloc(size);
}
}
long currentOffset = offset;
offset = currentOffset + size;
return currentOffset;
}

private boolean flushArray() throws IOException {
if (array == null || offset == address) return false;
int position = (int) (offset - address);
out.write(array, 0, position);
offset = address;
return true;
}

private void ensureOpen() throws IOException {
if (array == null) throw new IOException("Stream closed");
}

@Override
public void close() {
super.close();
array = null;
}
}
}
2 changes: 1 addition & 1 deletion src/one/nio/serial/PersistStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void writeObject(Object obj) throws IOException {
}

@Override
protected long alloc(int size) {
protected long alloc(int size) throws IOException {
long currentOffset = offset;
if ((offset = currentOffset + size) > limit) {
limit = Math.max(offset, limit * 2);
Expand Down
38 changes: 38 additions & 0 deletions test/one/nio/serial/StreamsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@

package one.nio.serial;

import one.nio.serial.sample.Chat;
import one.nio.serial.sample.Sample;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

public class StreamsTest {
Expand Down Expand Up @@ -86,4 +94,34 @@ public int hashCode() {
return result;
}
}

@Test
public void testInputOutputStreams() throws IOException, ClassNotFoundException {
Place place = new Place("name", "altName", "intName");
Chat chat = Sample.createChat();
String str = "თავდაცვის";
BigDecimal bigDecimal = new BigDecimal("999.999999999");
byte[] bytes = new byte[1000];
Arrays.fill(bytes, 0, 500, (byte) 2);
Arrays.fill(bytes, 500, 1000, (byte) 1);
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
byte[] data;
try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); PersistOutputStream out = new PersistOutputStream(byteStream)) {
out.writeObject(place);
out.writeObject(chat);
out.writeObject(str);
out.writeObject(bigDecimal);
out.writeObject(bytes);
out.writeObject(list);
data = byteStream.toByteArray();
}
try (ByteArrayInputStream byteStream = new ByteArrayInputStream(data); DeserializeInputStream in = new DeserializeInputStream(byteStream)) {
assertEquals(place, in.readObject());
assertEquals(chat, in.readObject());
assertEquals(str, in.readObject());
assertEquals(bigDecimal, in.readObject());
assertArrayEquals(bytes, (byte[]) in.readObject());
assertEquals(list, in.readObject());
}
}
}