Skip to content

Commit

Permalink
feat: new grpc call for subscribing alerts and low balance alert (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsercano committed Dec 5, 2020
1 parent 405f50e commit e006009
Show file tree
Hide file tree
Showing 27 changed files with 2,168 additions and 378 deletions.
75 changes: 75 additions & 0 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions lib/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export enum Context {
Http = 'HTTP',
Backup = 'BACKUP',
Service = 'SERVICE',
Alerts = 'ALERTS',
}

type Loggers = {
Expand All @@ -58,6 +59,7 @@ type Loggers = {
swaps: Logger,
http: Logger,
service: Logger,
alerts: Logger,
};

class Logger {
Expand Down Expand Up @@ -133,6 +135,7 @@ class Logger {
swaps: new Logger({ ...object, context: Context.Swaps }),
http: new Logger({ ...object, context: Context.Http }),
service: new Logger({ ...object, context: Context.Service }),
alerts: new Logger({ ...object, context: Context.Alerts }),
};
}

Expand Down
5 changes: 5 additions & 0 deletions lib/Xud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import SwapClientManager from './swaps/SwapClientManager';
import Swaps from './swaps/Swaps';
import { createSimnetChannels } from './utils/simnet-connext-channels';
import { UnitConverter } from './utils/UnitConverter';
import Alerts from './alerts/Alerts';

const version: string = require('../package.json').version;

Expand All @@ -46,6 +47,7 @@ class Xud extends EventEmitter {
private swapClientManager?: SwapClientManager;
private unitConverter?: UnitConverter;
private simnetChannels$?: Subscription;
private alerts!: Alerts;

/**
* Create an Exchange Union daemon.
Expand Down Expand Up @@ -203,6 +205,8 @@ class Xud extends EventEmitter {
// initialize pool and start listening/connecting only once other components are initialized
await this.pool.init();

this.alerts = new Alerts({ swapClientManager: this.swapClientManager, logger: loggers.alerts });

this.service = new Service({
version,
nodeKey,
Expand All @@ -212,6 +216,7 @@ class Xud extends EventEmitter {
swaps: this.swaps,
logger: loggers.service,
shutdown: this.beginShutdown,
alerts: this.alerts,
});

this.service.on('logLevel', (level) => {
Expand Down
58 changes: 58 additions & 0 deletions lib/alerts/Alerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { EventEmitter } from 'events';
import { BalanceAlert } from './types';
import SwapClientManager from '../swaps/SwapClientManager';
import { MIN_BALANCE_ALERT_THRESHOLD_IN_MS } from './consts';
import Logger from '../Logger';
import { AlertType, ChannelSide } from '../constants/enums';
import { satsToCoinsStr } from '../cli/utils';

interface Alerts {
on(event: 'alert', listener: (alert: any) => void): this;
emit(event: 'alert', alert: any): boolean;
}

// TODO this class still requires a cleanup if alert is not being thrown anymore after a while
/**
* This class works as a middleware for thrown alerts from xud's main flow. Each alert will be caught here
* and re-thrown if last thrown time was before the minimum threshold that set in consts.ts
*/
class Alerts extends EventEmitter {
private alerts = new Map<string, number>();
private logger: Logger;

constructor({ swapClientManager, logger }: {swapClientManager: SwapClientManager, logger: Logger}) {
super();
this.logger = logger;
this.listenLowTradingBalanceAlerts(swapClientManager);
}

private listenLowTradingBalanceAlerts(swapClientManager: SwapClientManager) {
const lndClients = swapClientManager.getLndClientsMap().values();
for (const lndClient of lndClients) {
lndClient.on('lowTradingBalance', this.onLowTradingBalance);
}
swapClientManager.connextClient?.on('lowTradingBalance', this.onLowTradingBalance);
}

private onLowTradingBalance = (balanceAlert: BalanceAlert) => {
const stringRepresentation = JSON.stringify(balanceAlert);
this.logger.trace(`received low trading balance alert ${stringRepresentation}`);
if (this.alerts.get(stringRepresentation) === undefined || this.checkAlertThreshold(stringRepresentation)) {
this.logger.trace(`triggering low balance alert ${stringRepresentation}`);

balanceAlert.message = `${ChannelSide[balanceAlert.side || 0]} trading balance (${satsToCoinsStr(balanceAlert.sideBalance || 0)} ${balanceAlert.currency}) is lower than 10% of trading capacity (${satsToCoinsStr(balanceAlert.totalBalance || 0)} ${balanceAlert.currency})`;
balanceAlert.type = AlertType.LowTradingBalance;

this.alerts.set(stringRepresentation, Date.now());
this.emit('alert', balanceAlert);
}
}

private checkAlertThreshold(stringRepresentation: string) {
const lastThrownTime = this.alerts.get(stringRepresentation) || 0;
const passedTime = Date.now() - lastThrownTime;
return passedTime > MIN_BALANCE_ALERT_THRESHOLD_IN_MS;
}
}

export default Alerts;
2 changes: 2 additions & 0 deletions lib/alerts/consts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/** The minimum time in miliseconds to be passed to rethrow a balance alert. */
export const MIN_BALANCE_ALERT_THRESHOLD_IN_MS = 10000;
19 changes: 19 additions & 0 deletions lib/alerts/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AlertType, ChannelSide } from '../constants/enums';

export type BalanceAlert = Alert & {
/** The total balance of the channel when the alert is triggered. */
totalBalance: number;
/** The side of the balance either local or remote. */
side: ChannelSide;
/** The balance that triggered the alert. */
sideBalance: number;
/** The alert threshold in percentage, e.g. 10 means %10. */
bound: number;
/** The currency of the channel. */
currency: string;
};

export type Alert = {
type: AlertType;
message: string;
};
78 changes: 78 additions & 0 deletions lib/cli/commands/streamalerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Arguments, Argv } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { loadXudClient } from '../command';
import { AlertType, ChannelSide } from '../../constants/enums';
import { onStreamError, waitForClient } from '../utils';
import moment from 'moment';

export const command = 'streamalerts';

export const describe = 'stream alert notifications from xud';

export const builder = (argv: Argv) => argv
.option('pretty', {
type: 'boolean',
})
.example('$0 streamalerts -j', 'prints alert payload in a JSON structure')
.example('$0 streamalerts', 'prints alert message only');

export const handler = async (argv: Arguments) => {
await ensureConnection(argv, true);
};

let client: XudClient;

const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}

