Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

#502 kraken resubscribe issue #551

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@
import info.bitrich.xchangestream.kraken.dto.enums.KrakenSubscriptionName;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.knowm.xchange.currency.CurrencyPair;
Expand All @@ -27,6 +20,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you run a maven build so the coveo-fmt plugin runs?

We've introduced that to avoid formatting issues like these creeping into PRs.

Thanks!

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** @author makarid, pchertalev */
public class KrakenStreamingMarketDataService implements StreamingMarketDataService {

Expand Down Expand Up @@ -145,7 +146,10 @@ public Observable<List> subscribe(String channelName, int maxItems, Integer dept

private String getChannelName(
KrakenSubscriptionName subscriptionName, CurrencyPair currencyPair) {
String pair = currencyPair.base.toString() + "/" + currencyPair.counter.toString();
String pair =
KrakenAdapters.adaptCurrencyPair(
currencyPair.base.getCurrencyCode() + currencyPair.counter.getCurrencyCode())
.toString();
return subscriptionName + KRAKEN_CHANNEL_DELIMITER + pair;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package info.bitrich.xchangestream.kraken;

import static info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType.subscribe;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionConfig;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionMessage;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionStatusMessage;
Expand All @@ -18,25 +17,34 @@
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.reactivex.Completable;
import io.reactivex.Observable;
import org.apache.commons.lang3.StringUtils;
import org.knowm.xchange.kraken.KrakenAdapters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static info.bitrich.xchangestream.kraken.KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER;
import static info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType.subscribe;

/** @author makarid, pchertalev */
public class KrakenStreamingService extends JsonNettyStreamingService {
private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingService.class);
private static final String EVENT = "event";
private final Map<Integer, String> channels = new ConcurrentHashMap<>();
private final Map<Integer, String> channelIds = new ConcurrentHashMap<>();
private ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
private final boolean isPrivate;

private final Map<Integer, String> subscriptionRequestMap = new ConcurrentHashMap<>();
private final Map<Integer, Set<String>> subscriptionRequestMap = new ConcurrentHashMap<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may find guava synchronized multimaps deal with this better and more thread-safely (not convinced that the remove() is being used in a thread-safe way).


public KrakenStreamingService(boolean isPrivate, String uri) {
super(uri, Integer.MAX_VALUE);
Expand Down Expand Up @@ -71,19 +79,30 @@ protected void handleMessage(JsonNode message) {
KrakenSubscriptionStatusMessage statusMessage =
mapper.treeToValue(message, KrakenSubscriptionStatusMessage.class);
Integer reqid = statusMessage.getReqid();
if (!isPrivate && reqid != null) channelName = subscriptionRequestMap.remove(reqid);

String currencyPair =
KrakenAdapters.adaptCurrencyPair(statusMessage.getPair().replace("/", ""))
.toString();
if (!isPrivate && reqid != null) {
Set<String> channelsList = subscriptionRequestMap.get(reqid);
channelName =
statusMessage.getKrakenSubscriptionConfig().getName()
+ KRAKEN_CHANNEL_DELIMITER
+ currencyPair;
channelsList.remove(channelName);
if (channelsList.isEmpty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the bit I'm not convinced is thread-safe.

subscriptionRequestMap.remove(reqid);
}
}
switch (statusMessage.getStatus()) {
case subscribed:
LOG.info("Channel {} has been subscribed", channelName);

if (statusMessage.getChannelID() != null)
channels.put(statusMessage.getChannelID(), channelName);

if (statusMessage.getChannelID() != null) {
channelIds.put(statusMessage.getChannelID(), channelName);
}
break;
case unsubscribed:
LOG.info("Channel {} has been unsubscribed", channelName);
channels.remove(statusMessage.getChannelID());
channelIds.remove(statusMessage.getChannelID());
break;
case error:
LOG.error(
Expand Down Expand Up @@ -118,15 +137,15 @@ protected void handleMessage(JsonNode message) {
protected String getChannelNameFromMessage(JsonNode message) throws IOException {
String channelName = null;
if (message.has("channelID")) {
channelName = channels.get(message.get("channelID").asInt());
channelName = channelIds.get(message.get("channelID").asInt());
}
if (message.has("channelName")) {
channelName = message.get("channelName").asText();
}

if (message.isArray()) {
if (message.get(0).isInt()) {
channelName = channels.get(message.get(0).asInt());
channelName = channelIds.get(message.get(0).asInt());
}
if (message.get(1).isTextual()) {
channelName = message.get(1).asText();
Expand All @@ -142,8 +161,7 @@ protected String getChannelNameFromMessage(JsonNode message) throws IOException
@Override
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
int reqID = Math.abs(UUID.randomUUID().hashCode());
String[] channelData =
channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER);
String[] channelData = channelName.split(KRAKEN_CHANNEL_DELIMITER);
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]);

if (isPrivate) {
Expand All @@ -161,7 +179,7 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE
if (args.length > 0 && args[0] != null) {
depth = (Integer) args[0];
}
subscriptionRequestMap.put(reqID, channelName);
subscriptionRequestMap.put(reqID, Sets.newHashSet(channelName));

KrakenSubscriptionMessage subscriptionMessage =
new KrakenSubscriptionMessage(
Expand All @@ -176,8 +194,7 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE
@Override
public String getUnsubscribeMessage(String channelName) throws IOException {
int reqID = Math.abs(UUID.randomUUID().hashCode());
String[] channelData =
channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER);
String[] channelData = channelName.split(KRAKEN_CHANNEL_DELIMITER);
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]);

if (isPrivate) {
Expand All @@ -191,7 +208,7 @@ public String getUnsubscribeMessage(String channelName) throws IOException {
} else {
String pair = channelData[1];

subscriptionRequestMap.put(reqID, channelName);
subscriptionRequestMap.put(reqID, Sets.newHashSet(channelName));
KrakenSubscriptionMessage subscriptionMessage =
new KrakenSubscriptionMessage(
reqID,
Expand Down Expand Up @@ -252,4 +269,50 @@ public void channelInactive(ChannelHandlerContext ctx) {
}
}
}

@Override
public void resubscribeChannels() {
if (isPrivate) {
super.resubscribeChannels();
} else {
subscriptionRequestMap.clear();
channelIds.clear();
HashMap<String, KrakenSubscriptionMessage> messages = new HashMap<>();
for (Map.Entry<String, Subscription> entry : super.channels.entrySet()) {

String[] channelData = entry.getKey().split(KRAKEN_CHANNEL_DELIMITER);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this would be easier to read if maybe you created a KrakenSubscription object and added a decode method, e.g.

private KrakenSubscription decodeSubscription(Subscription subscription) {}

KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]);

String pair = channelData[1];
Object[] args = entry.getValue().getArgs();
Integer depth = null;
if (args.length > 0 && args[0] != null) {
depth = (Integer) args[0];
}

Integer finalDepth = depth;
KrakenSubscriptionMessage toSend =
messages.computeIfAbsent(
subscriptionName + (depth == null ? "" : (KRAKEN_CHANNEL_DELIMITER + depth)),
d ->
new KrakenSubscriptionMessage(
Math.abs(UUID.randomUUID().hashCode()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will probably collide quite often. Maybe safer to use a static AtomicLong? Does it need to be unique?

subscribe,
new ArrayList<>(),
new KrakenSubscriptionConfig(subscriptionName, finalDepth, null)));
toSend.getPairs().add(pair);
Set<String> channelsSet =
subscriptionRequestMap.computeIfAbsent(toSend.getReqid(), rid -> new HashSet<>());
channelsSet.add(entry.getKey());
}

for (KrakenSubscriptionMessage message : messages.values()) {
try {
sendMessage(objectMapper.writeValueAsString(message));
} catch (IOException e) {
LOG.error("Failed to reconnect channel: {}", message.getPairs());
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.subjects.PublishSubject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -49,8 +52,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NettyStreamingService<T> extends ConnectableService {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
Expand All @@ -59,7 +60,7 @@ public abstract class NettyStreamingService<T> extends ConnectableService {
protected static final Duration DEFAULT_RETRY_DURATION = Duration.ofSeconds(15);
protected static final int DEFAULT_IDLE_TIMEOUT = 0;

private class Subscription {
protected class Subscription {
final ObservableEmitter<T> emitter;
final String channelName;
final Object[] args;
Expand All @@ -69,6 +70,14 @@ public Subscription(ObservableEmitter<T> emitter, String channelName, Object[] a
this.channelName = channelName;
this.args = args;
}

public String getChannelName() {
return channelName;
}

public Object[] getArgs() {
return args;
}
}

private final int maxFramePayloadLength;
Expand Down