Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new grpc call for subscribing alerts and low balance alert (#864) #2023

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions lib/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export enum Context {
Http = 'HTTP',
Backup = 'BACKUP',
Service = 'SERVICE',
Alerts = 'ALERTS',
}

type Loggers = {
Expand All @@ -58,6 +59,7 @@ type Loggers = {
swaps: Logger,
http: Logger,
service: Logger,
alerts: Logger,
};

class Logger {
Expand Down Expand Up @@ -133,6 +135,7 @@ class Logger {
swaps: new Logger({ ...object, context: Context.Swaps }),
http: new Logger({ ...object, context: Context.Http }),
service: new Logger({ ...object, context: Context.Service }),
alerts: new Logger({ ...object, context: Context.Alerts }),
};
}

Expand Down
5 changes: 5 additions & 0 deletions lib/Xud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import SwapClientManager from './swaps/SwapClientManager';
import Swaps from './swaps/Swaps';
import { createSimnetChannels } from './utils/simnet-connext-channels';
import { UnitConverter } from './utils/UnitConverter';
import Alerts from './alerts/Alerts';

const version: string = require('../package.json').version;

Expand All @@ -46,6 +47,7 @@ class Xud extends EventEmitter {
private swapClientManager?: SwapClientManager;
private unitConverter?: UnitConverter;
private simnetChannels$?: Subscription;
private alerts!: Alerts;

/**
* Create an Exchange Union daemon.
Expand Down Expand Up @@ -203,6 +205,8 @@ class Xud extends EventEmitter {
// initialize pool and start listening/connecting only once other components are initialized
await this.pool.init();

this.alerts = new Alerts({ swapClientManager: this.swapClientManager, logger: loggers.alerts });

this.service = new Service({
version,
nodeKey,
Expand All @@ -212,6 +216,7 @@ class Xud extends EventEmitter {
swaps: this.swaps,
logger: loggers.service,
shutdown: this.beginShutdown,
alerts: this.alerts,
});

this.service.on('logLevel', (level) => {
Expand Down
58 changes: 58 additions & 0 deletions lib/alerts/Alerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { EventEmitter } from 'events';
import { BalanceAlert } from './types';
import SwapClientManager from '../swaps/SwapClientManager';
import { MIN_BALANCE_ALERT_THRESHOLD_IN_MS } from './consts';
import Logger from '../Logger';
import { AlertType, ChannelSide } from '../constants/enums';
import { satsToCoinsStr } from '../cli/utils';

interface Alerts {
on(event: 'alert', listener: (alert: any) => void): this;
rsercano marked this conversation as resolved.
Show resolved Hide resolved
emit(event: 'alert', alert: any): boolean;
}

// TODO this class still requires a cleanup if alert is not being thrown anymore after a while
/**
* This class works as a middleware for thrown alerts from xud's main flow. Each alert will be caught here
* and re-thrown if last thrown time was before the minimum threshold that set in consts.ts
*/
class Alerts extends EventEmitter {
private alerts = new Map<string, number>();
private logger: Logger;

constructor({ swapClientManager, logger }: {swapClientManager: SwapClientManager, logger: Logger}) {
super();
this.logger = logger;
this.listenLowTradingBalanceAlerts(swapClientManager);
}

private listenLowTradingBalanceAlerts(swapClientManager: SwapClientManager) {
const lndClients = swapClientManager.getLndClientsMap().values();
for (const lndClient of lndClients) {
lndClient.on('lowTradingBalance', this.onLowTradingBalance);
}
swapClientManager.connextClient?.on('lowTradingBalance', this.onLowTradingBalance);
}

private onLowTradingBalance = (balanceAlert: BalanceAlert) => {
const stringRepresentation = JSON.stringify(balanceAlert);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I see the desire to convert to a string here, but I'm concernedthis will cause some issues using plain old JSON.stringify, which isn't guaranteed to give the same string for equivalent objects. It's also a bit expensive of a procedure, and uses up more memory in the map.

Doesn't have to be addressed in this PR since it's not that simple, but maybe it can be addressed at the same time as cleanup. I'm thinking it'd make sense to add a timestamp property to the Alert object, and then convert alert's to a string identifier like [type]-[channel_id] rather than converting the entire object to a json string.

For now I think add a TODO here and it's ok with me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right, alerts already have timestamps attached within BaseAlert, adding TODO there, but I guess we should prepare cleanup as soon as possible to avoid memory issues.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe even within this PR, wdyt? @sangaman

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to you, it's not a trivial effort.

this.logger.trace(`received low trading balance alert ${stringRepresentation}`);
if (this.alerts.get(stringRepresentation) === undefined || this.checkAlertThreshold(stringRepresentation)) {
this.logger.trace(`triggering low balance alert ${stringRepresentation}`);

balanceAlert.message = `${ChannelSide[balanceAlert.side || 0]} trading balance (${satsToCoinsStr(balanceAlert.sideBalance || 0)} ${balanceAlert.currency}) is lower than 10% of trading capacity (${satsToCoinsStr(balanceAlert.totalBalance || 0)} ${balanceAlert.currency})`;
balanceAlert.type = AlertType.LowTradingBalance;

this.alerts.set(stringRepresentation, Date.now());
this.emit('alert', balanceAlert);
}
}

private checkAlertThreshold(stringRepresentation: string) {
const lastThrownTime = this.alerts.get(stringRepresentation) || 0;
const passedTime = Date.now() - lastThrownTime;
return passedTime > MIN_BALANCE_ALERT_THRESHOLD_IN_MS;
}
}

export default Alerts;
2 changes: 2 additions & 0 deletions lib/alerts/consts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/** The minimum time in miliseconds to be passed to rethrow a balance alert. */
export const MIN_BALANCE_ALERT_THRESHOLD_IN_MS = 10000;
rsercano marked this conversation as resolved.
Show resolved Hide resolved
19 changes: 19 additions & 0 deletions lib/alerts/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AlertType, ChannelSide } from '../constants/enums';

export type BalanceAlert = Alert & {
/** The total balance of the channel when the alert is triggered. */
totalBalance: number;
/** The side of the balance either local or remote. */
side: ChannelSide;
/** The balance that triggered the alert. */
sideBalance: number;
/** The alert threshold in percentage, e.g. 10 means %10. */
bound: number;
/** The currency of the channel. */
currency: string;
};

export type Alert = {
type: AlertType;
message: string;
};
78 changes: 78 additions & 0 deletions lib/cli/commands/streamalerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Arguments, Argv } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { loadXudClient } from '../command';
import { AlertType, ChannelSide } from '../../constants/enums';
import { onStreamError, waitForClient } from '../utils';
import moment from 'moment';

export const command = 'streamalerts';

export const describe = 'stream alert notifications from xud';

export const builder = (argv: Argv) => argv
.option('pretty', {
type: 'boolean',
})
.example('$0 streamalerts -j', 'prints alert payload in a JSON structure')
.example('$0 streamalerts', 'prints alert message only');

export const handler = async (argv: Arguments) => {
await ensureConnection(argv, true);
};

let client: XudClient;

const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}

