From dd089d89c722d6eaec3df5a8408242fa25f4516f Mon Sep 17 00:00:00 2001 From: Steven Warwick Date: Mon, 24 Sep 2018 13:29:50 -0400 Subject: [PATCH 1/5] Issue #47 and Issue #48 Added TransactTime to incremental message callback Added a new Incremental complete callback --- .../cme/mdp3/sbe/message/SbeConstants.java | 1 + .../com/epam/cme/mdp3/ChannelListener.java | 14 +++++++++-- .../epam/cme/mdp3/VoidChannelListener.java | 9 +++++-- .../mdp3/control/ChannelControllerRouter.java | 16 +++++++++++++ .../mdp3/control/InstrumentController.java | 14 +++++++++-- .../epam/cme/mdp3/test/MBOWithMBPMain.java | 9 +++++-- .../cme/mdp3/test/TestChannelListener.java | 24 +++++++++++++------ 7 files changed, 72 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeConstants.java b/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeConstants.java index 149c418..011ebaf 100644 --- a/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeConstants.java +++ b/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeConstants.java @@ -29,4 +29,5 @@ private SbeConstants() { public final static int VERSION_OFFSET = 6; public final static int RESERVED_OFFSET = 7; public final static int MATCHEVENTINDICATOR_TAG = 5799; + public final static int TRANSACT_TIME_TAG = 60; } diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java index e2da7b6..ac902c7 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java @@ -30,7 +30,7 @@ public interface ChannelListener extends CoreChannelListener { * @param mdEntry MBP Entry of Group from MDP Incremental Refresh Message. It can be null when MBO Incremental Refresh is received in separated template. */ void onIncrementalMBORefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, final FieldSet orderEntry, final FieldSet mdEntry); + final String secDesc, final long msgSeqNum, long transactTime, final FieldSet orderEntry, final FieldSet mdEntry); /** * @@ -42,8 +42,18 @@ void onIncrementalMBORefresh(final String channelId, final short matchEventIndic * @param mdEntry MBP Entry of Group from MDP Incremental Refresh Message. It can not be null. */ void onIncrementalMBPRefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, final FieldSet mdEntry); + final String secDesc, final long msgSeqNum, long transactTime, final FieldSet mdEntry); + /** + * Called when a Incremental MsgSeqNum has been fully processed + * This callback will be called for each securityId found in the MsgSeqNum packet + * + * @param channelId ID of MDP Channel + * @param securityId Security ID + * @param msgSeqNum Message sequence number of message. + */ + void onIncrementalComplete(final String channelId, final int securityId, final long msgSeqNum); + /** * Called when MDP Snapshot Full Refresh Message for MBO is received and processed. * diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java index 8b2330b..c4335dd 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java @@ -60,12 +60,17 @@ default void onSecurityStatus(String channelId, int securityId, MdpMessage secSt } @Override - default void onIncrementalMBORefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, FieldSet orderEntry, FieldSet mdEntry) { + default void onIncrementalMBORefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, long transactTime, FieldSet orderEntry, FieldSet mdEntry) { } @Override - default void onIncrementalMBPRefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, FieldSet mdEntry) { + default void onIncrementalMBPRefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, long transactTime, FieldSet mdEntry) { + + } + + @Override + default void onIncrementalComplete(final String channelId, final int securityId, final long msgSeqNum) { } diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java index 14319de..14187c8 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java @@ -22,7 +22,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Consumer; import static com.epam.cme.mdp3.MdConstants.INCR_RFRSH_MD_ENTRY_TYPE; @@ -131,14 +133,25 @@ protected void routeMBPEntry(int securityId, MdpMessage mdpMessage, MdpGroupEntr } } + protected void routeIncrementalComplete(Set securityIds, long msgSeqNum) { + for (Integer securityId : securityIds) { + InstrumentController instrumentController = instrumentManager.getInstrumentController(securityId); + if (instrumentController != null) { + instrumentController.handleIncrementalComplete(msgSeqNum); + } + } + } + private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup, MdpGroupEntry mdpGroupEntry, long msgSeqNum){ if (isIncrementalMessageSupported(mdpMessage)) { + Set securityIds = new HashSet<>(); if (isIncrementOnlyForMBO(mdpMessage)) { mdpMessage.getGroup(MdConstants.NO_MD_ENTRIES, mdpGroup); while (mdpGroup.hashNext()) { mdpGroup.next(); mdpGroup.getEntry(mdpGroupEntry); int securityId = getSecurityId(mdpGroupEntry); + securityIds.add(securityId); routeMBOEntry(securityId, mdpMessage, mdpGroupEntry, null, msgSeqNum); } } else { @@ -151,6 +164,7 @@ private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup, emptyBookConsumers.forEach(mdpMessageConsumer -> mdpMessageConsumer.accept(mdpMessage)); } else { int securityId = mdEntry.getInt32(MdConstants.SECURITY_ID); + securityIds.add(securityId); routeMBPEntry(securityId, mdpMessage, mdEntry, msgSeqNum); } } @@ -163,10 +177,12 @@ private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup, short entryNum = mdpGroupEntry.getUInt8(MdConstants.REFERENCE_ID); noMdEntriesGroup.getEntry(entryNum, mdEntry); int securityId = mdEntry.getInt32(MdConstants.SECURITY_ID); + securityIds.add(securityId); routeMBOEntry(securityId, mdpMessage, mdpGroupEntry, mdEntry, msgSeqNum); } } } + routeIncrementalComplete(securityIds, msgSeqNum); } } diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java index 52a4443..ff48b7d 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java @@ -36,8 +36,9 @@ public InstrumentController(List listeners, String channelId, i public void handleMBOIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry orderIDEntry, MdpGroupEntry mdEntry, long msgSeqNum){ if(enable) { short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG); + long transactTime = mdpMessage.getUInt64(SbeConstants.TRANSACT_TIME_TAG); for (ChannelListener channelListener : listeners) { - channelListener.onIncrementalMBORefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, orderIDEntry, mdEntry); + channelListener.onIncrementalMBORefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, orderIDEntry, mdEntry); } } } @@ -45,8 +46,17 @@ public void handleMBOIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry order public void handleMBPIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry mdEntry, long msgSeqNum){ if(enable) { short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG); + long transactTime = mdpMessage.getUInt64(SbeConstants.TRANSACT_TIME_TAG); for (ChannelListener channelListener : listeners) { - channelListener.onIncrementalMBPRefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, mdEntry); + channelListener.onIncrementalMBPRefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, mdEntry); + } + } + } + + public void handleIncrementalComplete(long msgSeqNum) { + if(enable) { + for (ChannelListener channelListener : listeners) { + channelListener.onIncrementalComplete(channelId, securityId, msgSeqNum); } } } diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/MBOWithMBPMain.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/MBOWithMBPMain.java index 8b62f3d..541495a 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/MBOWithMBPMain.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/MBOWithMBPMain.java @@ -121,7 +121,7 @@ public int onSecurityDefinition(final String channelId, final MdpMessage mdpMess @Override public void onIncrementalMBPRefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, final FieldSet mdEntry){ + final String secDesc, final long msgSeqNum, long transactTime, final FieldSet mdEntry){ //logger.debug("IncrementalMBPRefresh of {}, {} = {}", channelId, secDesc, msgSeqNum); MultipleDepthBookHandler multipleDepthBookHandler = multipleDepthBookHandlers.computeIfAbsent(securityId, integer -> new MultipleDepthBookHandler(integer, MdEventFlags.BOOK, (byte) 20)); char mdEntryType = mdEntry.getChar(269); @@ -136,7 +136,7 @@ public void onIncrementalMBPRefresh(final String channelId, final short matchEve @Override public void onIncrementalMBORefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, final FieldSet orderIDEntry, final FieldSet mdEntry){ + final String secDesc, final long msgSeqNum, long transactTime, final FieldSet orderIDEntry, final FieldSet mdEntry){ try { long orderId; long mdOrderPriority; @@ -182,6 +182,11 @@ public void onIncrementalMBORefresh(final String channelId, final short matchEve } } } + + @Override + public void onIncrementalComplete(final String channelId, final int securityId, final long msgSeqNum){ + logger.trace(String.format("onIncrementalComplete : ChannelId: %s, SecurityId: %s, MsgSeqNum: %s", channelId, securityId, msgSeqNum)); + } @Override public void onSnapshotMBOFullRefresh(final String channelId, final String secDesc, final MdpMessage snptMessage){ diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/TestChannelListener.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/TestChannelListener.java index a8f718a..c6b2824 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/TestChannelListener.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/TestChannelListener.java @@ -56,14 +56,18 @@ public int onSecurityDefinition(String channelId, MdpMessage secDefMessage) { @Override public void onIncrementalMBORefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, final FieldSet orderIDEntry, final FieldSet mdEntry){ - incrementMBOQueue.add(new IncrementalRefreshEntity(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, orderIDEntry != null ? orderIDEntry.copy() : null, mdEntry != null ? mdEntry.copy() : null)); + final String secDesc, final long msgSeqNum, long transactTime, final FieldSet orderIDEntry, final FieldSet mdEntry){ + incrementMBOQueue.add(new IncrementalRefreshEntity(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, orderIDEntry != null ? orderIDEntry.copy() : null, mdEntry != null ? mdEntry.copy() : null)); } @Override public void onIncrementalMBPRefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, final FieldSet mdEntry){ - incrementMBPQueue.add(new IncrementalRefreshEntity(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, (mdEntry != null) ? mdEntry.copy() : null)); + final String secDesc, final long msgSeqNum, long transactTime, final FieldSet mdEntry){ + incrementMBPQueue.add(new IncrementalRefreshEntity(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, (mdEntry != null) ? mdEntry.copy() : null)); + } + + @Override + public void onIncrementalComplete(final String channelId, final int securityId, final long msgSeqNum){ } @Override @@ -120,19 +124,21 @@ public class IncrementalRefreshEntity { private int securityId; private String secDesc; private long msgSeqNum; + private long transactTime; private FieldSet orderIDEntry; private FieldSet mdEntry; - public IncrementalRefreshEntity(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, FieldSet mdEntry) { - this(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, null, mdEntry); + public IncrementalRefreshEntity(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, long transactTime, FieldSet mdEntry) { + this(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, null, mdEntry); } - public IncrementalRefreshEntity(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, FieldSet orderIDEntry, FieldSet mdEntry) { + public IncrementalRefreshEntity(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, long transactTime, FieldSet orderIDEntry, FieldSet mdEntry) { this.channelId = channelId; this.matchEventIndicator = matchEventIndicator; this.securityId = securityId; this.secDesc = secDesc; this.msgSeqNum = msgSeqNum; + this.transactTime = transactTime; this.orderIDEntry = orderIDEntry; this.mdEntry = mdEntry; } @@ -157,6 +163,10 @@ public long getMsgSeqNum() { return msgSeqNum; } + public long getTransactTime() { + return transactTime; + } + public FieldSet getOrderIDEntry() { return orderIDEntry; } From fef786c9ba90f3912bca13d4bb9bb7530133ebfc Mon Sep 17 00:00:00 2001 From: Steven Warwick Date: Wed, 26 Sep 2018 11:49:13 -0400 Subject: [PATCH 2/5] Update code with main message in the incremental callback Code clean up to remove warnings Updated build version to 2.1.0 --- core/build.gradle | 2 +- .../java/com/epam/cme/mdp3/MdpPacket.java | 1 - .../mdp3/core/channel/MdpFeedException.java | 6 ++- .../cme/mdp3/core/channel/MdpFeedWorker.java | 4 +- .../epam/cme/mdp3/sbe/message/SbeBuffer.java | 6 +-- .../cme/mdp3/sbe/message/SbeBufferImpl.java | 8 ++-- .../cme/mdp3/sbe/message/SbeConstants.java | 1 - .../sbe/message/meta/SbePrimitiveType.java | 4 ++ .../schema/MdpMessageTypeBuildException.java | 2 + .../schema/SchemaUnmarshallingException.java | 2 + mbp-only/build.gradle | 2 +- mbp-with-mbo/build.gradle | 2 +- .../com/epam/cme/mdp3/ChannelListener.java | 14 +++---- .../epam/cme/mdp3/VoidChannelListener.java | 6 +-- .../cme/mdp3/channel/MdpChannelBuilder.java | 3 -- .../mdp3/control/ChannelControllerRouter.java | 6 +-- .../mdp3/control/GapChannelController.java | 11 +++++ .../mdp3/control/InstrumentController.java | 14 ++----- .../control/OffHeapSnapshotCycleHandler.java | 14 +------ .../epam/cme/mdp3/test/MBOWithMBPMain.java | 12 +++--- .../cme/mdp3/test/TestChannelListener.java | 40 ++++++++----------- .../MBOChannelControllerRouterTest.java | 3 +- 22 files changed, 77 insertions(+), 86 deletions(-) diff --git a/core/build.gradle b/core/build.gradle index dc0e0d7..5bf3ec3 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -1,5 +1,5 @@ group 'com.epam.cme' -version '2.0.2' +version '2.1.0' apply plugin: 'java' diff --git a/core/src/main/java/com/epam/cme/mdp3/MdpPacket.java b/core/src/main/java/com/epam/cme/mdp3/MdpPacket.java index fa9735b..f1b5ebc 100644 --- a/core/src/main/java/com/epam/cme/mdp3/MdpPacket.java +++ b/core/src/main/java/com/epam/cme/mdp3/MdpPacket.java @@ -14,7 +14,6 @@ import com.epam.cme.mdp3.sbe.message.SbeBuffer; import com.epam.cme.mdp3.sbe.message.SbeBufferImpl; -import com.epam.cme.mdp3.sbe.message.SbeConstants; import com.epam.cme.mdp3.sbe.message.SbeMessage; import java.nio.ByteBuffer; diff --git a/core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedException.java b/core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedException.java index 7123568..5e7a52e 100644 --- a/core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedException.java +++ b/core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedException.java @@ -12,8 +12,10 @@ package com.epam.cme.mdp3.core.channel; -public class -MdpFeedException extends Exception { +public class MdpFeedException extends Exception { + + private static final long serialVersionUID = -6425092985801210129L; + public MdpFeedException() { super(); } diff --git a/core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedWorker.java b/core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedWorker.java index c5b1c97..4944de0 100644 --- a/core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedWorker.java +++ b/core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedWorker.java @@ -37,8 +37,6 @@ public class MdpFeedWorker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(MdpFeedWorker.class); - private static final int ACTIVE_MARK = 1; - private static final int SHUTDOWN_MARK = 2; public static final int RCV_BUFFER_SIZE = 4*1024*1024; private final ConnectionCfg cfg; @@ -167,7 +165,7 @@ public void run() { private void select(final ByteBuffer byteBuffer, final MdpPacket mdpPacket) throws IOException { if (selector.select() > 0) { - Iterator selectedKeys = selector.selectedKeys().iterator(); + Iterator selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { final SelectionKey key = (SelectionKey) selectedKeys.next(); selectedKeys.remove(); diff --git a/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBuffer.java b/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBuffer.java index 8c9a846..eb7d7ef 100644 --- a/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBuffer.java +++ b/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBuffer.java @@ -26,11 +26,11 @@ public interface SbeBuffer { */ void wrapForParse(final ByteBuffer bb); - void copyTo(BytesStore store); + void copyTo(BytesStore store); - void copyTo(int offset, BytesStore store, int len); + void copyTo(int offset, BytesStore store, int len); - void copyFrom(BytesStore store); + void copyFrom(BytesStore store); void copyFrom(SbeBuffer buffer); diff --git a/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBufferImpl.java b/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBufferImpl.java index b22de12..c7b5ad4 100644 --- a/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBufferImpl.java +++ b/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBufferImpl.java @@ -19,7 +19,7 @@ public class SbeBufferImpl extends AbstractSbeBuffer implements SbeBuffer { public static final byte BYTE_MASK = (byte) 0xff; - protected BytesStore bytes; + protected BytesStore bytes; @Override public void wrap(final SbeBuffer sb) { @@ -31,17 +31,17 @@ public void wrap(final SbeBuffer sb) { } @Override - public void copyTo(final BytesStore store) { + public void copyTo(final BytesStore store) { store.write(0, this.bytes, 0, this.length()); } @Override - public void copyTo(final int offset, final BytesStore store, final int len) { + public void copyTo(final int offset, final BytesStore store, final int len) { store.write(0, this.bytes, offset, len); } @Override - public void copyFrom(BytesStore store) { + public void copyFrom(BytesStore store) { this.bytes.write(0, store, 0, store.length()); this.length = store.length(); } diff --git a/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeConstants.java b/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeConstants.java index 011ebaf..149c418 100644 --- a/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeConstants.java +++ b/core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeConstants.java @@ -29,5 +29,4 @@ private SbeConstants() { public final static int VERSION_OFFSET = 6; public final static int RESERVED_OFFSET = 7; public final static int MATCHEVENTINDICATOR_TAG = 5799; - public final static int TRANSACT_TIME_TAG = 60; } diff --git a/core/src/main/java/com/epam/cme/mdp3/sbe/message/meta/SbePrimitiveType.java b/core/src/main/java/com/epam/cme/mdp3/sbe/message/meta/SbePrimitiveType.java index 06efd32..cd5d26c 100644 --- a/core/src/main/java/com/epam/cme/mdp3/sbe/message/meta/SbePrimitiveType.java +++ b/core/src/main/java/com/epam/cme/mdp3/sbe/message/meta/SbePrimitiveType.java @@ -86,6 +86,10 @@ public long getMaxValue() { public long getNullValue() { return nullValue; } + + public String getJavaType() { + return javaType; + } public boolean isNull(final long value) { return value == this.nullValue; diff --git a/core/src/main/java/com/epam/cme/mdp3/sbe/schema/MdpMessageTypeBuildException.java b/core/src/main/java/com/epam/cme/mdp3/sbe/schema/MdpMessageTypeBuildException.java index 81fe97a..8851d74 100644 --- a/core/src/main/java/com/epam/cme/mdp3/sbe/schema/MdpMessageTypeBuildException.java +++ b/core/src/main/java/com/epam/cme/mdp3/sbe/schema/MdpMessageTypeBuildException.java @@ -13,6 +13,8 @@ package com.epam.cme.mdp3.sbe.schema; public class MdpMessageTypeBuildException extends Exception { + private static final long serialVersionUID = -6385550323191516252L; + public MdpMessageTypeBuildException() { super(); } diff --git a/core/src/main/java/com/epam/cme/mdp3/sbe/schema/SchemaUnmarshallingException.java b/core/src/main/java/com/epam/cme/mdp3/sbe/schema/SchemaUnmarshallingException.java index 95a4095..511f095 100644 --- a/core/src/main/java/com/epam/cme/mdp3/sbe/schema/SchemaUnmarshallingException.java +++ b/core/src/main/java/com/epam/cme/mdp3/sbe/schema/SchemaUnmarshallingException.java @@ -13,6 +13,8 @@ package com.epam.cme.mdp3.sbe.schema; public class SchemaUnmarshallingException extends Exception { + private static final long serialVersionUID = 1944148130312471541L; + public SchemaUnmarshallingException() { super(); } diff --git a/mbp-only/build.gradle b/mbp-only/build.gradle index 7ddcedc..326504d 100644 --- a/mbp-only/build.gradle +++ b/mbp-only/build.gradle @@ -7,7 +7,7 @@ plugins { id "me.champeau.gradle.jmh" version "0.3.1" } group = 'com.epam.cme' -version = '2.0.2' +version = '2.1.0' sourceCompatibility = 1.8 targetCompatibility = 1.8 diff --git a/mbp-with-mbo/build.gradle b/mbp-with-mbo/build.gradle index 00de762..bf22fb7 100644 --- a/mbp-with-mbo/build.gradle +++ b/mbp-with-mbo/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'com.epam.cme' -version = '2.0.2' +version = '2.1.0' sourceCompatibility = 1.8 targetCompatibility = 1.8 diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java index ac902c7..b8a24e5 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java @@ -21,38 +21,38 @@ public interface ChannelListener extends CoreChannelListener { * * Only when MBO is enabled. * + * @param mdpMessage The full MDP Message * @param channelId ID of MDP Channel - * @param matchEventIndicator MDP Event indicator (bitset, @see MDP 3.0 - Market Data Incremental Refresh) * @param secDesc Security description * @param msgSeqNum Message sequence number of message. * @param securityId Security ID * @param orderEntry MBO Entry of Group from MDP Incremental Refresh Message. It can not be null. * @param mdEntry MBP Entry of Group from MDP Incremental Refresh Message. It can be null when MBO Incremental Refresh is received in separated template. */ - void onIncrementalMBORefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, long transactTime, final FieldSet orderEntry, final FieldSet mdEntry); + void onIncrementalMBORefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet orderEntry, final FieldSet mdEntry); /** * + * @param mdpMessage The full MDP Message * @param channelId ID of MDP Channel - * @param matchEventIndicator MDP Event indicator (bitset, @see MDP 3.0 - Market Data Incremental Refresh) * @param securityId Security ID * @param secDesc Security description * @param msgSeqNum Message sequence number of message. * @param mdEntry MBP Entry of Group from MDP Incremental Refresh Message. It can not be null. */ - void onIncrementalMBPRefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, long transactTime, final FieldSet mdEntry); + void onIncrementalMBPRefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet mdEntry); /** * Called when a Incremental MsgSeqNum has been fully processed * This callback will be called for each securityId found in the MsgSeqNum packet * + * @param mdpMessage The full MDP Message * @param channelId ID of MDP Channel * @param securityId Security ID + * @param secDesc Security description * @param msgSeqNum Message sequence number of message. */ - void onIncrementalComplete(final String channelId, final int securityId, final long msgSeqNum); + void onIncrementalComplete(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum); /** * Called when MDP Snapshot Full Refresh Message for MBO is received and processed. diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java index c4335dd..b918ae9 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java @@ -60,17 +60,17 @@ default void onSecurityStatus(String channelId, int securityId, MdpMessage secSt } @Override - default void onIncrementalMBORefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, long transactTime, FieldSet orderEntry, FieldSet mdEntry) { + default void onIncrementalMBORefresh(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum, FieldSet orderEntry, FieldSet mdEntry) { } @Override - default void onIncrementalMBPRefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, long transactTime, FieldSet mdEntry) { + default void onIncrementalMBPRefresh(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum, FieldSet mdEntry) { } @Override - default void onIncrementalComplete(final String channelId, final int securityId, final long msgSeqNum) { + default void onIncrementalComplete(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum) { } diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/MdpChannelBuilder.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/MdpChannelBuilder.java index 73ba106..42b5056 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/MdpChannelBuilder.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/MdpChannelBuilder.java @@ -23,7 +23,6 @@ import com.epam.cme.mdp3.sbe.schema.MdpMessageTypes; import java.net.URI; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,7 +34,6 @@ public class MdpChannelBuilder { private final String channelId; private URI cfgURI; private URI schemaURI; - private MdpMessageTypes mdpMessageTypes; private Map feedANetworkInterfaces = new HashMap<>(); private Map feedBNetworkInterfaces = new HashMap<>(); private ChannelListener channelListener; @@ -66,7 +64,6 @@ public MdpChannelBuilder setConfiguration(final URI cfgURI) { public MdpChannelBuilder setSchema(final URI schemaURI) throws MdpMessageTypeBuildException { this.schemaURI = schemaURI; - this.mdpMessageTypes = new MdpMessageTypes(this.schemaURI); return this; } diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java index 14187c8..b7aece5 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java @@ -133,11 +133,11 @@ protected void routeMBPEntry(int securityId, MdpMessage mdpMessage, MdpGroupEntr } } - protected void routeIncrementalComplete(Set securityIds, long msgSeqNum) { + protected void routeIncrementalComplete(Set securityIds, MdpMessage mdpMessage, long msgSeqNum) { for (Integer securityId : securityIds) { InstrumentController instrumentController = instrumentManager.getInstrumentController(securityId); if (instrumentController != null) { - instrumentController.handleIncrementalComplete(msgSeqNum); + instrumentController.handleIncrementalComplete(mdpMessage, msgSeqNum); } } } @@ -182,7 +182,7 @@ private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup, } } } - routeIncrementalComplete(securityIds, msgSeqNum); + routeIncrementalComplete(securityIds, mdpMessage, msgSeqNum); } } diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java index 50ea543..9b11e76 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java @@ -143,6 +143,12 @@ public void handleSnapshotPacket(MdpFeedContext feedContext, MdpPacket mdpPacket target.handleSnapshotPacket(feedContext, mdpPacket); } break; + case CLOSED: + case CLOSING: + case SYNC: + default: + log.error("handleSnapshotPacket invalid current state: {}", currentState); + break; } } finally { lock.unlock(); @@ -199,6 +205,11 @@ public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPac feedContext.getFeedType(), feedContext.getFeed(), currentState, pkgSequence); } break; + case CLOSED: + case CLOSING: + default: + log.error("handleIncrementalPacket invalid current state: {}", currentState); + break; } } finally { lock.unlock(); diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java index ff48b7d..4321d72 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java @@ -15,8 +15,6 @@ import com.epam.cme.mdp3.ChannelListener; import com.epam.cme.mdp3.MdpGroupEntry; import com.epam.cme.mdp3.MdpMessage; -import com.epam.cme.mdp3.sbe.message.SbeConstants; - import java.util.List; public class InstrumentController { @@ -35,28 +33,24 @@ public InstrumentController(List listeners, String channelId, i public void handleMBOIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry orderIDEntry, MdpGroupEntry mdEntry, long msgSeqNum){ if(enable) { - short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG); - long transactTime = mdpMessage.getUInt64(SbeConstants.TRANSACT_TIME_TAG); for (ChannelListener channelListener : listeners) { - channelListener.onIncrementalMBORefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, orderIDEntry, mdEntry); + channelListener.onIncrementalMBORefresh(mdpMessage, channelId, securityId, secDesc, msgSeqNum, orderIDEntry, mdEntry); } } } public void handleMBPIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry mdEntry, long msgSeqNum){ if(enable) { - short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG); - long transactTime = mdpMessage.getUInt64(SbeConstants.TRANSACT_TIME_TAG); for (ChannelListener channelListener : listeners) { - channelListener.onIncrementalMBPRefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, mdEntry); + channelListener.onIncrementalMBPRefresh(mdpMessage, channelId, securityId, secDesc, msgSeqNum, mdEntry); } } } - public void handleIncrementalComplete(long msgSeqNum) { + public void handleIncrementalComplete(MdpMessage mdpMessage, long msgSeqNum) { if(enable) { for (ChannelListener channelListener : listeners) { - channelListener.onIncrementalComplete(channelId, securityId, msgSeqNum); + channelListener.onIncrementalComplete(mdpMessage, channelId, securityId, secDesc, msgSeqNum); } } } diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/OffHeapSnapshotCycleHandler.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/OffHeapSnapshotCycleHandler.java index 2ef0052..f0ee0b8 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/OffHeapSnapshotCycleHandler.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/OffHeapSnapshotCycleHandler.java @@ -123,7 +123,7 @@ private void clearArray(LongArray array) { } private class LongArray { - private BytesStore bytes; + private BytesStore bytes; private long length; public LongArray(long length) { @@ -149,14 +149,6 @@ public void setValue(long index, long value) { } } - public BytesStore getBytes() { - return bytes; - } - - public void setBytes(BytesStore bytes) { - this.bytes = bytes; - } - public long getLength() { return length; } @@ -183,10 +175,6 @@ public T getValue() { return value; } - public void setValue(T value) { - this.value = value; - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/MBOWithMBPMain.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/MBOWithMBPMain.java index 541495a..cab3eb7 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/MBOWithMBPMain.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/MBOWithMBPMain.java @@ -19,6 +19,7 @@ import com.epam.cme.mdp3.sbe.message.SbeGroup; import com.epam.cme.mdp3.sbe.message.SbeGroupEntry; import com.epam.cme.mdp3.sbe.message.SbeString; +import com.epam.cme.mdp3.sbe.message.SbeConstants; import com.epam.cme.mdp3.test.mbpbook.MultipleDepthBookHandler; import com.epam.cme.mdp3.test.mbpbook.OrderBookPriceLevel; import org.apache.commons.lang3.time.DateFormatUtils; @@ -120,8 +121,7 @@ public int onSecurityDefinition(final String channelId, final MdpMessage mdpMess } @Override - public void onIncrementalMBPRefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, long transactTime, final FieldSet mdEntry){ + public void onIncrementalMBPRefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet mdEntry){ //logger.debug("IncrementalMBPRefresh of {}, {} = {}", channelId, secDesc, msgSeqNum); MultipleDepthBookHandler multipleDepthBookHandler = multipleDepthBookHandlers.computeIfAbsent(securityId, integer -> new MultipleDepthBookHandler(integer, MdEventFlags.BOOK, (byte) 20)); char mdEntryType = mdEntry.getChar(269); @@ -135,8 +135,7 @@ public void onIncrementalMBPRefresh(final String channelId, final short matchEve } @Override - public void onIncrementalMBORefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, long transactTime, final FieldSet orderIDEntry, final FieldSet mdEntry){ + public void onIncrementalMBORefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet orderIDEntry, final FieldSet mdEntry){ try { long orderId; long mdOrderPriority; @@ -144,6 +143,7 @@ public void onIncrementalMBORefresh(final String channelId, final short matchEve long mdDisplayQty; int mdUpdateAction; char mdEntryType; + short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG); if (mdEntry == null) {//MBO only orderId = orderIDEntry.getUInt64(37); mdOrderPriority = orderIDEntry.getUInt64(37707); @@ -184,8 +184,8 @@ public void onIncrementalMBORefresh(final String channelId, final short matchEve } @Override - public void onIncrementalComplete(final String channelId, final int securityId, final long msgSeqNum){ - logger.trace(String.format("onIncrementalComplete : ChannelId: %s, SecurityId: %s, MsgSeqNum: %s", channelId, securityId, msgSeqNum)); + public void onIncrementalComplete(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum){ + logger.trace(String.format("onIncrementalComplete : ChannelId: %s, SecurityId: %s, SecurityDesc: %s, MsgSeqNum: %s", channelId, securityId, secDesc, msgSeqNum)); } @Override diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/TestChannelListener.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/TestChannelListener.java index c6b2824..d385dd3 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/TestChannelListener.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/TestChannelListener.java @@ -8,6 +8,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import com.epam.cme.mdp3.sbe.message.SbeConstants; + public class TestChannelListener implements ChannelListener { private ChannelState prevSate; private ChannelState currentSate; @@ -55,19 +57,17 @@ public int onSecurityDefinition(String channelId, MdpMessage secDefMessage) { } @Override - public void onIncrementalMBORefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, long transactTime, final FieldSet orderIDEntry, final FieldSet mdEntry){ - incrementMBOQueue.add(new IncrementalRefreshEntity(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, orderIDEntry != null ? orderIDEntry.copy() : null, mdEntry != null ? mdEntry.copy() : null)); + public void onIncrementalMBORefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet orderIDEntry, final FieldSet mdEntry){ + incrementMBOQueue.add(new IncrementalRefreshEntity(mdpMessage, channelId, securityId, secDesc, msgSeqNum, orderIDEntry != null ? orderIDEntry.copy() : null, mdEntry != null ? mdEntry.copy() : null)); } @Override - public void onIncrementalMBPRefresh(final String channelId, final short matchEventIndicator, final int securityId, - final String secDesc, final long msgSeqNum, long transactTime, final FieldSet mdEntry){ - incrementMBPQueue.add(new IncrementalRefreshEntity(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, (mdEntry != null) ? mdEntry.copy() : null)); + public void onIncrementalMBPRefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet mdEntry){ + incrementMBPQueue.add(new IncrementalRefreshEntity(mdpMessage, channelId, securityId, secDesc, msgSeqNum, (mdEntry != null) ? mdEntry.copy() : null)); } @Override - public void onIncrementalComplete(final String channelId, final int securityId, final long msgSeqNum){ + public void onIncrementalComplete(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum){ } @Override @@ -119,38 +119,36 @@ public ChannelState getPrevSate() { } public class IncrementalRefreshEntity { - private String channelId; - private short matchEventIndicator; + private MdpMessage mdpMessage; + private String channelId; private int securityId; private String secDesc; private long msgSeqNum; - private long transactTime; private FieldSet orderIDEntry; private FieldSet mdEntry; - public IncrementalRefreshEntity(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, long transactTime, FieldSet mdEntry) { - this(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, transactTime, null, mdEntry); + public IncrementalRefreshEntity(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum, FieldSet mdEntry) { + this(mdpMessage, channelId, securityId, secDesc, msgSeqNum, null, mdEntry); } - public IncrementalRefreshEntity(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, long transactTime, FieldSet orderIDEntry, FieldSet mdEntry) { + public IncrementalRefreshEntity(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum, FieldSet orderIDEntry, FieldSet mdEntry) { + this.mdpMessage = mdpMessage; this.channelId = channelId; - this.matchEventIndicator = matchEventIndicator; this.securityId = securityId; this.secDesc = secDesc; this.msgSeqNum = msgSeqNum; - this.transactTime = transactTime; this.orderIDEntry = orderIDEntry; this.mdEntry = mdEntry; } + public MdpMessage getMDPMEssage() { + return mdpMessage; + } + public String getChannelId() { return channelId; } - public short getMatchEventIndicator() { - return matchEventIndicator; - } - public int getSecurityId() { return securityId; } @@ -162,10 +160,6 @@ public String getSecDesc() { public long getMsgSeqNum() { return msgSeqNum; } - - public long getTransactTime() { - return transactTime; - } public FieldSet getOrderIDEntry() { return orderIDEntry; diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/MBOChannelControllerRouterTest.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/MBOChannelControllerRouterTest.java index e9b56f3..eb8826f 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/MBOChannelControllerRouterTest.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/MBOChannelControllerRouterTest.java @@ -3,6 +3,7 @@ import com.epam.cme.mdp3.*; import com.epam.cme.mdp3.control.*; import com.epam.cme.mdp3.core.channel.MdpFeedContext; +import com.epam.cme.mdp3.sbe.message.SbeConstants; import com.epam.cme.mdp3.sbe.schema.MdpMessageTypes; import com.epam.cme.mdp3.test.Constants; import com.epam.cme.mdp3.test.ModelUtils; @@ -78,7 +79,7 @@ public void controllerMustProcessMBOIncrementAndSendToClient() throws Interrupte assertNotNull(incrementalRefreshEntity); Assert.assertEquals(channelId, incrementalRefreshEntity.getChannelId()); Assert.assertEquals(secDesc, incrementalRefreshEntity.getSecDesc()); - Assert.assertEquals(1, incrementalRefreshEntity.getMatchEventIndicator()); + Assert.assertEquals(1, incrementalRefreshEntity.getMDPMEssage().getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG)); Assert.assertEquals(1, incrementalRefreshEntity.getMsgSeqNum()); FieldSet orderIDEntry = incrementalRefreshEntity.getOrderIDEntry(); assertNotNull(orderIDEntry); From 500a28976ad270f3687f791ab1b8e369ce8b2f89 Mon Sep 17 00:00:00 2001 From: Steven Warwick Date: Wed, 26 Sep 2018 12:55:02 -0400 Subject: [PATCH 3/5] Removing invalid error log --- .../epam/cme/mdp3/control/GapChannelController.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java index 9b11e76..7542b89 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java @@ -78,13 +78,13 @@ public GapChannelController(List channelListeners, ChannelContr } @Override - public List getMBOIncrementMessageTemplateIds() { - return mboIncrementMessageTemplateIds == null ? MdpChannelController.super.getMBOIncrementMessageTemplateIds() : mboIncrementMessageTemplateIds; + public List getMBOIncrementMessageTemplateIds() { + return mboIncrementMessageTemplateIds == null ? MdpChannelController.super.getMBOIncrementMessageTemplateIds() : mboIncrementMessageTemplateIds; } @Override public List getMBOSnapshotMessageTemplateIds() { - return mboSnapshotMessageTemplateIds == null ? MdpChannelController.super.getMBOSnapshotMessageTemplateIds() : mboSnapshotMessageTemplateIds; + return mboSnapshotMessageTemplateIds == null ? MdpChannelController.super.getMBOSnapshotMessageTemplateIds() : mboSnapshotMessageTemplateIds; } @Override @@ -143,11 +143,7 @@ public void handleSnapshotPacket(MdpFeedContext feedContext, MdpPacket mdpPacket target.handleSnapshotPacket(feedContext, mdpPacket); } break; - case CLOSED: - case CLOSING: - case SYNC: default: - log.error("handleSnapshotPacket invalid current state: {}", currentState); break; } } finally { @@ -205,10 +201,7 @@ public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPac feedContext.getFeedType(), feedContext.getFeed(), currentState, pkgSequence); } break; - case CLOSED: - case CLOSING: default: - log.error("handleIncrementalPacket invalid current state: {}", currentState); break; } } finally { From 060a9f34a8cf8481d4c40796894f6907d32f1958 Mon Sep 17 00:00:00 2001 From: Steven Warwick Date: Thu, 4 Oct 2018 09:45:25 -0400 Subject: [PATCH 4/5] Moving creation of the securityIds set to the class and calling reset on each new incremental to avoid memory creation. --- .../com/epam/cme/mdp3/control/ChannelControllerRouter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java index b7aece5..9c4953a 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java @@ -41,6 +41,7 @@ public class ChannelControllerRouter implements MdpChannelController { private final String channelId; private List mboIncrementMessageTemplateIds; private List mboSnapshotMessageTemplateIds; + private Set securityIds = new HashSet<>(); public ChannelControllerRouter(String channelId, InstrumentManager instrumentManager, MdpMessageTypes mdpMessageTypes, List channelListeners, @@ -144,7 +145,7 @@ protected void routeIncrementalComplete(Set securityIds, MdpMessage mdp private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup, MdpGroupEntry mdpGroupEntry, long msgSeqNum){ if (isIncrementalMessageSupported(mdpMessage)) { - Set securityIds = new HashSet<>(); + securityIds.clear(); if (isIncrementOnlyForMBO(mdpMessage)) { mdpMessage.getGroup(MdConstants.NO_MD_ENTRIES, mdpGroup); while (mdpGroup.hashNext()) { From 6c275e838e9feba27af7631ad3141bcccf5e5226 Mon Sep 17 00:00:00 2001 From: Steven Warwick Date: Mon, 8 Oct 2018 08:23:07 -0400 Subject: [PATCH 5/5] Replacing generic java hashset with IntHashSet to remove boxing/unboxing of Integer. --- .../epam/cme/mdp3/control/ChannelControllerRouter.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java index 9c4953a..89e5df9 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java @@ -21,10 +21,9 @@ import com.epam.cme.mdp3.sbe.schema.MdpMessageTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.agrona.collections.IntHashSet; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.function.Consumer; import static com.epam.cme.mdp3.MdConstants.INCR_RFRSH_MD_ENTRY_TYPE; @@ -41,7 +40,7 @@ public class ChannelControllerRouter implements MdpChannelController { private final String channelId; private List mboIncrementMessageTemplateIds; private List mboSnapshotMessageTemplateIds; - private Set securityIds = new HashSet<>(); + private IntHashSet securityIds = new IntHashSet(); public ChannelControllerRouter(String channelId, InstrumentManager instrumentManager, MdpMessageTypes mdpMessageTypes, List channelListeners, @@ -134,8 +133,8 @@ protected void routeMBPEntry(int securityId, MdpMessage mdpMessage, MdpGroupEntr } } - protected void routeIncrementalComplete(Set securityIds, MdpMessage mdpMessage, long msgSeqNum) { - for (Integer securityId : securityIds) { + protected void routeIncrementalComplete(IntHashSet securityIds, MdpMessage mdpMessage, long msgSeqNum) { + for (int securityId : securityIds) { InstrumentController instrumentController = instrumentManager.getInstrumentController(securityId); if (instrumentController != null) { instrumentController.handleIncrementalComplete(mdpMessage, msgSeqNum);