apache / accumulo-proxy

Apache Accumulo Proxy
https://accumulo.apache.org
Apache License 2.0
9 stars 19 forks source link

Thousands of ProxyServer updates get silently lost if BatchWriter is closed right after the last update has been sent #32

Open m-g-r opened 2 years ago

m-g-r commented 2 years ago

Describe the bug

Having optimized our insertion of data to Accumulo (see https://observablehq.com/@m-g-r/almost-600000-entries-per-second-from-lisp-to-accumulo) I noticed that the data written was often not complete when deleting entries with deleteCell mutations. At the same time there were not any errors to be seen on the client side nor in any log files.

The problem seems to be caused by a combination of three things of the Accumulo Proxy, its Thrift interface but also in the client library of Accumulo that is used by the proxy:

  1. The methods flush(), close(), addMutation() etc. in the BatchWriter of the Accumulo Core client library are all marked "synchronized" but the shared internal resources itself, especially the boolean closed, the MutationSet mutations, and the long integer totalMemUsed are not protected from simultaneous use by different threads.

"Synchronized" means that close() cannot be run at the same time by two threads but it still can run while addMutation() is runnig, for example.

Here, addMutation() can be running and in a waiting state (for background jobs to write data to Accumulo) while close() is run by a new thread which then prevents addMutation() from finishing. (More on this further down.)

  1. The update call of the Accumulo Proxy is marked as "oneway".

Thus errors cannot be sent back to the client immediately. Instead if something gets wrong for an update call, the client can only be informed by a subsequent call.

This seems to be the intention that the flush or closeWriter calls can throw an MutationsRejectedException. But this works only if those calls are not handled too early. That is, if I send a number of update calls the client continues without delay as these are oneway calls. The following flush or closeWriter will be send out immediately as well. If the threads handling the update calls are slower than the threads handling the closeWriter(), those slow update calls cannot be handled anymore.

At the same time, as the close has happened already, the writer cannot be used anymore and the client will never be informed about those errors during the late updates.

  1. Errors during the update call are not properly handled and do not even lead to log messages.

The reason seems to be that in 2013, when fixing "ACCUMULO-1340 made proxy update call tolerate unknown session ids" the catch clause from ProxyServer.update() got changed like this:

try {
       BatchWriterPlusException bwpe = getWriter(writer);
       addCellsToWriter(cells, bwpe);
-    } catch (Exception e) {
-      throw new TException(e);
+    } catch (UnknownWriter e) {
+      // just drop it, this is a oneway thrift call and throwing a TException seems to make all subsequent thrift calls fail
     }
   }

with the side effect that also any other exceptions aside from UnknownWriter do not get thrown as TExceptions now. And Accumulo Proxy seems to ignore it aside from writing to stdout or stderr about it.

I only saw the reason for our dropped mutations when running the Accumulo Proxy in the foreground:

2022-08-08 13:55:05,897 [thrift.ProcessFunction] ERROR: Internal error processing update
java.lang.IllegalStateException: Closed
        at org.apache.accumulo.core.clientImpl.TabletServerBatchWriter.addMutation(TabletServerBatchWriter.java:243)
        at org.apache.accumulo.core.clientImpl.BatchWriterImpl.addMutation(BatchWriterImpl.java:44)
        at org.apache.accumulo.proxy.ProxyServer.addCellsToWriter(ProxyServer.java:1389)
        at org.apache.accumulo.proxy.ProxyServer.update(ProxyServer.java:1453)
        at jdk.internal.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:567)
        at org.apache.accumulo.core.trace.TraceUtil.lambda$wrapService$8(TraceUtil.java:232)
        at com.sun.proxy.$Proxy9.update(Unknown Source)
        at org.apache.accumulo.proxy.thrift.AccumuloProxy$Processor$update.getResult(AccumuloProxy.java:9652)
        at org.apache.accumulo.proxy.thrift.AccumuloProxy$Processor$update.getResult(AccumuloProxy.java:9633)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.accumulo.server.rpc.TimedProcessor.process(TimedProcessor.java:61)
        at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518)
        at org.apache.accumulo.server.rpc.CustomNonBlockingServer$CustomFrameBuffer.invoke(CustomNonBlockingServer.java:112)
        at org.apache.thrift.server.Invocation.run(Invocation.java:18)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at org.apache.accumulo.fate.util.LoggingRunnable.run(LoggingRunnable.java:35)
        at java.base/java.lang.Thread.run(Thread.java:830)

