Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple plugins hooking into db_write #4220

Merged
merged 2 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions doc/PLUGINS.md
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,14 @@ to error without
committing to the database!
This is the expected way to halt and catch fire.

`db_write` is a parallel-chained hook, i.e., multiple plugins can
register it, and all of them will be invoked simultaneously without
regard for order of registration.
The hook is considered handled if all registered plugins return
`{"result": "continue"}`.
If any plugin returns anything else, `lightningd` will error without
committing to the database.

### `invoice_payment`

This hook is called whenever a valid payment for an unpaid invoice has arrived.
Expand Down
20 changes: 14 additions & 6 deletions lightningd/plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1715,20 +1715,28 @@ void plugin_request_send(struct plugin *plugin,
req->stream = NULL;
}

void *plugin_exclusive_loop(struct plugin *plugin)
void *plugins_exclusive_loop(struct plugin **plugins)
{
void *ret;
size_t i;
bool last = false;
assert(tal_count(plugins) != 0);

io_conn_out_exclusive(plugin->stdin_conn, true);
io_conn_exclusive(plugin->stdout_conn, true);
for (i = 0; i < tal_count(plugins); ++i) {
io_conn_out_exclusive(plugins[i]->stdin_conn, true);
io_conn_exclusive(plugins[i]->stdout_conn, true);
}

/* We don't service timers here, either! */
ret = io_loop(NULL, NULL);

io_conn_out_exclusive(plugin->stdin_conn, false);
if (io_conn_exclusive(plugin->stdout_conn, false))
for (i = 0; i < tal_count(plugins); ++i) {
io_conn_out_exclusive(plugins[i]->stdin_conn, false);
last = io_conn_exclusive(plugins[i]->stdout_conn, false);
}
if (last)
fatal("Still io_exclusive after removing plugin %s?",
plugin->cmd);
plugins[tal_count(plugins) - 1]->cmd);

return ret;
}
Expand Down
10 changes: 7 additions & 3 deletions lightningd/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,16 @@ void json_add_opt_disable_plugins(struct json_stream *response,
const struct plugins *plugins);

/**
* Used by db hooks which can't have any other I/O while talking to plugin.
* Used by db hooks which can't have any other I/O while talking to
* hooked plugins.
*
* Returns output of io_loop() (ie. whatever gets passed to io_break()
* @param plugins - a `tal`-allocated array of plugins that are the
* only ones we talk to.
*
* @return output of io_loop() (ie. whatever gets passed to io_break()
* to end exclusive loop).
*/
void *plugin_exclusive_loop(struct plugin *plugin);
void *plugins_exclusive_loop(struct plugin **plugins);

/**
* Add a directory to the plugin path to automatically load plugins.
Expand Down
94 changes: 66 additions & 28 deletions lightningd/plugin_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,34 +294,51 @@ bool plugin_hook_call_(struct lightningd *ld, const struct plugin_hook *hook,
* annoying, and to make it clear that it's totally synchronous. */

/* Special synchronous hook for db */
static struct plugin_hook db_write_hook = {"db_write", PLUGIN_HOOK_SINGLE, NULL,
static struct plugin_hook db_write_hook = {"db_write", PLUGIN_HOOK_CHAIN, NULL,
NULL, NULL};
AUTODATA(hooks, &db_write_hook);

/* A `db_write` for one particular plugin hook. */
struct db_write_hook_req {
struct plugin *plugin;
struct plugin_hook_request *ph_req;
size_t *num_hooks;
};

static void db_hook_response(const char *buffer, const jsmntok_t *toks,
const jsmntok_t *idtok,
struct plugin_hook_request *ph_req)
struct db_write_hook_req *dwh_req)
{
const jsmntok_t *resulttok;

resulttok = json_get_member(buffer, toks, "result");
if (!resulttok)
fatal("Plugin returned an invalid response to the db_write "
"hook: %s", buffer);
fatal("Plugin '%s' returned an invalid response to the "
"db_write hook: %s", dwh_req->plugin->cmd, buffer);

/* We expect result: { 'result' : 'continue' }.
* Anything else we abort.
*/
resulttok = json_get_member(buffer, resulttok, "result");
if (resulttok) {
if (!json_tok_streq(buffer, resulttok, "continue"))
fatal("Plugin returned failed db_write: %s.", buffer);
fatal("Plugin '%s' returned failed db_write: %s.",
dwh_req->plugin->cmd,
buffer);
} else
fatal("Plugin returned an invalid result to the db_write "
"hook: %s", buffer);
fatal("Plugin '%s' returned an invalid result to the db_write "
"hook: %s",
dwh_req->plugin->cmd,
buffer);

assert((*dwh_req->num_hooks) != 0);
--(*dwh_req->num_hooks);
/* If there are other runners, do not exit yet. */
if ((*dwh_req->num_hooks) != 0)
return;

/* We're done, exit exclusive loop. */
io_break(ph_req);
io_break(dwh_req->ph_req);
}