waitForClient(client, argv, ensureConnection, streamalerts, printError);
};

const structAlertJson = (alertObject: xudrpc.Alert.AsObject) => {
const result: {type: string, payload: {
totalBalance?: number,
side?: string,
bound?: number,
sideBalance?: number,
channelPoint?: string,
currency?: string,
} | undefined } = {
type: AlertType[alertObject.type],
payload: undefined,
};

if (alertObject.type === xudrpc.Alert.AlertType.LOW_TRADING_BALANCE) {
result.payload = {
totalBalance: alertObject.balanceAlert?.totalBalance,
side: ChannelSide[alertObject.balanceAlert?.side || 0],
sideBalance: alertObject.balanceAlert?.sideBalance,
bound: alertObject.balanceAlert?.bound,
currency: alertObject.balanceAlert?.currency,
};
}

return result;
};

const streamalerts = (argv: Arguments<any>) => {
const request = new xudrpc.SubscribeAlertsRequest();
const alertsSubscription = client.subscribeAlerts(request);

alertsSubscription.on('data', (alert: xudrpc.Alert) => {
if (argv.json) {
console.log(JSON.stringify(structAlertJson(alert.toObject()), undefined, 2));
} else {
console.log(`(${moment()}) ${AlertType[alert.getType()]}: ${alert.getMessage()}`);
}
});
alertsSubscription.on('end', reconnect.bind(undefined, argv));
alertsSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));
};

const reconnect = async (argv: Arguments) => {
console.log('Stream has closed, trying to reconnect');
await ensureConnection(argv, false);
};
27 changes: 3 additions & 24 deletions lib/cli/commands/streamorders.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { ServiceError, status } from 'grpc';
import { Arguments, Argv } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { setTimeoutPromise } from '../../utils/utils';
import { loadXudClient } from '../command';
import { onStreamError, waitForClient } from '../utils';

export const command = 'streamorders [existing]';

Expand All @@ -26,20 +25,8 @@ const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
if (error) {
if (error.message === 'Failed to connect before the deadline') {
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
process.exit(1);
}

if (printError) console.error(`${error.name}: ${error.message}`);
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
} else {
console.log('Successfully connected, subscribing for orders');
streamOrders(argv);
}
});
waitForClient(client, argv, ensureConnection, streamOrders, printError);
};

const streamOrders = (argv: Arguments<any>) => {
Expand All @@ -57,15 +44,7 @@ const streamOrders = (argv: Arguments<any>) => {
// adding end, close, error events only once,
// since they'll be thrown for three of subscriptions in the corresponding cases, catching once is enough.
ordersSubscription.on('end', reconnect.bind(undefined, argv));
ordersSubscription.on('error', async (err: ServiceError) => {
if (err.code === status.UNIMPLEMENTED) {
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
process.exit(1);
}
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
await setTimeoutPromise(1000);
await ensureConnection(argv);
});
ordersSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));

const swapsRequest = new xudrpc.SubscribeSwapsRequest();
swapsRequest.setIncludeTaker(true);
Expand Down
Loading

0 comments on commit e006009

Please sign in to comment.