waitForClient(client, argv, ensureConnection, streamalerts, printError);
};

const structAlertJson = (alertObject: xudrpc.Alert.AsObject) => {
const result: {type: string, payload: {
totalBalance?: number,
side?: string,
bound?: number,
sideBalance?: number,
channelPoint?: string,
currency?: string,
} | undefined } = {
type: AlertType[alertObject.type],
payload: undefined,
};

if (alertObject.type === xudrpc.Alert.AlertType.LOW_TRADING_BALANCE) {
result.payload = {
totalBalance: alertObject.balanceAlert?.totalBalance,
side: ChannelSide[alertObject.balanceAlert?.side || 0],
sideBalance: alertObject.balanceAlert?.sideBalance,
bound: alertObject.balanceAlert?.bound,
currency: alertObject.balanceAlert?.currency,
};
}

return result;
};

const streamalerts = (argv: Arguments<any>) => {
const request = new xudrpc.SubscribeAlertsRequest();
const alertsSubscription = client.subscribeAlerts(request);

alertsSubscription.on('data', (alert: xudrpc.Alert) => {
if (argv.json) {
console.log(JSON.stringify(structAlertJson(alert.toObject()), undefined, 2));
} else {
console.log(`(${moment()}) ${AlertType[alert.getType()]}: ${alert.getMessage()}`);
}
});
alertsSubscription.on('end', reconnect.bind(undefined, argv));
alertsSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));
};

const reconnect = async (argv: Arguments) => {
console.log('Stream has closed, trying to reconnect');
await ensureConnection(argv, false);
};
Loading