Alas, the client code thinks all went well and continues to run as if no error has happened.

Versions (OS, Maven, Java, and others, as appropriate):

To Reproduce

I have written a little test case to check the severity of the problem but as these are written in Common Lisp they will probably not be of help for you. I describe them instead.

First, I add a number of simple entries to Accumulo (just numbers as key and value), then I count. Afterwards I try to delete all entries, and count again if the deletion was successful.

I do this deletion with a batch scanner over all entries, creating simple update mutation with a ColumnUpdate with deleteCell true for each row found by the scanner. The updates I send to Accumulo with a writer. After the last update call I explicitly call flush and then close the writer. In Lisp this deletion function looks like this:

(defun delete-all-values (table-name &key (k *scanner-next-k-entries*))
  ;; use a separate connection for the scanner, to make it as quick as doing the updates after the scanning
  (accumulo-client:with-connection (writer-connection)
    (accumulo-client:with-connection (accumulo-client:*connection*)
      (let ((writer (raccumulo.i::create-writer table-name writer-connection)))
        (unwind-protect
             (accumulo-client:with-scanner (scanner table-name)
                                           (:batch-scanner-p t :threads *scanner-threads*)
               (loop for (entries more-p) = (multiple-value-list (accumulo-client:scanner-next-k-entries scanner :k k))
                     do (dolist (key-value entries)
                          (let* ((key (accumulo:keyvalue-key key-value))
                                 (row (accumulo:key-row key)))
                            (accumulo.accumulo-proxy:update (accumulo-client:connection-client writer-connection)
                                                            writer
                                                            (thrift:map row
                                                                        (thrift:list
                                                                         (accumulo:make-columnupdate :delete-cell t))))))
                     while more-p))
          (raccumulo.i::flush-writer writer)
          (raccumulo.i::close-writer writer))))))

The test function is:

  (defun test (&optional (count 1000))
    (delete-entries :check-at-end t)
    (count-entries)
    (insert-entries count)
    (format t "~&inserted: ~d~%" (count-entries))
    (delete-entries)
    (let* ((num (count-entries))
           (succ (zerop num)))
      (format t "after deletion: ~d~%" num)
      (format t "~a~&" (if succ :successful :failed))
      (values succ num count)))

And then a loop to do it a number of times is:

  (defun test-loop (&optional (times 3) (count 1000))
    (every #'identity
           (loop for i from 0 below times
                 do (format t "~&~%round: ~d" i)
                 collect (test count))))

When I call it to make 10 rounds with 100.000 entries each, the outcome is:

  round: 0
  inserted: 100000
  after deletion: 15381
  FAILED

  round: 1
  inserted: 100000
  after deletion: 13338
  FAILED

  round: 2
  inserted: 100000
  after deletion: 18683
  FAILED

  round: 3
  inserted: 100000
  after deletion: 14296
  FAILED

  round: 4
  inserted: 100000
  after deletion: 9983
  FAILED

  round: 5
  inserted: 100000
  after deletion: 16286
  FAILED

  round: 6
  inserted: 100000
  after deletion: 12158
  FAILED

  round: 7
  inserted: 100000
  after deletion: 18712
  FAILED

  round: 8
  inserted: 100000
  after deletion: 10087
  FAILED

  round: 9
  inserted: 100000
  after deletion: 18290
  FAILED

Each time a couple of thousand entries stay in the table. In the best case "only" 9.983 and in the worst case even 19.290.

The Accumulo Proxy displays 37 times "ERROR: Internal error processing update java.lang.IllegalStateException: Closed" during that call. Full result attached: 20220808-tests-oneway_again-with-errors.txt

