Closed chu11 closed 4 years ago
experimental patch to batch eventlog appends from output module in configurable interval using eventlogger:
diff --git a/src/shell/eventlogger.c b/src/shell/eventlogger.c
index b2e6b66..237cb06 100644
--- a/src/shell/eventlogger.c
+++ b/src/shell/eventlogger.c
@@ -97,6 +97,7 @@ static void eventlog_batch_error (struct eventlog_batch *batch, int errnum)
static void commit_cb (flux_future_t *f, void *arg)
{
struct eventlog_batch *batch = arg;
+ fprintf (stderr, "eventlogger kvs commit end\n");
if (flux_future_get (f, NULL) < 0)
eventlog_batch_error (batch, errno);
eventlogger_batch_complete (batch->ev, batch);
@@ -112,6 +113,7 @@ timer_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg)
double timeout = ev->commit_timeout;
flux_future_t *f = NULL;
+ fprintf (stderr, "eventlogger kvs commit start\n");
if (!(f = flux_kvs_commit (h, NULL, 0, batch->txn))
|| flux_future_then (f, timeout, commit_cb, batch) < 0) {
eventlog_batch_error (batch, errno);
@@ -239,6 +241,25 @@ static int append_async (struct eventlogger *ev,
return 0;
}
+int eventlogger_append_entry (struct eventlogger *ev,
+ int flags,
+ const char *path,
+ json_t *entry)
+{
+ char *entrystr = NULL;
+ int rc = -1;
+
+ if (!(entrystr = eventlog_entry_encode (entry)))
+ return -1;
+
+ if (flags & EVENTLOGGER_FLAG_WAIT)
+ rc = append_wait (ev, path, entrystr);
+ else
+ rc = append_async (ev, path, entry, entrystr);
+ free (entrystr);
+ return rc;
+}
+
int eventlogger_append (struct eventlogger *ev,
int flags,
const char *path,
@@ -246,27 +267,17 @@ int eventlogger_append (struct eventlogger *ev,
const char *context)
{
int rc = -1;
- char *entrystr = NULL;
- int wait = 0;
- json_t *entry;
-
- if (flags & EVENTLOGGER_FLAG_WAIT)
- wait = 1;
+ json_t *entry = NULL;
- if (!(entry = eventlog_entry_create (0., name, context))
- || !(entrystr = eventlog_entry_encode (entry)))
+ if (!(entry = eventlog_entry_create (0., name, context)))
goto out;
-
- if (wait)
- rc = append_wait (ev, path, entrystr);
- else
- rc = append_async (ev, path, entry, entrystr);
+ rc = eventlogger_append_entry (ev, flags, path, entry);
out:
json_decref (entry);
- free (entrystr);
return rc;
}
+
/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
diff --git a/src/shell/eventlogger.h b/src/shell/eventlogger.h
index 8732c3e..aad59a7 100644
--- a/src/shell/eventlogger.h
+++ b/src/shell/eventlogger.h
@@ -53,6 +53,11 @@ int eventlogger_append (struct eventlogger *ev,
const char *name,
const char *context);
+int eventlogger_append_entry (struct eventlogger *ev,
+ int flags,
+ const char *path,
+ json_t *entry);
+
int eventlogger_set_commit_timeout (struct eventlogger *ev, double timeout);
#ifdef __cplusplus
diff --git a/src/shell/output.c b/src/shell/output.c
index b801b12..3f13c3d 100644
--- a/src/shell/output.c
+++ b/src/shell/output.c
@@ -56,6 +56,7 @@
#include "svc.h"
#include "internal.h"
#include "builtins.h"
+#include "eventlogger.h"
enum {
FLUX_OUTPUT_TYPE_TERM = 1,
@@ -75,6 +76,8 @@ struct shell_output_type_file {
struct shell_output {
flux_shell_t *shell;
+ struct eventlogger *ev;
+ double batch_timeout;
int refcount;
int eof_pending;
zlist_t *pending_writes;
@@ -334,84 +337,39 @@ error:
return rc;
}
-static void shell_output_kvs_completion (flux_future_t *f, void *arg)
+static int entry_output_is_kvs (struct shell_output *out, json_t *entry)
{
- struct shell_output *out = arg;
-
- /* Error failing to commit is a fatal error. Should be cleaner in
- * future. Issue #2378 */
- if (flux_future_get (f, NULL) < 0)
- shell_die_errno ("shell_output_kvs");
- flux_future_destroy (f);
-
- if (flux_shell_remove_completion_ref (out->shell, "output.kvs") < 0)
- shell_log_errno ("flux_shell_remove_completion_ref");
+ json_t *context;
+ const char *name;
+ const char *stream;
+ if (eventlog_entry_parse (entry, NULL, &name, &context) < 0) {
+ shell_log_errno ("eventlog_entry_parse");
+ return 0;
+ }
+ if (!strcmp (name, "data")) {
+ if (iodecode (context, &stream, NULL, NULL, NULL, NULL) < 0) {
+ shell_log_errno ("iodecode");
+ return 0;
+ }
+ if (!strcmp (stream, "stdout"))
+ return (out->stdout_type == FLUX_OUTPUT_TYPE_KVS);
+ else
+ return (out->stderr_type == FLUX_OUTPUT_TYPE_KVS);
+ }
+ return 0;
}
static int shell_output_kvs (struct shell_output *out)
{
json_t *entry;
size_t index;
- flux_kvs_txn_t *txn = NULL;
- flux_future_t *f = NULL;
- int saved_errno;
- int rc = -1;
-
- if (!(txn = flux_kvs_txn_create ()))
- goto error;
json_array_foreach (out->output, index, entry) {
- json_t *context;
- const char *name;
- const char *stream = NULL;
- if (eventlog_entry_parse (entry, NULL, &name, &context) < 0) {
- shell_log_errno ("eventlog_entry_parse");
- return -1;
+ if (entry_output_is_kvs (out, entry) &&
+ eventlogger_append_entry (out->ev, 0, "output", entry) < 0) {
+ return shell_log_errno ("eventlogger_append");
}
- if (!strcmp (name, "data")) {
- int output_type;
- if (iodecode (context, &stream, NULL, NULL, NULL, NULL) < 0) {
- shell_log_errno ("iodecode");
- return -1;
- }
- if (!strcmp (stream, "stdout"))
- output_type = out->stdout_type;
- else
- output_type = out->stderr_type;
- if (output_type == FLUX_OUTPUT_TYPE_KVS) {
- char *entrystr = eventlog_entry_encode (entry);
- if (!entrystr) {
- shell_log_errno ("eventlog_entry_encode");
- goto error;
- }
- if (flux_kvs_txn_put (txn,
- FLUX_KVS_APPEND,
- "output",
- entrystr) < 0) {
- free (entrystr);
- goto error;
- }
- free (entrystr);
- }
- }
- }
- if (!(f = flux_kvs_commit (out->shell->h, NULL, 0, txn)))
- goto error;
- if (flux_future_then (f, -1, shell_output_kvs_completion, out) < 0)
- goto error;
- if (flux_shell_add_completion_ref (out->shell, "output.kvs") < 0) {
- shell_log_errno ("flux_shell_remove_completion_ref");
- goto error;
}
- /* f memory responsibility of shell_output_kvs_completion()
- * callback */
- f = NULL;
- rc = 0;
-error:
- saved_errno = errno;
- flux_kvs_txn_destroy (txn);
- flux_future_destroy (f);
- errno = saved_errno;
- return rc;
+ return 0;
}
static int shell_output_write_fd (int fd, const void *buf, size_t len)
@@ -639,6 +597,7 @@ void shell_output_destroy (struct shell_output *out)
}
zhash_destroy (&out->fds);
}
+ eventlogger_destroy (out->ev);
free (out);
errno = saved_errno;
}
@@ -920,6 +879,51 @@ error:
return rc;
}
+static void output_ref (struct eventlogger *ev, void *arg)
+{
+ struct shell_output *out = arg;
+ flux_shell_add_completion_ref (out->shell, "output.txn");
+}
+
+
+static void output_unref (struct eventlogger *ev, void *arg)
+{
+ struct shell_output *out = arg;
+ flux_shell_remove_completion_ref (out->shell, "output.txn");
+}
+
+static int output_eventlogger_start (struct shell_output *out)
+{
+ flux_t *h = flux_shell_get_flux (out->shell);
+ struct eventlogger_ops ops = {
+ .busy = output_ref,
+ .idle = output_unref
+ };
+ int timeout;
+
+ out->batch_timeout = 0.5;
+
+ if (flux_shell_getopt_unpack (out->shell, "output",
+ "{s?f}",
+ "batch-timeout",
+ &out->batch_timeout) < 0) {
+ if (flux_shell_getopt_unpack (out->shell, "output",
+ "{s?i}",
+ "batch-timeout",
+ &timeout) == 0)
+ out->batch_timeout = timeout;
+ else
+ return shell_log_errno ("invalid output.batch-timeout option");
+ }
+
+ shell_log ("batch timeout = %.3fs", out->batch_timeout);
+
+ out->ev = eventlogger_create (h, out->batch_timeout, &ops, out);
+ if (!out->ev)
+ return shell_log_errno ("eventlogger_create");
+ return 0;
+}
+
struct shell_output *shell_output_create (flux_shell_t *shell)
{
struct shell_output *out;
@@ -975,6 +979,8 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
goto error;
}
}
+ if (output_eventlogger_start (out) < 0)
+ goto error;
if (shell_output_header (out) < 0)
goto error;
}
taking @grondo's patch above and hacking a call to a new flux_kvs_txn_append_consolidate()
function I wrote (need a way to specify a flag in the eventlogger so this isn't hard-coded, will figure that out later).
diff --git a/src/shell/eventlogger.c b/src/shell/eventlogger.c
index 237cb06..f7700ec 100644
--- a/src/shell/eventlogger.c
+++ b/src/shell/eventlogger.c
@@ -114,6 +114,9 @@ timer_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg)
flux_future_t *f = NULL;
fprintf (stderr, "eventlogger kvs commit start\n");
+ if (flux_kvs_txn_append_consolidate (batch->txn, "output") < 0)
+ eventlog_batch_error (batch, EDOM); /* just pick a stupid errno for now */
+
if (!(f = flux_kvs_commit (h, NULL, 0, batch->txn))
|| flux_future_then (f, timeout, commit_cb, batch) < 0) {
eventlog_batch_error (batch, errno);
@@ -203,6 +206,9 @@ static int append_wait (struct eventlogger *ev,
path, entrystr) < 0)
return -1;
+ if (flux_kvs_txn_append_consolidate (batch->txn, "output") < 0)
+ eventlog_batch_error (batch, EXDEV); /* just pick a stupid errno for now */
+
if (!(f = flux_kvs_commit (ev->h, NULL, 0, ev->current->txn))
|| flux_future_wait_for (f, ev->commit_timeout) < 0)
goto out;
got this via perf record src/cmd/flux start --size=64 flux mini run -n64 t/shell/lptest 80 500
before
19.03% 173854 flux-broker-0 libc-2.17.so [.] _int_malloc
13.61% 116472 flux-broker-0 libc-2.17.so [.] malloc_consolidate
9.38% 90917 flux-broker-0 libc-2.17.so [.] _int_free
7.79% 68078 flux-broker-0 libc-2.17.so [.] malloc
4.75% 121525 lt-flux-shell libflux-core.so.2.0.0 [.] cbuf_is_valid
2.72% 25001 flux-broker-0 libc-2.17.so [.] free
2.57% 65726 lt-flux-shell libpthread-2.17.so [.] pthread_mutex_trylock
1.99% 17022 flux-broker-0 libjansson.so.4.10.0 [.] json_delete
1.73% 14799 flux-broker-0 libjansson.so.4.10.0 [.] json_deep_copy
1.54% 39308 lt-flux-shell libc-2.17.so [.] __memcmp_sse4_1
after
33.00% 177760 lt-flux-shell libsodium.so.23.3.0 [.] sodium_base642bin
11.34% 59968 lt-flux-shell libsodium.so.23.3.0 [.] sodium_bin2base64
3.43% 18074 lt-flux-shell libjansson.so.4.10.0 [.] 0x0000000000007e33
3.07% 16213 lt-flux-shell libjansson.so.4.10.0 [.] 0x0000000000007e3a
2.05% 10786 lt-flux-shell libjansson.so.4.10.0 [.] 0x0000000000008024
1.46% 7698 lt-flux-shell libsodium.so.23.3.0 [.] 0x0000000000026168
1.35% 7168 lt-flux-shell libsodium.so.23.3.0 [.] 0x0000000000026194
1.23% 6512 lt-flux-shell libsodium.so.23.3.0 [.] 0x000000000002615e
1.23% 6485 lt-flux-shell libsodium.so.23.3.0 [.] 0x0000000000026153
1.18% 6235 lt-flux-shell libsodium.so.23.3.0 [.] 0x000000000002617f
Well, we've clearly traded a bunch of malloc/free calls in the broker for base64 conversions in the shell. Which is a big net win, since in production cases not all shells will be run on the same node.
Comparison using time
before
254.640u 286.778s 2:35.50 348.1% 0+0k 56+0io 0pf+0w
after
156.006u 225.006s 2:13.42 285.5% 0+0k 0+0io 0pf+0w
So I think we're on the right track.
I could probably re-work my flux_kvs_txn_append_consolidate()
to be more optimal
for this very specific case, instead of trying to be "smart" and handle some special cases.
But I think we're on the right track.
Wow, great result!
The shells are at least separate from the broker so the impact of busy shells is at least a bit distributed.
However, I wonder what the impact of dropping base64 encoding for utf-8 would be here. Still the test workload seems to take a little longer than I'd expect.
woo-hoo! With a more optimized flux_kvs_txn_append_consolidate()
.
4.89% 7680 flux-broker-0 libsodium.so.23.3.0 [.] sodium_base642bin
4.47% 6846 flux-broker-0 libsodium.so.23.3.0 [.] sodium_bin2base64
3.44% 6784 flux-broker-0 libc-2.17.so [.] __memcpy_ssse3_back
1.98% 3066 flux-broker-0 libjansson.so.4.10.0 [.] 0x0000000000007e3a
1.60% 2480 flux-broker-0 libjansson.so.4.10.0 [.] 0x0000000000007e33
1.49% 2284 flux-broker-0 libjansson.so.4.10.0 [.] 0x0000000000002751
1.25% 9074 flux-broker-0 libc-2.17.so [.] __libc_calloc
1.04% 7104 flux-broker-0 libc-2.17.so [.] _int_malloc
0.97% 1493 flux-broker-0 libjansson.so.4.10.0 [.] 0x0000000000007f9e
0.95% 1456 flux-broker-0 libjansson.so.4.10.0 [.] 0x0000000000007faf
and time
50.036u 219.624s 0:37.00 728.7% 0+0k 80+0io 0pf+0w
so a wall clock of 2:35 down to 0:37 :-)
Will get a pretty version up for a PR next week.
@grondo is your patch on a branch somewhere that I can cherry-pick?
Per discussion in #2505
This is "part 2" of improving shell output performance, as part 1 was the fix in PR #2526