Skip to content

Commit

Permalink
feat(connext): lazy collateralization (#1916)
Browse files Browse the repository at this point in the history
* feat(connext): lazy collateralization

This ads functionality to track reserved outboudn & inbound balances
each tmie an own order is added to or removed from the order book. Every
time an order is added (and reserved amount increases) we check if the
inbound capaacity for that currency is sufficient to cover all orders.
In Connext's case, if it's not sufficient then we make a collateral
request to cover all orders +3% (unless we already have a pending
collateral request waiting). We don't do anything on the lnd side since
lnd can't dynamically increase its inbound capacity.

Closes #1896.

* test(sim): name order ids after test cases
  • Loading branch information
sangaman authored Oct 2, 2020
1 parent bd0210e commit 0f2b841
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 77 deletions.
92 changes: 57 additions & 35 deletions lib/connextclient/ConnextClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import {
} from './types';
import { parseResponseBody } from '../utils/utils';
import { Observable, fromEvent, from, combineLatest, defer, timer } from 'rxjs';
import { take, pluck, timeout, filter, mergeMap, catchError, mergeMapTo } from 'rxjs/operators';
import { take, pluck, timeout, filter, catchError, mergeMapTo } from 'rxjs/operators';
import { sha256 } from '@ethersproject/solidity';

interface ConnextClient {
Expand Down Expand Up @@ -268,6 +268,28 @@ class ConnextClient extends SwapClient {
return this._totalOutboundAmount.get(currency) || 0;
}

/**
* Checks whether we have a pending collateral request for the currency and,
* if one doesn't exist, starts a new request for the specified amount. Then
* calls channelBalance to refresh the inbound capacity for the currency.
*/
private requestCollateralInBackground = (currency: string, units: number) => {
// first check whether we already have a pending collateral request for this currency
// if not start a new request, and when it completes call channelBalance to refresh our inbound capacity
const requestCollateralPromise = this.requestCollateralPromises.get(currency) ?? this.sendRequest('/request-collateral', 'POST', {
assetId: this.tokenAddresses.get(currency),
amount: units.toLocaleString('fullwide', { useGrouping: false }),
}).then(() => {
this.requestCollateralPromises.delete(currency);
this.logger.debug(`completed collateral request of ${units} ${currency} units`);
return this.channelBalance(currency);
}).catch((err) => {
this.requestCollateralPromises.delete(currency);
this.logger.error(err);
});
this.requestCollateralPromises.set(currency, requestCollateralPromise);
}

public checkInboundCapacity = (inboundAmount: number, currency: string) => {
const inboundCapacity = this._maxChannelInboundAmount.get(currency) || 0;
if (inboundCapacity < inboundAmount) {
Expand All @@ -281,23 +303,32 @@ class ConnextClient extends SwapClient {
ConnextClient.MIN_COLLATERAL_REQUEST_SIZES[currency] ?? 0,
);
const unitsToRequest = this.unitConverter.amountToUnits({ currency, amount: quantityToRequest });

// first check whether we already have a pending collateral request for this currency
// if not start a new request, and when it completes call channelBalance to refresh our inbound capacity
const requestCollateralPromise = this.requestCollateralPromises.get(currency) ?? this.sendRequest('/request-collateral', 'POST', {
assetId: this.tokenAddresses.get(currency),
amount: unitsToRequest.toLocaleString('fullwide', { useGrouping: false }),
}).then(() => {
this.logger.debug(`completed collateral request of ${unitsToRequest} ${currency} units`);
this.requestCollateralPromises.delete(currency);
return this.channelBalance(currency);
}).catch(this.logger.error);
this.requestCollateralPromises.set(currency, requestCollateralPromise);
this.requestCollateralInBackground(currency, unitsToRequest);

throw errors.INSUFFICIENT_COLLATERAL;
}
}

public setReservedInboundAmount = (reservedInboundAmount: number, currency: string) => {
const inboundCapacity = this._maxChannelInboundAmount.get(currency) || 0;
if (inboundCapacity < reservedInboundAmount) {
// we do not have enough inbound capacity to fill all open orders, so we will request more
this.logger.debug(`collateral of ${inboundCapacity} for ${currency} is insufficient for reserved order amount of ${reservedInboundAmount}`);

// we want to make a request for the current collateral plus the greater of any
// minimum request size for the currency or the capacity shortage + 3% buffer
const quantityToRequest = inboundCapacity + Math.max(
reservedInboundAmount * 1.03 - inboundCapacity,
ConnextClient.MIN_COLLATERAL_REQUEST_SIZES[currency] ?? 0,
);
const unitsToRequest = this.unitConverter.amountToUnits({ currency, amount: quantityToRequest });

// we don't await this request - instead we allow for "lazy collateralization" to complete since
// we don't expect all orders to be filled at once, we can be patient
this.requestCollateralInBackground(currency, unitsToRequest);
}
}

protected updateCapacity = async () => {
try {
const channelBalancePromises = [];
Expand Down Expand Up @@ -340,9 +371,11 @@ class ConnextClient extends SwapClient {
await this.sendRequest('/health', 'GET');
await this.initWallet(this.seed);
const config = await this.initConnextClient(this.seed);
await this.subscribePreimage();
await this.subscribeIncomingTransfer();
await this.subscribeDeposit();
await Promise.all([
this.subscribePreimage(),
this.subscribeIncomingTransfer(),
this.subscribeDeposit(),
]);
const { userIdentifier } = config;
this.emit('connectionVerified', {
newIdentifier: userIdentifier,
Expand Down Expand Up @@ -631,8 +664,11 @@ class ConnextClient extends SwapClient {
});

this._totalOutboundAmount.set(currency, freeBalanceAmount);
this._maxChannelInboundAmount.set(currency, nodeFreeBalanceAmount);
this.logger.trace(`new inbound capacity (collateral) for ${currency} of ${nodeFreeBalanceAmount}`);
if (nodeFreeBalanceAmount !== this._maxChannelInboundAmount.get(currency)) {
this._maxChannelInboundAmount.set(currency, nodeFreeBalanceAmount);
this.logger.debug(`new inbound capacity (collateral) for ${currency} of ${nodeFreeBalanceAmount}`);
}

return {
balance: freeBalanceAmount,
inactiveBalance: 0,
Expand Down Expand Up @@ -714,28 +750,14 @@ class ConnextClient extends SwapClient {
const minCollateralRequestQuantity = ConnextClient.MIN_COLLATERAL_REQUEST_SIZES[currency];
if (minCollateralRequestQuantity !== undefined) {
const minCollateralRequestUnits = this.unitConverter.amountToUnits({ currency, amount: minCollateralRequestQuantity });
const channelCollateralized$ = fromEvent(this, 'depositConfirmed').pipe(
const depositConfirmed$ = fromEvent(this, 'depositConfirmed').pipe(
filter(hash => hash === txhash), // only proceed if the incoming hash matches our expected txhash
take(1), // complete the stream after 1 matching event
timeout(86400000), // clear up the listener after 1 day
mergeMap(() => {
// use defer to only create the inner observable when the outer one subscribes
return defer(() => {
return from(
this.sendRequest('/request-collateral', 'POST', {
assetId,
amount: (minCollateralRequestUnits.toLocaleString('fullwide', { useGrouping: false })),
}),
);
});
}),
);
channelCollateralized$.subscribe({
depositConfirmed$.subscribe({
complete: () => {
this.logger.verbose(`collateralized channel for ${currency}`);
},
error: (err) => {
this.logger.error(`failed requesting collateral for ${currency}`, err);
this.requestCollateralInBackground(currency, minCollateralRequestUnits);
},
});
}
Expand Down
4 changes: 4 additions & 0 deletions lib/lndclient/LndClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ class LndClient extends SwapClient {
return; // we do not currently check inbound capacities for lnd
}

public setReservedInboundAmount = (_reservedInboundAmount: number) => {
return; // not currently used for lnd
}

/** Lnd specific procedure to mark the client as locked. */
private lock = () => {
if (!this.walletUnlocker) {
Expand Down
35 changes: 25 additions & 10 deletions lib/orderbook/OrderBook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ interface OrderBook {
on(event: 'peerOrder.invalidation', listener: (order: OrderPortion) => void): this;
/** Adds a listener to be called when all or part of a remote order was filled by an own order and removed */
on(event: 'peerOrder.filled', listener: (order: OrderPortion) => void): this;
/** Adds a listener to be called when all or part of a local order was swapped and removed, after it was filled and executed remotely */
/** Adds a listener to be called when all or part of a local order was swapped after being filled and executed remotely */
on(event: 'ownOrder.swapped', listener: (order: OrderPortion) => void): this;
/** Adds a listener to be called when all or part of a local order was filled by an own order and removed */
on(event: 'ownOrder.filled', listener: (order: OrderPortion) => void): this;
on(event: 'ownOrder.filled', listener: (order: OwnOrder) => void): this;
/** Adds a listener to be called when a local order was added */
on(event: 'ownOrder.added', listener: (order: OwnOrder) => void): this;
/** Adds a listener to be called when a local order was removed */
on(event: 'ownOrder.removed', listener: (order: OrderPortion) => void): this;
on(event: 'ownOrder.removed', listener: (order: OwnOrder) => void): this;

/** Notifies listeners that a remote order was added */
emit(event: 'peerOrder.incoming', order: PeerOrder): boolean;
Expand All @@ -47,11 +47,11 @@ interface OrderBook {
/** Notifies listeners that all or part of a local order was swapped and removed, after it was filled and executed remotely */
emit(event: 'ownOrder.swapped', order: OrderPortion): boolean;
/** Notifies listeners that all or part of a local order was filled by an own order and removed */
emit(event: 'ownOrder.filled', order: OrderPortion): boolean;
emit(event: 'ownOrder.filled', order: OwnOrder): boolean;
/** Notifies listeners that a local order was added */
emit(event: 'ownOrder.added', order: OwnOrder): boolean;
/** Notifies listeners that a local order was removed */
emit(event: 'ownOrder.removed', order: OrderPortion): boolean;
emit(event: 'ownOrder.removed', order: OwnOrder): boolean;
}

/**
Expand Down Expand Up @@ -119,6 +119,22 @@ class OrderBook extends EventEmitter {

this.repository = new OrderBookRepository(models);

const onOrderRemoved = (order: OwnOrder) => {
const { inboundCurrency, outboundCurrency, inboundAmount, outboundAmount } =
Swaps.calculateInboundOutboundAmounts(order.quantity, order.price, order.isBuy, order.pairId);
this.swaps.swapClientManager.subtractInboundReservedAmount(inboundCurrency, inboundAmount);
this.swaps.swapClientManager.subtractOutboundReservedAmount(outboundCurrency, outboundAmount);
};
this.on('ownOrder.removed', onOrderRemoved);
this.on('ownOrder.filled', onOrderRemoved);

this.on('ownOrder.added', (order) => {
const { inboundCurrency, outboundCurrency, inboundAmount, outboundAmount } =
Swaps.calculateInboundOutboundAmounts(order.quantity, order.price, order.isBuy, order.pairId);
this.swaps.swapClientManager.addInboundReservedAmount(inboundCurrency, inboundAmount);
this.swaps.swapClientManager.addOutboundReservedAmount(outboundCurrency, outboundAmount);
});

this.bindPool();
this.bindSwaps();
}
Expand Down Expand Up @@ -557,22 +573,21 @@ class OrderBook extends EventEmitter {
*/
const handleMatch = async (maker: Order, taker: OwnOrder) => {
onUpdate && onUpdate({ type: PlaceOrderEventType.Match, order: maker });
const portion: OrderPortion = { id: maker.id, pairId: maker.pairId, quantity: maker.quantity };
if (isOwnOrder(maker)) {
// this is an internal match which is effectively executed immediately upon being found
this.logger.info(`internal match executed on taker ${taker.id} and maker ${maker.id} for ${maker.quantity}`);
portion.localId = maker.localId;
internalMatches.push(maker);
this.pool.broadcastOrderInvalidation(portion);
this.emit('ownOrder.filled', portion);
this.pool.broadcastOrderInvalidation(maker);
this.emit('ownOrder.filled', maker);
await this.persistTrade({
quantity: portion.quantity,
quantity: maker.quantity,
makerOrder: maker,
takerOrder: taker,
});
} else {
// this is a match with a peer order which cannot be considered executed until after a
// successful swap, which is an asynchronous process that can fail for numerous reasons
const portion: OrderPortion = { id: maker.id, pairId: maker.pairId, quantity: maker.quantity };
const alias = pubKeyToAlias(maker.peerPubKey);
this.logger.debug(`matched with peer ${maker.peerPubKey} (${alias}), executing swap on taker ${taker.id} and maker ${maker.id} for ${maker.quantity}`);
try {
Expand Down
1 change: 1 addition & 0 deletions lib/swaps/SwapClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ abstract class SwapClient extends EventEmitter {
* and throws an error if there isn't, otherwise does nothing.
*/
public abstract checkInboundCapacity(inboundAmount: number, currency?: string): void;
public abstract setReservedInboundAmount(reservedInboundAmount: number, currency?: string): void;
protected abstract updateCapacity(): Promise<void>;

public verifyConnectionWithTimeout = () => {
Expand Down
32 changes: 32 additions & 0 deletions lib/swaps/SwapClientManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Peer from '../p2p/Peer';
import { UnitConverter } from '../utils/UnitConverter';
import errors from './errors';
import SwapClient, { ClientStatus } from './SwapClient';
import assert from 'assert';

export function isConnextClient(swapClient: SwapClient): swapClient is ConnextClient {
return (swapClient.type === SwapClientType.Connext);
Expand Down Expand Up @@ -44,6 +45,11 @@ class SwapClientManager extends EventEmitter {
public misconfiguredClients = new Set<SwapClient>();
private walletPassword?: string;

/** A map of supported currency tickers to the inbound amount that is reserved by existing orders. */
private inboundReservedAmounts = new Map<string, number>();
/** A map of supported currency tickers to the outbound amount that is reserved by existing orders. */
private outboundReservedAmounts = new Map<string, number>();

constructor(
private config: Config,
private loggers: Loggers,
Expand Down Expand Up @@ -157,6 +163,32 @@ class SwapClientManager extends EventEmitter {
}
}

public addOutboundReservedAmount = (currency: string, amount: number) => {
const outboundReservedAmount = this.outboundReservedAmounts.get(currency);
const newOutboundReservedAmount = (outboundReservedAmount ?? 0) + amount;
this.outboundReservedAmounts.set(currency, newOutboundReservedAmount);
}

public addInboundReservedAmount = (currency: string, amount: number) => {
const inboundReservedAmount = this.inboundReservedAmounts.get(currency);
const newInboundReservedAmount = (inboundReservedAmount ?? 0) + amount;
this.inboundReservedAmounts.set(currency, newInboundReservedAmount);

this.swapClients.get(currency)?.setReservedInboundAmount(newInboundReservedAmount, currency);
}

public subtractOutboundReservedAmount = (currency: string, amount: number) => {
const outboundReservedAmount = this.outboundReservedAmounts.get(currency);
assert(outboundReservedAmount && outboundReservedAmount >= amount);
this.outboundReservedAmounts.set(currency, (outboundReservedAmount ?? 0) - amount);
}

public subtractInboundReservedAmount = (currency: string, amount: number) => {
const inboundReservedAmount = this.inboundReservedAmounts.get(currency);
assert(inboundReservedAmount && inboundReservedAmount >= amount);
this.inboundReservedAmounts.set(currency, (inboundReservedAmount ?? 0) - amount);
}

/**
* Initializes wallets with seed and password.
*/
Expand Down
4 changes: 4 additions & 0 deletions test/integration/OrderBook.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const getMockSwaps = (sandbox: sinon.SinonSandbox) => {
swaps.swapClientManager['swapClients'] = new Map<string, SwapClient>();
swaps.swapClientManager['swapClients'].set('BTC', lndBTC);
swaps.swapClientManager['swapClients'].set('LTC', lndLTC);
swaps.swapClientManager.addInboundReservedAmount = () => {};
swaps.swapClientManager.subtractInboundReservedAmount = () => {};
swaps.swapClientManager.addOutboundReservedAmount = () => {};
swaps.swapClientManager.subtractOutboundReservedAmount = () => {};
swaps.swapClientManager.get = (currency: any) => {
const client = swaps.swapClientManager['swapClients'].get(currency);
if (!client) {
Expand Down
35 changes: 35 additions & 0 deletions test/jest/Connext.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,41 @@ describe('ConnextClient', () => {
});
});

describe('setReservedInboundAmount', () => {
const amount = 50000000;
const currency = 'ETH';

beforeEach(() => {
connext['sendRequest'] = jest.fn().mockResolvedValue(undefined);
});

it('requests collateral plus 3% buffer when we have none', async () => {
connext['_maxChannelInboundAmount'].set('ETH', 0);
connext.setReservedInboundAmount(amount, currency);
expect(connext['sendRequest']).toHaveBeenCalledWith(
'/request-collateral',
'POST',
expect.objectContaining({ assetId: ETH_ASSET_ID, amount: (amount * 1.03 * 10 ** 10).toLocaleString('fullwide', { useGrouping: false }) }),
);
});

it('requests collateral plus 3% buffer when we have some collateral already', async () => {
connext['_maxChannelInboundAmount'].set('ETH', amount * 0.5);
connext.setReservedInboundAmount(amount, currency);
expect(connext['sendRequest']).toHaveBeenCalledWith(
'/request-collateral',
'POST',
expect.objectContaining({ assetId: ETH_ASSET_ID, amount: (amount * 1.03 * 10 ** 10).toLocaleString('fullwide', { useGrouping: false }) }),
);
});

it('does not request collateral when we have more than enough to cover the reserved inbound amount', async () => {
connext['_maxChannelInboundAmount'].set('ETH', amount * 2);
connext.setReservedInboundAmount(amount, currency);
expect(connext['sendRequest']).toHaveBeenCalledTimes(0);
});
});

describe('checkInboundCapacity', () => {
const quantity = 20000000;
const smallQuantity = 100;
Expand Down
Loading

0 comments on commit 0f2b841

Please sign in to comment.