Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new grpc call for subscribing alerts and low balance alert (#864) #2023

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions lib/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export enum Context {
Http = 'HTTP',
Backup = 'BACKUP',
Service = 'SERVICE',
Alerts = 'ALERTS',
}

type Loggers = {
Expand All @@ -57,6 +58,7 @@ type Loggers = {
swaps: Logger;
http: Logger;
service: Logger;
alerts: Logger;
};

class Logger {
Expand Down Expand Up @@ -134,6 +136,7 @@ class Logger {
swaps: new Logger({ ...object, context: Context.Swaps }),
http: new Logger({ ...object, context: Context.Http }),
service: new Logger({ ...object, context: Context.Service }),
alerts: new Logger({ ...object, context: Context.Alerts }),
};
};

Expand Down
5 changes: 5 additions & 0 deletions lib/Xud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import SwapClientManager from './swaps/SwapClientManager';
import Swaps from './swaps/Swaps';
import { createSimnetChannels } from './utils/simnet-connext-channels';
import { UnitConverter } from './utils/UnitConverter';
import Alerts from './alerts/Alerts';

const { version }: { version: string } = require('../package.json');

Expand All @@ -43,6 +44,7 @@ class Xud extends EventEmitter {
private shuttingDown = false;
private swapClientManager?: SwapClientManager;
private simnetChannels$?: Subscription;
private alerts!: Alerts;

/**
* Create an Exchange Union daemon.
Expand Down Expand Up @@ -198,6 +200,8 @@ class Xud extends EventEmitter {
// initialize pool and start listening/connecting only once other components are initialized
await this.pool.init();

this.alerts = new Alerts({ swapClientManager: this.swapClientManager, logger: loggers.alerts });

this.service = new Service({
version,
nodeKey,
Expand All @@ -207,6 +211,7 @@ class Xud extends EventEmitter {
swaps: this.swaps,
logger: loggers.service,
shutdown: this.beginShutdown,
alerts: this.alerts,
});

this.service.on('logLevel', (level) => {
Expand Down
70 changes: 70 additions & 0 deletions lib/alerts/Alerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { EventEmitter } from 'events';
import { satsToCoinsStr } from '../cli/utils';
import { AlertType, ChannelSide } from '../constants/enums';
import Logger from '../Logger';
import SwapClientManager from '../swaps/SwapClientManager';
import { Alert, BalanceAlertEvent } from './types';

interface Alerts {
on(event: 'alert', listener: (alert: Alert) => void): this;
emit(event: 'alert', alert: Alert): boolean;
}

// TODO this class still requires a cleanup if alert is not being thrown anymore after a while
/**
* This class works as a middleware for thrown alerts from xud's main flow. Each alert will be caught here
* and re-thrown if last thrown time was before the minimum threshold that set in consts.ts
*/
class Alerts extends EventEmitter {
/** The minimum time in miliseconds to be passed to rethrow a balance alert. */
private static readonly MIN_BALANCE_ALERT_THRESHOLD_IN_MS = 10000;
private alerts = new Map<string, number>();
private logger: Logger;

constructor({ swapClientManager, logger }: { swapClientManager: SwapClientManager; logger: Logger }) {
super();
this.logger = logger;
this.listenLowTradingBalanceAlerts(swapClientManager);
}

private listenLowTradingBalanceAlerts(swapClientManager: SwapClientManager) {
const lndClients = swapClientManager.getLndClientsMap().values();
for (const lndClient of lndClients) {
lndClient.on('lowTradingBalance', this.onLowTradingBalance);
}
swapClientManager.connextClient?.on('lowTradingBalance', this.onLowTradingBalance);
}

private onLowTradingBalance = (balanceAlertEvent: BalanceAlertEvent) => {
// TODO don't use JSON.stringify instead find a way to define unique ids per alert and keep in the map to avoid memory issues
const stringRepresentation = JSON.stringify(balanceAlertEvent);
this.logger.trace(`received low trading balance alert ${stringRepresentation}`);
if (this.alerts.get(stringRepresentation) === undefined || this.checkAlertThreshold(stringRepresentation)) {
this.logger.trace(`triggering low balance alert ${stringRepresentation}`);

const message = `${ChannelSide[balanceAlertEvent.side || 0]} trading balance (${satsToCoinsStr(
balanceAlertEvent.sideBalance || 0,
)} ${balanceAlertEvent.currency}) is lower than 10% of trading capacity (${satsToCoinsStr(
balanceAlertEvent.totalBalance || 0,
)} ${balanceAlertEvent.currency})`;

const balanceAlert = {
...balanceAlertEvent,
message,
type: AlertType.LowTradingBalance,
date: Date.now(),
};

this.alerts.set(stringRepresentation, balanceAlert.date);
this.emit('alert', balanceAlert);
}
};

private checkAlertThreshold(stringRepresentation: string) {
const lastThrownTime = this.alerts.get(stringRepresentation) || 0;
const passedTime = Date.now() - lastThrownTime;
return passedTime > Alerts.MIN_BALANCE_ALERT_THRESHOLD_IN_MS;
}
}

export default Alerts;
23 changes: 23 additions & 0 deletions lib/alerts/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { AlertType, ChannelSide } from '../constants/enums';

type BaseAlert = {
type: AlertType;
message: string;
date: number;
};

export type BalanceAlertEvent = {
/** The total balance of the channel when the alert is triggered. */
totalBalance: number;
/** The side of the balance either local or remote. */
side: ChannelSide;
/** The balance that triggered the alert. */
sideBalance: number;
/** The alert threshold in percentage, e.g. 10 means %10. */
bound: number;
/** The currency of the channel. */
currency: string;
};
export type BalanceAlert = BaseAlert & BalanceAlertEvent;

export type Alert = BalanceAlert;
84 changes: 84 additions & 0 deletions lib/cli/commands/streamalerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { Arguments, Argv } from 'yargs';
import moment from 'moment';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { loadXudClient } from '../command';
import { AlertType, ChannelSide } from '../../constants/enums';
import { onStreamError, waitForClient } from '../utils';

export const command = 'streamalerts';

export const describe = 'stream alert notifications from xud';

export const builder = (argv: Argv) =>
argv
.option('pretty', { type: 'boolean' })
.example('$0 streamalerts -j', 'prints alert payload in a JSON structure')
.example('$0 streamalerts', 'prints alert message only');

export const handler = async (argv: Arguments) => {
await ensureConnection(argv, true);
};

let client: XudClient;

const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}

waitForClient(client, argv, ensureConnection, streamalerts, printError);
};

const structAlertJson = (alertObject: xudrpc.Alert.AsObject) => {
const result: {
type: string;
date: number;
payload:
| {
totalBalance?: number;
side?: string;
bound?: number;
sideBalance?: number;
channelPoint?: string;
currency?: string;
}
| undefined;
} = {
type: AlertType[alertObject.type],
date: alertObject.date,
payload: undefined,
};

if (alertObject.type === xudrpc.Alert.AlertType.LOW_TRADING_BALANCE) {
result.payload = {
totalBalance: alertObject.balanceAlert?.totalBalance,
side: ChannelSide[alertObject.balanceAlert?.side || 0],
sideBalance: alertObject.balanceAlert?.sideBalance,
bound: alertObject.balanceAlert?.bound,
currency: alertObject.balanceAlert?.currency,
};
}

return result;
};

const streamalerts = (argv: Arguments<any>) => {
const request = new xudrpc.SubscribeAlertsRequest();
const alertsSubscription = client.subscribeAlerts(request);

alertsSubscription.on('data', (alert: xudrpc.Alert) => {
if (argv.json) {
console.log(JSON.stringify(structAlertJson(alert.toObject()), undefined, 2));
} else {
console.log(`(${moment(alert.getDate())}) ${AlertType[alert.getType()]}: ${alert.getMessage()}`);
}
});
alertsSubscription.on('end', reconnect.bind(undefined, argv));
alertsSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));
};

const reconnect = async (argv: Arguments) => {
console.log('Stream has closed, trying to reconnect');
await ensureConnection(argv, false);
};
Loading