From ee0c13eefe442de69c27a23f7cd557b4970c532d Mon Sep 17 00:00:00 2001 From: ZmnSCPxj jxPCSnmZ Date: Mon, 23 Nov 2020 11:46:13 +0800 Subject: [PATCH 1/2] lightningd/plugin.c: Make plugin-exclusive loop support multiple plugins. --- lightningd/plugin.c | 20 ++++++++++++++------ lightningd/plugin.h | 10 +++++++--- lightningd/plugin_hook.c | 15 +++++++++++---- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/lightningd/plugin.c b/lightningd/plugin.c index 75d12a8a299e..f28ab277efc2 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -1715,20 +1715,28 @@ void plugin_request_send(struct plugin *plugin, req->stream = NULL; } -void *plugin_exclusive_loop(struct plugin *plugin) +void *plugins_exclusive_loop(struct plugin **plugins) { void *ret; + size_t i; + bool last = false; + assert(tal_count(plugins) != 0); - io_conn_out_exclusive(plugin->stdin_conn, true); - io_conn_exclusive(plugin->stdout_conn, true); + for (i = 0; i < tal_count(plugins); ++i) { + io_conn_out_exclusive(plugins[i]->stdin_conn, true); + io_conn_exclusive(plugins[i]->stdout_conn, true); + } /* We don't service timers here, either! */ ret = io_loop(NULL, NULL); - io_conn_out_exclusive(plugin->stdin_conn, false); - if (io_conn_exclusive(plugin->stdout_conn, false)) + for (i = 0; i < tal_count(plugins); ++i) { + io_conn_out_exclusive(plugins[i]->stdin_conn, false); + last = io_conn_exclusive(plugins[i]->stdout_conn, false); + } + if (last) fatal("Still io_exclusive after removing plugin %s?", - plugin->cmd); + plugins[tal_count(plugins) - 1]->cmd); return ret; } diff --git a/lightningd/plugin.h b/lightningd/plugin.h index 070c0184ad8e..6f9c7ab667ea 100644 --- a/lightningd/plugin.h +++ b/lightningd/plugin.h @@ -304,12 +304,16 @@ void json_add_opt_disable_plugins(struct json_stream *response, const struct plugins *plugins); /** - * Used by db hooks which can't have any other I/O while talking to plugin. + * Used by db hooks which can't have any other I/O while talking to + * hooked plugins. * - * Returns output of io_loop() (ie. whatever gets passed to io_break() + * @param plugins - a `tal`-allocated array of plugins that are the + * only ones we talk to. + * + * @return output of io_loop() (ie. whatever gets passed to io_break() * to end exclusive loop). */ -void *plugin_exclusive_loop(struct plugin *plugin); +void *plugins_exclusive_loop(struct plugin **plugins); /** * Add a directory to the plugin path to automatically load plugins. diff --git a/lightningd/plugin_hook.c b/lightningd/plugin_hook.c index 321cb75fec53..9413fa3047b8 100644 --- a/lightningd/plugin_hook.c +++ b/lightningd/plugin_hook.c @@ -330,12 +330,18 @@ void plugin_hook_db_sync(struct db *db) struct jsonrpc_request *req; struct plugin_hook_request *ph_req; void *ret; - struct plugin *plugin; + struct plugin **plugins; + size_t i; const char **changes = db_changes(db); if (tal_count(hook->hooks) == 0) return; + plugins = notleak(tal_arr(NULL, struct plugin *, + tal_count(hook->hooks))); + for (i = 0; i < tal_count(hook->hooks); ++i) + plugins[i] = hook->hooks[i]->plugin; + ph_req = notleak(tal(hook->hooks, struct plugin_hook_request)); /* FIXME: do IO logging for this! */ req = jsonrpc_request_start(NULL, hook->name, NULL, NULL, @@ -344,7 +350,7 @@ void plugin_hook_db_sync(struct db *db) ph_req->hook = hook; ph_req->db = db; - plugin = ph_req->plugin = hook->hooks[0]->plugin; + ph_req->plugin = hook->hooks[0]->plugin; json_add_num(req->stream, "data_version", db_data_version_get(db)); @@ -359,12 +365,13 @@ void plugin_hook_db_sync(struct db *db) /* We can be called on way out of an io_loop, which is already breaking. * That will make this immediately return; save the break value and call * again, then hand it onwards. */ - ret = plugin_exclusive_loop(plugin); + ret = plugins_exclusive_loop(plugins); if (ret != ph_req) { - void *ret2 = plugin_exclusive_loop(plugin); + void *ret2 = plugins_exclusive_loop(plugins); assert(ret2 == ph_req); io_break(ret); } + tal_free(plugins); } static void add_deps(const char ***arr, From 426df0dd28463387af59b48c43ca65a2c28b43b5 Mon Sep 17 00:00:00 2001 From: ZmnSCPxj jxPCSnmZ Date: Mon, 23 Nov 2020 13:05:56 +0800 Subject: [PATCH 2/2] lightningd/plugin_hook.c: Make `db_write` a chained hook. Fixes: #4219 Changelog-Changed: Plugins: Multiple plugins can now register `db_write` hooks. --- doc/PLUGINS.md | 8 ++++ lightningd/plugin_hook.c | 85 +++++++++++++++++++++++++++------------- tests/plugins/dbdummy.py | 16 ++++++++ tests/test_plugin.py | 26 ++++++++++++ 4 files changed, 108 insertions(+), 27 deletions(-) create mode 100755 tests/plugins/dbdummy.py diff --git a/doc/PLUGINS.md b/doc/PLUGINS.md index 78f01041a65e..9fe56a48262b 100644 --- a/doc/PLUGINS.md +++ b/doc/PLUGINS.md @@ -897,6 +897,14 @@ to error without committing to the database! This is the expected way to halt and catch fire. +`db_write` is a parallel-chained hook, i.e., multiple plugins can +register it, and all of them will be invoked simultaneously without +regard for order of registration. +The hook is considered handled if all registered plugins return +`{"result": "continue"}`. +If any plugin returns anything else, `lightningd` will error without +committing to the database. + ### `invoice_payment` This hook is called whenever a valid payment for an unpaid invoice has arrived. diff --git a/lightningd/plugin_hook.c b/lightningd/plugin_hook.c index 9413fa3047b8..89289450cd9c 100644 --- a/lightningd/plugin_hook.c +++ b/lightningd/plugin_hook.c @@ -294,20 +294,27 @@ bool plugin_hook_call_(struct lightningd *ld, const struct plugin_hook *hook, * annoying, and to make it clear that it's totally synchronous. */ /* Special synchronous hook for db */ -static struct plugin_hook db_write_hook = {"db_write", PLUGIN_HOOK_SINGLE, NULL, +static struct plugin_hook db_write_hook = {"db_write", PLUGIN_HOOK_CHAIN, NULL, NULL, NULL}; AUTODATA(hooks, &db_write_hook); +/* A `db_write` for one particular plugin hook. */ +struct db_write_hook_req { + struct plugin *plugin; + struct plugin_hook_request *ph_req; + size_t *num_hooks; +}; + static void db_hook_response(const char *buffer, const jsmntok_t *toks, const jsmntok_t *idtok, - struct plugin_hook_request *ph_req) + struct db_write_hook_req *dwh_req) { const jsmntok_t *resulttok; resulttok = json_get_member(buffer, toks, "result"); if (!resulttok) - fatal("Plugin returned an invalid response to the db_write " - "hook: %s", buffer); + fatal("Plugin '%s' returned an invalid response to the " + "db_write hook: %s", dwh_req->plugin->cmd, buffer); /* We expect result: { 'result' : 'continue' }. * Anything else we abort. @@ -315,13 +322,23 @@ static void db_hook_response(const char *buffer, const jsmntok_t *toks, resulttok = json_get_member(buffer, resulttok, "result"); if (resulttok) { if (!json_tok_streq(buffer, resulttok, "continue")) - fatal("Plugin returned failed db_write: %s.", buffer); + fatal("Plugin '%s' returned failed db_write: %s.", + dwh_req->plugin->cmd, + buffer); } else - fatal("Plugin returned an invalid result to the db_write " - "hook: %s", buffer); + fatal("Plugin '%s' returned an invalid result to the db_write " + "hook: %s", + dwh_req->plugin->cmd, + buffer); + + assert((*dwh_req->num_hooks) != 0); + --(*dwh_req->num_hooks); + /* If there are other runners, do not exit yet. */ + if ((*dwh_req->num_hooks) != 0) + return; /* We're done, exit exclusive loop. */ - io_break(ph_req); + io_break(dwh_req->ph_req); } void plugin_hook_db_sync(struct db *db) @@ -332,35 +349,47 @@ void plugin_hook_db_sync(struct db *db) void *ret; struct plugin **plugins; size_t i; + size_t num_hooks; const char **changes = db_changes(db); - if (tal_count(hook->hooks) == 0) + num_hooks = tal_count(hook->hooks); + if (num_hooks == 0) return; plugins = notleak(tal_arr(NULL, struct plugin *, - tal_count(hook->hooks))); - for (i = 0; i < tal_count(hook->hooks); ++i) + num_hooks)); + for (i = 0; i < num_hooks; ++i) plugins[i] = hook->hooks[i]->plugin; ph_req = notleak(tal(hook->hooks, struct plugin_hook_request)); - /* FIXME: do IO logging for this! */ - req = jsonrpc_request_start(NULL, hook->name, NULL, NULL, - db_hook_response, - ph_req); - ph_req->hook = hook; ph_req->db = db; - ph_req->plugin = hook->hooks[0]->plugin; - - json_add_num(req->stream, "data_version", db_data_version_get(db)); - - json_array_start(req->stream, "writes"); - for (size_t i = 0; i < tal_count(changes); i++) - json_add_string(req->stream, NULL, changes[i]); - json_array_end(req->stream); - jsonrpc_request_end(req); - - plugin_request_send(ph_req->plugin, req); + ph_req->cb_arg = &num_hooks; + + for (i = 0; i < num_hooks; ++i) { + /* Create an object for this plugin. */ + struct db_write_hook_req *dwh_req; + dwh_req = tal(ph_req, struct db_write_hook_req); + dwh_req->plugin = plugins[i]; + dwh_req->ph_req = ph_req; + dwh_req->num_hooks = &num_hooks; + + /* FIXME: do IO logging for this! */ + req = jsonrpc_request_start(NULL, hook->name, NULL, NULL, + db_hook_response, + dwh_req); + + json_add_num(req->stream, "data_version", + db_data_version_get(db)); + + json_array_start(req->stream, "writes"); + for (size_t i = 0; i < tal_count(changes); i++) + json_add_string(req->stream, NULL, changes[i]); + json_array_end(req->stream); + jsonrpc_request_end(req); + + plugin_request_send(plugins[i], req); + } /* We can be called on way out of an io_loop, which is already breaking. * That will make this immediately return; save the break value and call @@ -371,7 +400,9 @@ void plugin_hook_db_sync(struct db *db) assert(ret2 == ph_req); io_break(ret); } + assert(num_hooks == 0); tal_free(plugins); + tal_free(ph_req); } static void add_deps(const char ***arr, diff --git a/tests/plugins/dbdummy.py b/tests/plugins/dbdummy.py new file mode 100755 index 000000000000..ecf5a3fd2bc8 --- /dev/null +++ b/tests/plugins/dbdummy.py @@ -0,0 +1,16 @@ +#! /usr/bin/env python3 +'''This plugin is a do-nothing backup plugin which just checks that we +can handle multiple backup plugins. +''' + +from pyln.client import Plugin + +plugin = Plugin() + + +@plugin.hook('db_write') +def db_write(plugin, **kwargs): + return {'result': 'continue'} + + +plugin.run() diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 5d49b6991815..b0bc18e43198 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -478,6 +478,32 @@ def test_db_hook(node_factory, executor): assert [x for x in db1.iterdump()] == [x for x in db2.iterdump()] +@unittest.skipIf(os.getenv('TEST_DB_PROVIDER', 'sqlite3') != 'sqlite3', "Only sqlite3 implements the db_write_hook currently") +def test_db_hook_multiple(node_factory, executor): + """This tests the db hook for multiple-plugin case.""" + dbfile = os.path.join(node_factory.directory, "dblog.sqlite3") + l1 = node_factory.get_node(options={'plugin': os.path.join(os.getcwd(), 'tests/plugins/dblog.py'), + 'important-plugin': os.path.join(os.getcwd(), 'tests/plugins/dbdummy.py'), + 'dblog-file': dbfile}) + + # It should see the db being created, and sometime later actually get + # initted. + # This precedes startup, so needle already past + assert l1.daemon.is_in_log(r'plugin-dblog.py: deferring \d+ commands') + l1.daemon.logsearch_start = 0 + l1.daemon.wait_for_log('plugin-dblog.py: replaying pre-init data:') + l1.daemon.wait_for_log('plugin-dblog.py: CREATE TABLE version \\(version INTEGER\\)') + l1.daemon.wait_for_log("plugin-dblog.py: initialized.* 'startup': True") + + l1.stop() + + # Databases should be identical. + db1 = sqlite3.connect(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'lightningd.sqlite3')) + db2 = sqlite3.connect(dbfile) + + assert [x for x in db1.iterdump()] == [x for x in db2.iterdump()] + + def test_utf8_passthrough(node_factory, executor): l1 = node_factory.get_node(options={'plugin': os.path.join(os.getcwd(), 'tests/plugins/utf8.py'), 'log-level': 'io'})