I wrote another very simple test function to see how many updates I can send at a time without getting a fault:

  (defun meta-test-loop (&optional (times 3) (max 100) (step 1))
    (every #'identity
           (loop for i from 0 below max by step
                 do (format t "~&~%meta round: ~d" i)
                 collect (test-loop times i) into result
                 do (format t "~&~%meta round: ~d, result: ~a" i result)
                 finally (return result))))

I called it as "(meta-test-loop 10 10 1)" that is start from 0 to 10 and write that number of entries 10 times. Already in round 5 it failed once. In round 6 it failed six times, in round 9 it failed 8 times out of 10. Full result attached: 20220808-tests-oneway_again-with-errors2.txt

Workarounds

When I add a delay of at least a couple of 100ms before closeWriter the problem starts to vanish. But as I do not receive any errors during an update because of problem 3 above, I can never be sure if it really succeeded. If the machine is under heavy load it might change.

For a delete it is simple: I can count the entries at the end and if the number is not zero, I need to wait longer. That is what I have implemented in the function "(delete-entries :check-at-end t)". But for more complex mutation, this is not feasible. (As basically all mutation work needs to be retrieved from the server and checked explicitly.)

The only easy workaround was to change the update call not to be oneway anymore and recompile the Java and Common Lisp Thrift interface of the Accumulo Proxy and then build a new Accumulo Proxy. With that change I do not see any errors anymore and all deletions are successful. The tests above as "(test-loop 10 100000)" run without any errors at all.

But that comes with a severe drop in performance, instead of 600,000 entries per second for my benchmark I get only 250,000. Other more complex import tasks take 19 hours instead of 3.

More on the flush operation of the BatchWriter and analysis

The flush operation as implemented in BatchWriter in close() just waits that all work as stored in the MutationSet is handled by the mutation writer background threads.

This might be good enough for an inbetween flush but not if you want to close() and thus terminate or stutdown the writer. There might be threads just in the moment adding to the mutations.

This code is in the core of Accumulo in the file: https://github.com/apache/accumulo/blob/rel/2.0.1/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java There is a longer comment at the beginning on how it operates.

It just looks at memory usage of the mutation, which is computed and updated. Each added mutation increases it by an estimation, each time a mutation is sent to the server it is reduced by the bytes sent.

flush() or close() just waits while "totalMemUsed > 0 && !somethingFailed" holds true, and assumes afterwards that all work is done. This would usually be the case when totalMemUsed reaches zero.

addMutation() increases totalMemUsed in the line:

      totalMemUsed += m.estimatedMemoryUsed();

but that line is quite late in the function and the counter seems not be protected to be used from threads running in parallel. Only the functions flush(), close(), addMutation() etc. are all marked "synchronized" but that means close() can run while addMutation() is running.

When I write 100.000 entries to Accumulo in one go, I expect there to be quite a number of threads running addMutatation() which would wait in the line

    waitRTE(() -> (totalMemUsed > maxMem || flushing) && !somethingFailed);

But at the end when close() is called, close immediately sets

      closed = true;

which then triggers the check in addMutation() just following the WaitRTE() above:

    // do checks again since things could have changed while waiting and not holding lock
    if (closed)
      throw new IllegalStateException("Closed");

And that leads to the observed "java.lang.IllegalStateException: Closed" as reported by Accumulo Proxy.

Hm, it is really just the flag "closed" that causes this problem. But the waiting by the line

   waitRTE(() -> totalMemUsed > 0 && !somethingFailed);

in close() is also not enough to make sure that no other thread is not adding already more work in addMutation() as it got past the second "if (closed)" check and handled the mutation already before increasing the memory counter.

This all seems rather thread unsafe. The precautions are not effective. In addition to this, it would be good if the client of the Accumulo Proxy had also a chance to test if all work was done. For example, by flush returning the number of mutation processed.

I have no idea why this is not a problem for others. Is it not? The Common Lisp implementation code for Thrift compiles to native machine code, which runs efficiently, while having something delay the close just a little bit often alleviates the problem. But the problem should also exhibit itself when using the Java client library alone, that is, without the Accumulo Proxy (as long as one does not explicitly manage all threads oneself and makes sure that close() is never run as long as there are threads that might call addMutation()). Strange.

