diff --git a/src/one/nio/rpc/stream/BaseStream.java b/src/one/nio/rpc/stream/BaseStream.java new file mode 100644 index 0000000..0da2f21 --- /dev/null +++ b/src/one/nio/rpc/stream/BaseStream.java @@ -0,0 +1,29 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.rpc.stream; + +import java.net.InetSocketAddress; + +public interface BaseStream extends AutoCloseable { + InetSocketAddress getLocalAddress(); + InetSocketAddress getRemoteAddress(); + + long getBytesRead(); + long getBytesWritten(); + + void close(); +} diff --git a/src/one/nio/rpc/stream/BidiStream.java b/src/one/nio/rpc/stream/BidiStream.java new file mode 100644 index 0000000..4d9932d --- /dev/null +++ b/src/one/nio/rpc/stream/BidiStream.java @@ -0,0 +1,28 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.rpc.stream; + +import java.io.IOException; + +public interface BidiStream extends SendStream, ReceiveStream { + void flush() throws IOException; + R sendAndGet(S object) throws IOException, ClassNotFoundException; + + static BidiStream create(StreamHandler> handler) { + return new StreamProxy<>(handler); + } +} diff --git a/src/one/nio/rpc/stream/ReceiveStream.java b/src/one/nio/rpc/stream/ReceiveStream.java new file mode 100644 index 0000000..20ffd66 --- /dev/null +++ b/src/one/nio/rpc/stream/ReceiveStream.java @@ -0,0 +1,27 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.rpc.stream; + +import java.io.IOException; + +public interface ReceiveStream extends BaseStream { + R receive() throws IOException, ClassNotFoundException; + + static ReceiveStream create(StreamHandler> handler) { + return new StreamProxy<>(handler); + } +} diff --git a/src/one/nio/rpc/stream/RpcStream.java b/src/one/nio/rpc/stream/RpcStream.java new file mode 100644 index 0000000..7af6040 --- /dev/null +++ b/src/one/nio/rpc/stream/RpcStream.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.rpc.stream; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; + +public interface RpcStream extends BaseStream, ObjectInput, ObjectOutput { + void read(ByteBuffer buf) throws IOException; + void write(ByteBuffer buf) throws IOException; + + static RpcStream create(StreamHandler handler) { + return new StreamProxy<>(handler); + } +} diff --git a/src/one/nio/rpc/stream/RpcStreamImpl.java b/src/one/nio/rpc/stream/RpcStreamImpl.java new file mode 100644 index 0000000..7a0d55e --- /dev/null +++ b/src/one/nio/rpc/stream/RpcStreamImpl.java @@ -0,0 +1,483 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.rpc.stream; + +import one.nio.net.Socket; +import one.nio.serial.ObjectInputChannel; +import one.nio.serial.ObjectOutputChannel; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public class RpcStreamImpl implements RpcStream, BidiStream { + protected final Socket socket; + protected final ObjectInputChannel in; + protected final ObjectOutputChannel out; + protected boolean error; + + public RpcStreamImpl(Socket socket) { + this.socket = socket; + this.in = new ObjectInputChannel(socket); + this.out = new ObjectOutputChannel(socket); + } + + // BaseStream + + @Override + public InetSocketAddress getLocalAddress() { + return socket.getLocalAddress(); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return socket.getRemoteAddress(); + } + + @Override + public long getBytesRead() { + return in.getBytesRead(); + } + + @Override + public long getBytesWritten() { + return out.getBytesWritten(); + } + + // ObjectInput + + @Override + public void read(ByteBuffer buf) throws IOException { + try { + in.read(buf); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public Object readObject() throws IOException, ClassNotFoundException { + try { + Object result = in.readObject(); + in.reset(); + return result; + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public int read() throws IOException { + try { + return in.read(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public int read(byte[] b) throws IOException { + try { + return in.read(b); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + return in.read(b, off, len); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public long skip(long n) throws IOException { + try { + return in.skip(n); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public int available() { + return in.available(); + } + + @Override + public void readFully(byte[] b) throws IOException { + try { + in.readFully(b); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + try { + in.read(b, off, len); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public int skipBytes(int n) throws IOException { + try { + return in.skipBytes(n); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public boolean readBoolean() throws IOException { + try { + return in.readBoolean(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public byte readByte() throws IOException { + try { + return in.readByte(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public int readUnsignedByte() throws IOException { + try { + return in.readUnsignedByte(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public short readShort() throws IOException { + try { + return in.readShort(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public int readUnsignedShort() throws IOException { + try { + return in.readUnsignedShort(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public char readChar() throws IOException { + try { + return in.readChar(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public int readInt() throws IOException { + try { + return in.readInt(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public long readLong() throws IOException { + try { + return in.readLong(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public float readFloat() throws IOException { + try { + return in.readFloat(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public double readDouble() throws IOException { + try { + return in.readDouble(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public String readLine() throws IOException { + try { + return in.readLine(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public String readUTF() throws IOException { + try { + return in.readUTF(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + // ObjectOutput + + @Override + public void write(ByteBuffer buf) throws IOException { + try { + out.write(buf); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeObject(Object obj) throws IOException { + try { + out.writeObject(obj); + out.reset(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void write(int b) throws IOException { + try { + out.write(b); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void write(byte[] b) throws IOException { + try { + out.write(b); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + try { + out.write(b, off, len); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeBoolean(boolean v) throws IOException { + try { + out.writeBoolean(v); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeByte(int v) throws IOException { + try { + out.writeByte(v); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeShort(int v) throws IOException { + try { + out.writeShort(v); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeChar(int v) throws IOException { + try { + out.writeChar(v); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeInt(int v) throws IOException { + try { + out.writeInt(v); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeLong(long v) throws IOException { + try { + out.writeLong(v); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeFloat(float v) throws IOException { + try { + out.writeFloat(v); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeDouble(double v) throws IOException { + try { + out.writeDouble(v); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeBytes(String s) throws IOException { + try { + out.writeBytes(s); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeChars(String s) throws IOException { + try { + out.writeChars(s); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void writeUTF(String s) throws IOException { + try { + out.writeUTF(s); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void flush() throws IOException { + try { + out.flush(); + } catch (Throwable e) { + error = true; + throw e; + } + } + + @Override + public void close() { + try { + in.close(); + out.close(); + } catch (IOException e) { + error = true; + } + } + + // BidiStream + + @Override + @SuppressWarnings("unchecked") + public R receive() throws IOException, ClassNotFoundException { + return (R) readObject(); + } + + @Override + public void send(S object) throws IOException { + writeObject(object); + } + + @Override + @SuppressWarnings("unchecked") + public R sendAndGet(S object) throws IOException, ClassNotFoundException { + writeObject(object); + flush(); + return (R) readObject(); + } +} diff --git a/src/one/nio/rpc/stream/SendStream.java b/src/one/nio/rpc/stream/SendStream.java new file mode 100644 index 0000000..1213c8a --- /dev/null +++ b/src/one/nio/rpc/stream/SendStream.java @@ -0,0 +1,27 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.rpc.stream; + +import java.io.IOException; + +public interface SendStream extends BaseStream { + void send(S object) throws IOException; + + static SendStream create(StreamHandler> handler) { + return new StreamProxy<>(handler); + } +} diff --git a/src/one/nio/rpc/stream/StreamHandler.java b/src/one/nio/rpc/stream/StreamHandler.java new file mode 100644 index 0000000..37ea4a2 --- /dev/null +++ b/src/one/nio/rpc/stream/StreamHandler.java @@ -0,0 +1,23 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.rpc.stream; + +import java.io.IOException; + +public interface StreamHandler { + void communicate(S stream) throws IOException, ClassNotFoundException; +} diff --git a/src/one/nio/rpc/stream/StreamProxy.java b/src/one/nio/rpc/stream/StreamProxy.java new file mode 100644 index 0000000..ad86166 --- /dev/null +++ b/src/one/nio/rpc/stream/StreamProxy.java @@ -0,0 +1,232 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.rpc.stream; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public final class StreamProxy implements RpcStream, BidiStream { + public final StreamHandler handler; + + public long bytesRead; + public long bytesWritten; + + StreamProxy(StreamHandler handler) { + this.handler = handler; + } + + // BaseStream + + public final InetSocketAddress getLocalAddress() { + return null; + } + + public final InetSocketAddress getRemoteAddress() { + return null; + } + + public final long getBytesRead() { + return bytesRead; + } + + public final long getBytesWritten() { + return bytesWritten; + } + + // ObjectInput + + public final void read(ByteBuffer buf) throws IOException { + throw exception(); + } + + public final Object readObject() throws IOException { + throw exception(); + } + + public final int read() throws IOException { + throw exception(); + } + + public final int read(byte[] b) throws IOException { + throw exception(); + } + + public final int read(byte[] b, int off, int len) throws IOException { + throw exception(); + } + + public final long skip(long n) throws IOException { + throw exception(); + } + + public final int available() { + return 0; + } + + public final void readFully(byte[] b) throws IOException { + throw exception(); + } + + public final void readFully(byte[] b, int off, int len) throws IOException { + throw exception(); + } + + public final int skipBytes(int n) throws IOException { + throw exception(); + } + + public final boolean readBoolean() throws IOException { + throw exception(); + } + + public final byte readByte() throws IOException { + throw exception(); + } + + public final int readUnsignedByte() throws IOException { + throw exception(); + } + + public final short readShort() throws IOException { + throw exception(); + } + + public final int readUnsignedShort() throws IOException { + throw exception(); + } + + public final char readChar() throws IOException { + throw exception(); + } + + public final int readInt() throws IOException { + throw exception(); + } + + public final long readLong() throws IOException { + throw exception(); + } + + public final float readFloat() throws IOException { + throw exception(); + } + + public final double readDouble() throws IOException { + throw exception(); + } + + public final String readLine() throws IOException { + throw exception(); + } + + public final String readUTF() throws IOException { + throw exception(); + } + + // ObjectOutput + + public final void write(ByteBuffer buf) throws IOException { + throw exception(); + } + + public final void writeObject(Object obj) throws IOException { + throw exception(); + } + + public final void write(int b) throws IOException { + throw exception(); + } + + public final void write(byte[] b) throws IOException { + throw exception(); + } + + public final void write(byte[] b, int off, int len) throws IOException { + throw exception(); + } + + public final void writeBoolean(boolean v) throws IOException { + throw exception(); + } + + public final void writeByte(int v) throws IOException { + throw exception(); + } + + public final void writeShort(int v) throws IOException { + throw exception(); + } + + public final void writeChar(int v) throws IOException { + throw exception(); + } + + public final void writeInt(int v) throws IOException { + throw exception(); + } + + public final void writeLong(long v) throws IOException { + throw exception(); + } + + public final void writeFloat(float v) throws IOException { + throw exception(); + } + + public final void writeDouble(double v) throws IOException { + throw exception(); + } + + public final void writeBytes(String s) throws IOException { + throw exception(); + } + + public final void writeChars(String s) throws IOException { + throw exception(); + } + + public final void writeUTF(String s) throws IOException { + throw exception(); + } + + public final void flush() throws IOException { + throw exception(); + } + + public final void close() { + // Nothing to do + } + + // BidiStream + + public final R receive() throws IOException { + throw exception(); + } + + public final void send(S object) throws IOException { + throw exception(); + } + + public final R sendAndGet(S object) throws IOException { + throw exception(); + } + + private static IOException exception() { + return new IOException("Must not be called"); + } +} diff --git a/src/one/nio/serial/ObjectInputChannel.java b/src/one/nio/serial/ObjectInputChannel.java new file mode 100644 index 0000000..c3ea5a7 --- /dev/null +++ b/src/one/nio/serial/ObjectInputChannel.java @@ -0,0 +1,148 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.serial; + +import one.nio.mem.DirectMemory; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.Arrays; + +import static one.nio.util.JavaInternals.unsafe; + +public class ObjectInputChannel extends DataStream { + private static final int INITIAL_CAPACITY = 16; + + private final ReadableByteChannel ch; + private int capacity; + private Object[] context; + private int contextSize; + private long bytesRead; + + public ObjectInputChannel(ReadableByteChannel ch) { + this(ch, 32768); + } + + public ObjectInputChannel(ReadableByteChannel ch, int bufSize) { + super(unsafe.allocateMemory(bufSize), 0); + this.ch = ch; + this.capacity = bufSize; + this.context = new Object[INITIAL_CAPACITY]; + } + + public long getBytesRead() { + return bytesRead; + } + + @Override + public Object readObject() throws IOException, ClassNotFoundException { + Serializer serializer; + byte b = readByte(); + if (b >= 0) { + offset--; + serializer = Repository.requestSerializer(readLong()); + } else { + switch (b) { + case REF_NULL: + return null; + case REF_RECURSIVE: + return context[readUnsignedShort() + 1]; + case REF_RECURSIVE2: + return context[readInt() + 1]; + case REF_EMBEDDED: + serializer = (Serializer) readObject(); + Repository.provideSerializer(serializer); + break; + default: + serializer = Repository.requestBootstrapSerializer(b); + } + } + + if (++contextSize >= context.length) { + context = Arrays.copyOf(context, context.length * 2); + } + + return serializer.read(this); + } + + public void reset() { + if (context.length > INITIAL_CAPACITY) { + context = new Object[INITIAL_CAPACITY]; + } else { + Arrays.fill(context, null); + } + contextSize = 0; + } + + @Override + public void close() throws IOException { + unsafe.freeMemory(address); + address = 0; + } + + @Override + public void register(Object obj) { + context[contextSize] = obj; + } + + @Override + protected long alloc(int size) throws IOException { + int available = (int) (limit - offset); + if (available < size) { + fetch(size - available); + } + long currentOffset = offset; + offset = currentOffset + size; + return currentOffset; + } + + private void fetch(int size) throws IOException { + int available = (int) (limit - offset); + if (available + size > capacity) { + int newBufSize = Math.max(available + size + 32768, capacity * 3 / 2); + long newAddress = unsafe.allocateMemory(newBufSize); + if (available > 0) { + unsafe.copyMemory(null, offset, null, newAddress, available); + } + unsafe.freeMemory(address); + + this.address = newAddress; + this.offset = newAddress; + this.limit = newAddress + available; + this.capacity = newBufSize; + } else { + if (available > 0) { + unsafe.copyMemory(null, offset, null, address, available); + } + this.offset = address; + this.limit = address + available; + } + + ByteBuffer bb = DirectMemory.wrap(limit, capacity - available); + while (size > 0) { + int bytes = ch.read(bb); + if (bytes < 0) { + throw new EOFException(); + } + limit += bytes; + size -= bytes; + bytesRead += bytes; + } + } +} diff --git a/src/one/nio/serial/ObjectOutputChannel.java b/src/one/nio/serial/ObjectOutputChannel.java new file mode 100644 index 0000000..b5557a1 --- /dev/null +++ b/src/one/nio/serial/ObjectOutputChannel.java @@ -0,0 +1,165 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.serial; + +import one.nio.mem.DirectMemory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +import static one.nio.util.JavaInternals.unsafe; + +public class ObjectOutputChannel extends DataStream { + private final WritableByteChannel ch; + private final SerializationContext context; + private long[] serializerSet; + private int serializerSetSize; + private long bytesWritten; + + public ObjectOutputChannel(WritableByteChannel ch) { + this(ch, 32768); + } + + public ObjectOutputChannel(WritableByteChannel ch, int bufSize) { + super(unsafe.allocateMemory(bufSize), bufSize); + this.ch = ch; + this.context = new SerializationContext(); + this.serializerSet = new long[16]; + } + + public long getBytesWritten() { + return bytesWritten; + } + + @Override + @SuppressWarnings("unchecked") + public void writeObject(Object obj) throws IOException { + if (obj == null) { + writeByte(REF_NULL); + } else { + int index = context.indexOf(obj); + if (index < 0) { + Serializer serializer = Repository.get(obj.getClass()); + if (serializer.uid < 0) { + writeByte((byte) serializer.uid); + } else if (addSerializer(serializer.uid)) { + writeByte(REF_EMBEDDED); + writeObject(serializer); + } else { + writeLong(serializer.uid); + } + context.put(obj); + serializer.write(obj, this); + } else if (index <= 0xffff) { + writeByte(REF_RECURSIVE); + writeShort(index); + } else { + writeByte(REF_RECURSIVE2); + writeInt(index); + } + } + } + + public void reset() { + context.clear(); + } + + @Override + public void flush() throws IOException { + int count = count(); + if (count > 0) { + ByteBuffer bb = DirectMemory.wrap(address, count); + do { + ch.write(bb); + } while (bb.hasRemaining()); + offset = address; + bytesWritten += count; + } + } + + @Override + public void close() throws IOException { + try { + flush(); + } finally { + unsafe.freeMemory(address); + address = 0; + } + } + + @Override + protected long alloc(int size) throws IOException { + if (offset + size > limit) { + grow(size); + } + long currentOffset = offset; + offset = currentOffset + size; + return currentOffset; + } + + private void grow(int size) throws IOException { + flush(); + + if (size > available()) { + unsafe.freeMemory(address); + int newBufSize = Math.max(size + 32768, available() * 3 / 2); + long newAddress = unsafe.allocateMemory(newBufSize); + + this.address = newAddress; + this.offset = newAddress; + this.limit = newAddress + newBufSize; + } + } + + // Embed serializer set for performance reasons + private boolean addSerializer(long uid) { + long[] set = this.serializerSet; + int mask = set.length - 1; + + int i = (int) uid & mask; + while (set[i] != 0) { + if (set[i] == uid) { + return false; + } + i = (i + 1) & mask; + } + + set[i] = uid; + if (uid != 0 && ++serializerSetSize * 2 > serializerSet.length) { + resizeSerializerSet(); + } + return true; + } + + private void resizeSerializerSet() { + long[] set = new long[serializerSet.length * 2]; + int mask = set.length - 1; + + for (long uid : serializerSet) { + if (uid != 0) { + int i = (int) uid & mask; + while (set[i] != 0) { + i = (i + 1) & mask; + } + set[i] = uid; + } + } + + this.serializerSet = set; + } +} diff --git a/src/one/nio/serial/SerializationContext.java b/src/one/nio/serial/SerializationContext.java index 3e02182..5e7088e 100755 --- a/src/one/nio/serial/SerializationContext.java +++ b/src/one/nio/serial/SerializationContext.java @@ -16,6 +16,8 @@ package one.nio.serial; +import java.util.Arrays; + class SerializationContext { private static final int INITIAL_CAPACITY = 64; @@ -69,6 +71,20 @@ public int indexOf(Object obj) { return -1; } + public void clear() { + this.first = null; + this.size = 1; + + if (keys != null) { + if (keys.length > INITIAL_CAPACITY) { + this.keys = null; + this.values = null; + } else { + Arrays.fill(keys, null); + } + } + } + private Object[] init() { this.keys = new Object[INITIAL_CAPACITY]; this.values = new int[INITIAL_CAPACITY]; diff --git a/test/one/nio/serial/StreamingPerf.java b/test/one/nio/serial/StreamingPerf.java new file mode 100644 index 0000000..8de5d00 --- /dev/null +++ b/test/one/nio/serial/StreamingPerf.java @@ -0,0 +1,86 @@ +/* + * Copyright 2018 Odnoklassniki Ltd, Mail.Ru Group + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package one.nio.serial; + +import one.nio.net.Socket; +import one.nio.serial.sample.Sample; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.concurrent.atomic.AtomicLong; + +public class StreamingPerf { + private static final String host = "127.0.0.1"; + private static final int port = 33777; + private static final Object sampleObject = Sample.createChat(); + private static final AtomicLong processed = new AtomicLong(); + + public static void main(String[] args) throws Exception { + final Socket server = Socket.createServerSocket(); + server.setReuseAddr(true, false); + server.bind(host, port, 128); + server.listen(128); + + new Thread("Reader") { + @Override + public void run() { + try (Socket s = accept(); ObjectInputChannel ch = new ObjectInputChannel(s)) { + while (true) { + Object o = ch.readObject(); + if (o == null) break; + ch.reset(); + processed.incrementAndGet(); + } + } catch (Throwable e) { + throw new AssertionError(e); + } + } + + private Socket accept() throws IOException { + Socket s = server.accept(); + s.setNoDelay(true); + return s; + } + }.start(); + + new Thread("Writer") { + @Override + public void run() { + try (Socket s = connect(); ObjectOutputChannel ch = new ObjectOutputChannel(s)) { + while (true) { + ch.writeObject(sampleObject); + ch.reset(); + } + } catch (Throwable e) { + throw new AssertionError(e); + } + } + + private Socket connect() throws IOException { + Socket s = Socket.connectInet(InetAddress.getByName(host), port); + s.setNoDelay(true); + return s; + } + }.start(); + + while (!Thread.interrupted()) { + Thread.sleep(1000); + long objectsPerSec = processed.getAndSet(0); + System.out.println("Streaming speed: " + objectsPerSec + " objects/sec"); + } + } +}