Skip to content

Commit

Permalink
Merge pull request #51 from swarwick/master
Browse files Browse the repository at this point in the history
 Issue #47 and Issue #48
  • Loading branch information
iamolever authored Oct 8, 2018
2 parents dbfa153 + 6c275e8 commit c6bdec7
Show file tree
Hide file tree
Showing 21 changed files with 112 additions and 71 deletions.
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group 'com.epam.cme'
version '2.0.2'
version '2.1.0'

apply plugin: 'java'

Expand Down
1 change: 0 additions & 1 deletion core/src/main/java/com/epam/cme/mdp3/MdpPacket.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import com.epam.cme.mdp3.sbe.message.SbeBuffer;
import com.epam.cme.mdp3.sbe.message.SbeBufferImpl;
import com.epam.cme.mdp3.sbe.message.SbeConstants;
import com.epam.cme.mdp3.sbe.message.SbeMessage;

import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

package com.epam.cme.mdp3.core.channel;

public class
MdpFeedException extends Exception {
public class MdpFeedException extends Exception {

private static final long serialVersionUID = -6425092985801210129L;

public MdpFeedException() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

public class MdpFeedWorker implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MdpFeedWorker.class);
private static final int ACTIVE_MARK = 1;
private static final int SHUTDOWN_MARK = 2;
public static final int RCV_BUFFER_SIZE = 4*1024*1024;

private final ConnectionCfg cfg;
Expand Down Expand Up @@ -167,7 +165,7 @@ public void run() {

private void select(final ByteBuffer byteBuffer, final MdpPacket mdpPacket) throws IOException {
if (selector.select() > 0) {
Iterator selectedKeys = selector.selectedKeys().iterator();
Iterator<?> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
final SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ public interface SbeBuffer {
*/
void wrapForParse(final ByteBuffer bb);

void copyTo(BytesStore store);
void copyTo(BytesStore<?, ?> store);

void copyTo(int offset, BytesStore store, int len);
void copyTo(int offset, BytesStore<?, ?> store, int len);

void copyFrom(BytesStore store);
void copyFrom(BytesStore<?, ?> store);

void copyFrom(SbeBuffer buffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class SbeBufferImpl extends AbstractSbeBuffer implements SbeBuffer {
public static final byte BYTE_MASK = (byte) 0xff;
protected BytesStore bytes;
protected BytesStore<?, ?> bytes;

@Override
public void wrap(final SbeBuffer sb) {
Expand All @@ -31,17 +31,17 @@ public void wrap(final SbeBuffer sb) {
}

@Override
public void copyTo(final BytesStore store) {
public void copyTo(final BytesStore<?, ?> store) {
store.write(0, this.bytes, 0, this.length());
}

@Override
public void copyTo(final int offset, final BytesStore store, final int len) {
public void copyTo(final int offset, final BytesStore<?, ?> store, final int len) {
store.write(0, this.bytes, offset, len);
}

@Override
public void copyFrom(BytesStore store) {
public void copyFrom(BytesStore<?, ?> store) {
this.bytes.write(0, store, 0, store.length());
this.length = store.length();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public long getMaxValue() {
public long getNullValue() {
return nullValue;
}

public String getJavaType() {
return javaType;
}

public boolean isNull(final long value) {
return value == this.nullValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package com.epam.cme.mdp3.sbe.schema;

public class MdpMessageTypeBuildException extends Exception {
private static final long serialVersionUID = -6385550323191516252L;

public MdpMessageTypeBuildException() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package com.epam.cme.mdp3.sbe.schema;

public class SchemaUnmarshallingException extends Exception {
private static final long serialVersionUID = 1944148130312471541L;

public SchemaUnmarshallingException() {
super();
}
Expand Down
2 changes: 1 addition & 1 deletion mbp-only/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
id "me.champeau.gradle.jmh" version "0.3.1"
}
group = 'com.epam.cme'
version = '2.0.2'
version = '2.1.0'
sourceCompatibility = 1.8
targetCompatibility = 1.8

Expand Down
2 changes: 1 addition & 1 deletion mbp-with-mbo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'com.epam.cme'
version = '2.0.2'
version = '2.1.0'
sourceCompatibility = 1.8
targetCompatibility = 1.8

Expand Down
22 changes: 16 additions & 6 deletions mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,39 @@ public interface ChannelListener extends CoreChannelListener {
*
* Only when MBO is enabled.
*
* @param mdpMessage The full MDP Message
* @param channelId ID of MDP Channel
* @param matchEventIndicator MDP Event indicator (bitset, @see <a href="http://www.cmegroup.com/confluence/display/EPICSANDBOX/MDP+3.0+-+Market+Data+Incremental+Refresh">MDP 3.0 - Market Data Incremental Refresh</a>)
* @param secDesc Security description
* @param msgSeqNum Message sequence number of message.
* @param securityId Security ID
* @param orderEntry MBO Entry of Group from MDP Incremental Refresh Message. It can not be null.
* @param mdEntry MBP Entry of Group from MDP Incremental Refresh Message. It can be null when MBO Incremental Refresh is received in separated template.
*/
void onIncrementalMBORefresh(final String channelId, final short matchEventIndicator, final int securityId,
final String secDesc, final long msgSeqNum, final FieldSet orderEntry, final FieldSet mdEntry);
void onIncrementalMBORefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet orderEntry, final FieldSet mdEntry);

/**
*
* @param mdpMessage The full MDP Message
* @param channelId ID of MDP Channel
* @param matchEventIndicator MDP Event indicator (bitset, @see <a href="http://www.cmegroup.com/confluence/display/EPICSANDBOX/MDP+3.0+-+Market+Data+Incremental+Refresh">MDP 3.0 - Market Data Incremental Refresh</a>)
* @param securityId Security ID
* @param secDesc Security description
* @param msgSeqNum Message sequence number of message.
* @param mdEntry MBP Entry of Group from MDP Incremental Refresh Message. It can not be null.
*/
void onIncrementalMBPRefresh(final String channelId, final short matchEventIndicator, final int securityId,
final String secDesc, final long msgSeqNum, final FieldSet mdEntry);
void onIncrementalMBPRefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet mdEntry);

/**
* Called when a Incremental MsgSeqNum has been fully processed
* This callback will be called for each securityId found in the MsgSeqNum packet
*
* @param mdpMessage The full MDP Message
* @param channelId ID of MDP Channel
* @param securityId Security ID
* @param secDesc Security description
* @param msgSeqNum Message sequence number of message.
*/
void onIncrementalComplete(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum);

/**
* Called when MDP Snapshot Full Refresh Message for MBO is received and processed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,17 @@ default void onSecurityStatus(String channelId, int securityId, MdpMessage secSt
}

@Override
default void onIncrementalMBORefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, FieldSet orderEntry, FieldSet mdEntry) {
default void onIncrementalMBORefresh(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum, FieldSet orderEntry, FieldSet mdEntry) {

}

@Override
default void onIncrementalMBPRefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, FieldSet mdEntry) {
default void onIncrementalMBPRefresh(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum, FieldSet mdEntry) {

}

@Override
default void onIncrementalComplete(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.epam.cme.mdp3.sbe.schema.MdpMessageTypes;

import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -35,7 +34,6 @@ public class MdpChannelBuilder {
private final String channelId;
private URI cfgURI;
private URI schemaURI;
private MdpMessageTypes mdpMessageTypes;
private Map<FeedType, String> feedANetworkInterfaces = new HashMap<>();
private Map<FeedType, String> feedBNetworkInterfaces = new HashMap<>();
private ChannelListener channelListener;
Expand Down Expand Up @@ -66,7 +64,6 @@ public MdpChannelBuilder setConfiguration(final URI cfgURI) {

public MdpChannelBuilder setSchema(final URI schemaURI) throws MdpMessageTypeBuildException {
this.schemaURI = schemaURI;
this.mdpMessageTypes = new MdpMessageTypes(this.schemaURI);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.epam.cme.mdp3.sbe.schema.MdpMessageTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.agrona.collections.IntHashSet;

import java.util.List;
import java.util.function.Consumer;
Expand All @@ -39,6 +40,7 @@ public class ChannelControllerRouter implements MdpChannelController {
private final String channelId;
private List<Integer> mboIncrementMessageTemplateIds;
private List<Integer> mboSnapshotMessageTemplateIds;
private IntHashSet securityIds = new IntHashSet();

public ChannelControllerRouter(String channelId, InstrumentManager instrumentManager,
MdpMessageTypes mdpMessageTypes, List<ChannelListener> channelListeners,
Expand Down Expand Up @@ -131,14 +133,25 @@ protected void routeMBPEntry(int securityId, MdpMessage mdpMessage, MdpGroupEntr
}
}

protected void routeIncrementalComplete(IntHashSet securityIds, MdpMessage mdpMessage, long msgSeqNum) {
for (int securityId : securityIds) {
InstrumentController instrumentController = instrumentManager.getInstrumentController(securityId);
if (instrumentController != null) {
instrumentController.handleIncrementalComplete(mdpMessage, msgSeqNum);
}
}
}

private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup, MdpGroupEntry mdpGroupEntry, long msgSeqNum){
if (isIncrementalMessageSupported(mdpMessage)) {
securityIds.clear();
if (isIncrementOnlyForMBO(mdpMessage)) {
mdpMessage.getGroup(MdConstants.NO_MD_ENTRIES, mdpGroup);
while (mdpGroup.hashNext()) {
mdpGroup.next();
mdpGroup.getEntry(mdpGroupEntry);
int securityId = getSecurityId(mdpGroupEntry);
securityIds.add(securityId);
routeMBOEntry(securityId, mdpMessage, mdpGroupEntry, null, msgSeqNum);
}
} else {
Expand All @@ -151,6 +164,7 @@ private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup,
emptyBookConsumers.forEach(mdpMessageConsumer -> mdpMessageConsumer.accept(mdpMessage));
} else {
int securityId = mdEntry.getInt32(MdConstants.SECURITY_ID);
securityIds.add(securityId);
routeMBPEntry(securityId, mdpMessage, mdEntry, msgSeqNum);
}
}
Expand All @@ -163,10 +177,12 @@ private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup,
short entryNum = mdpGroupEntry.getUInt8(MdConstants.REFERENCE_ID);
noMdEntriesGroup.getEntry(entryNum, mdEntry);
int securityId = mdEntry.getInt32(MdConstants.SECURITY_ID);
securityIds.add(securityId);
routeMBOEntry(securityId, mdpMessage, mdpGroupEntry, mdEntry, msgSeqNum);
}
}
}
routeIncrementalComplete(securityIds, mdpMessage, msgSeqNum);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ public GapChannelController(List<ChannelListener> channelListeners, ChannelContr
}

@Override
public List<Integer> getMBOIncrementMessageTemplateIds() {
return mboIncrementMessageTemplateIds == null ? MdpChannelController.super.getMBOIncrementMessageTemplateIds() : mboIncrementMessageTemplateIds;
public List<Integer> getMBOIncrementMessageTemplateIds() {
return mboIncrementMessageTemplateIds == null ? MdpChannelController.super.getMBOIncrementMessageTemplateIds() : mboIncrementMessageTemplateIds;
}

@Override
public List<Integer> getMBOSnapshotMessageTemplateIds() {
return mboSnapshotMessageTemplateIds == null ? MdpChannelController.super.getMBOSnapshotMessageTemplateIds() : mboSnapshotMessageTemplateIds;
return mboSnapshotMessageTemplateIds == null ? MdpChannelController.super.getMBOSnapshotMessageTemplateIds() : mboSnapshotMessageTemplateIds;
}

@Override
Expand Down Expand Up @@ -143,6 +143,8 @@ public void handleSnapshotPacket(MdpFeedContext feedContext, MdpPacket mdpPacket
target.handleSnapshotPacket(feedContext, mdpPacket);
}
break;
default:
break;
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -199,6 +201,8 @@ public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPac
feedContext.getFeedType(), feedContext.getFeed(), currentState, pkgSequence);
}
break;
default:
break;
}
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import com.epam.cme.mdp3.ChannelListener;
import com.epam.cme.mdp3.MdpGroupEntry;
import com.epam.cme.mdp3.MdpMessage;
import com.epam.cme.mdp3.sbe.message.SbeConstants;

import java.util.List;

public class InstrumentController {
Expand All @@ -35,18 +33,24 @@ public InstrumentController(List<ChannelListener> listeners, String channelId, i

public void handleMBOIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry orderIDEntry, MdpGroupEntry mdEntry, long msgSeqNum){
if(enable) {
short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG);
for (ChannelListener channelListener : listeners) {
channelListener.onIncrementalMBORefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, orderIDEntry, mdEntry);
channelListener.onIncrementalMBORefresh(mdpMessage, channelId, securityId, secDesc, msgSeqNum, orderIDEntry, mdEntry);
}
}
}

public void handleMBPIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry mdEntry, long msgSeqNum){
if(enable) {
short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG);
for (ChannelListener channelListener : listeners) {
channelListener.onIncrementalMBPRefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, mdEntry);
channelListener.onIncrementalMBPRefresh(mdpMessage, channelId, securityId, secDesc, msgSeqNum, mdEntry);
}
}
}

public void handleIncrementalComplete(MdpMessage mdpMessage, long msgSeqNum) {
if(enable) {
for (ChannelListener channelListener : listeners) {
channelListener.onIncrementalComplete(mdpMessage, channelId, securityId, secDesc, msgSeqNum);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private void clearArray(LongArray array) {
}

private class LongArray {
private BytesStore bytes;
private BytesStore<?, ?> bytes;
private long length;

public LongArray(long length) {
Expand All @@ -149,14 +149,6 @@ public void setValue(long index, long value) {
}
}

public BytesStore getBytes() {
return bytes;
}

public void setBytes(BytesStore bytes) {
this.bytes = bytes;
}

public long getLength() {
return length;
}
Expand All @@ -183,10 +175,6 @@ public T getValue() {
return value;
}

public void setValue(T value) {
this.value = value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Loading

0 comments on commit c6bdec7

Please sign in to comment.