dlmarion commented 2 years ago

A lot of information here, thanks for the detailed explanation. I have not read through all of it yet, but I wanted to make an observation about this:

The methods flush(), close(), addMutation() etc. in the BatchWriter of the Accumulo Core client library are all marked "synchronized" but the shared internal resources itself, especially the boolean closed, the MutationSet mutations, and the long integer totalMemUsed are not protected from simultaneous use by different threads. "Synchronized" means that close() cannot be run at the same time by two threads but it still can run while addMutation() is runnig, for example. Here, addMutation() can be running and in a waiting state (for background jobs to write data to Accumulo) while close() is run by a new thread which then prevents addMutation() from finishing. (More on this further down.)

If I were to write a program in Java that had multiple threads writing to a batch writer, only addMutation would be called from the threads. The main thread of the program would wait for all of the other threads to finish, then the main thread would call close.

Edit: Reading further, it looks like if the proxy update calls were not oneway, then your client thread would wait for the result and the close would not occur early.

dlmarion commented 2 years ago

I'm going to transfer this issue to the accumulo-proxy project.

m-g-r commented 2 years ago

@dlmarion wrote:

Edit: Reading further, it looks like if the proxy update calls were not oneway, then your client thread would wait for the result and the close would not occur early.

Exactly, that is why I've made this change as a quick workaround. But this causes a significant drop in performance as noted. So it is "quick" to implement but slow in effect.

I'm going to transfer this issue to the accumulo-proxy project.

I have made this ticket to the main accumulo project as the way that core client library is written it makes it quite hard for an efficient network library (such as exhibited by the proxy) to be written on top of it. As you write, one has to keep track of all threads using a BatchWriter explicitly oneself. This is hard when the reference to writer is handed over to you by the network call.

dlmarion commented 2 years ago

With respect to the BatchWriter, I don't think you want to use it in the way it was intended. I think the behavior that you want would necessitate a different BatchWriter implementation. I think there are several other things I'm unclear on. However, I think it might be easier if you were to approach this from telling us what you are trying to achieve, and then we can tell you what the best way to get there might be.

dlmarion commented 2 years ago

With respect to the Proxy, specifically The update call of the Accumulo Proxy is marked as "oneway". seems like a problem. Without any type of feedback, I don't know how a client would know to handle errors or move forward and close things.

m-g-r commented 2 years ago

@dlmarion wrote:

With respect to the BatchWriter, I don't think you want to use it in the way it was intended. I think the behavior that you want would necessitate a different BatchWriter implementation. I think there are several other things I'm unclear on. However, I think it might be easier if you were to approach this from telling us what you are trying to achieve, and then we can tell you what the best way to get there might be.

I just use Accomulo Proxy. I do what I described in section "To Reproduce" of my bug report: I open a writer, add a couple of updates using the writer and close it. All in the same thread of my client. I am just a user here using the Thrift interface from another programming language.

Internally, Accumulo Proxy would naturally create a BatchWriter: https://github.com/apache/accumulo-proxy/blob/main/src/main/java/org/apache/accumulo/proxy/ProxyServer.java#L1526 to implement what Accumulo Proxy exposes as a writer. And it is Accumulo Proxy that uses multiple threads to handle requests.

With respect to the Proxy, specifically The update call of the Accumulo Proxy is marked as "oneway". seems like a problem. Without any type of feedback, I don't know how a client would know to handle errors or move forward and close things.

Accumulo Proxy offers a network interface, that means, round trips are much more expensive. From the implementation of Accumulo Proxy I deduct that the BatchWriter is wrapped in a class BatchWriterPlusProblem: https://github.com/apache/accumulo-proxy/blob/main/src/main/java/org/apache/accumulo/proxy/ProxyServer.java#L142 This would try to collect any errors of update calls as MutationsRejectedException to be returned on a flush or closeWriter call.

That fails. My bug report describes how so and why.