void plugin_hook_db_sync(struct db *db)
Expand All @@ -330,41 +347,62 @@ void plugin_hook_db_sync(struct db *db)
struct jsonrpc_request *req;
struct plugin_hook_request *ph_req;
void *ret;
struct plugin *plugin;
struct plugin **plugins;
size_t i;
size_t num_hooks;

const char **changes = db_changes(db);
if (tal_count(hook->hooks) == 0)
num_hooks = tal_count(hook->hooks);
if (num_hooks == 0)
return;

ph_req = notleak(tal(hook->hooks, struct plugin_hook_request));
/* FIXME: do IO logging for this! */
req = jsonrpc_request_start(NULL, hook->name, NULL, NULL,
db_hook_response,
ph_req);
plugins = notleak(tal_arr(NULL, struct plugin *,
num_hooks));
for (i = 0; i < num_hooks; ++i)
plugins[i] = hook->hooks[i]->plugin;

ph_req = notleak(tal(hook->hooks, struct plugin_hook_request));
ph_req->hook = hook;
ph_req->db = db;
plugin = ph_req->plugin = hook->hooks[0]->plugin;

json_add_num(req->stream, "data_version", db_data_version_get(db));

json_array_start(req->stream, "writes");
for (size_t i = 0; i < tal_count(changes); i++)
json_add_string(req->stream, NULL, changes[i]);
json_array_end(req->stream);
jsonrpc_request_end(req);

plugin_request_send(ph_req->plugin, req);
ph_req->cb_arg = &num_hooks;

for (i = 0; i < num_hooks; ++i) {
/* Create an object for this plugin. */
struct db_write_hook_req *dwh_req;
dwh_req = tal(ph_req, struct db_write_hook_req);
dwh_req->plugin = plugins[i];
dwh_req->ph_req = ph_req;
dwh_req->num_hooks = &num_hooks;

/* FIXME: do IO logging for this! */
req = jsonrpc_request_start(NULL, hook->name, NULL, NULL,
db_hook_response,
dwh_req);

json_add_num(req->stream, "data_version",
db_data_version_get(db));

json_array_start(req->stream, "writes");
for (size_t i = 0; i < tal_count(changes); i++)
json_add_string(req->stream, NULL, changes[i]);
json_array_end(req->stream);
jsonrpc_request_end(req);

plugin_request_send(plugins[i], req);
}

/* We can be called on way out of an io_loop, which is already breaking.
* That will make this immediately return; save the break value and call
* again, then hand it onwards. */
ret = plugin_exclusive_loop(plugin);
ret = plugins_exclusive_loop(plugins);
if (ret != ph_req) {
void *ret2 = plugin_exclusive_loop(plugin);
void *ret2 = plugins_exclusive_loop(plugins);
assert(ret2 == ph_req);
io_break(ret);
}
assert(num_hooks == 0);
tal_free(plugins);
tal_free(ph_req);
}

static void add_deps(const char ***arr,
Expand Down
16 changes: 16 additions & 0 deletions tests/plugins/dbdummy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#! /usr/bin/env python3
'''This plugin is a do-nothing backup plugin which just checks that we
can handle multiple backup plugins.
'''

from pyln.client import Plugin

plugin = Plugin()


@plugin.hook('db_write')
def db_write(plugin, **kwargs):
return {'result': 'continue'}


plugin.run()
26 changes: 26 additions & 0 deletions tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,32 @@ def test_db_hook(node_factory, executor):
assert [x for x in db1.iterdump()] == [x for x in db2.iterdump()]


@unittest.skipIf(os.getenv('TEST_DB_PROVIDER', 'sqlite3') != 'sqlite3', "Only sqlite3 implements the db_write_hook currently")
def test_db_hook_multiple(node_factory, executor):
"""This tests the db hook for multiple-plugin case."""
dbfile = os.path.join(node_factory.directory, "dblog.sqlite3")
l1 = node_factory.get_node(options={'plugin': os.path.join(os.getcwd(), 'tests/plugins/dblog.py'),
'important-plugin': os.path.join(os.getcwd(), 'tests/plugins/dbdummy.py'),
'dblog-file': dbfile})

# It should see the db being created, and sometime later actually get
# initted.
# This precedes startup, so needle already past
assert l1.daemon.is_in_log(r'plugin-dblog.py: deferring \d+ commands')
l1.daemon.logsearch_start = 0
l1.daemon.wait_for_log('plugin-dblog.py: replaying pre-init data:')
l1.daemon.wait_for_log('plugin-dblog.py: CREATE TABLE version \\(version INTEGER\\)')
l1.daemon.wait_for_log("plugin-dblog.py: initialized.* 'startup': True")

l1.stop()

# Databases should be identical.
db1 = sqlite3.connect(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'lightningd.sqlite3'))
db2 = sqlite3.connect(dbfile)

assert [x for x in db1.iterdump()] == [x for x in db2.iterdump()]


def test_utf8_passthrough(node_factory, executor):
l1 = node_factory.get_node(options={'plugin': os.path.join(os.getcwd(), 'tests/plugins/utf8.py'),
'log-level': 'io'})
Expand Down