OryxProject / oryx

Oryx 2: Lambda architecture on Apache Spark, Apache Kafka for real-time large scale machine learning
http://oryx.io
Apache License 2.0

/knownItems/uid doesn't return all items #354

Closed stiv-yakovenko closed 5 years ago

stiv-yakovenko commented 5 years ago

I log all items that were sent to Oryx to a text file:

[screenshot]

But if I submit this URL to Oryx, I get a shorter list of IDs:

[screenshot]

How is it possible that some items are missing?

srowen commented 5 years ago

Without more info, impossible to say. There are many reasons. Nothing updates until the speed layer has processed the data. Items won't get added by the speed layer if they are all attached to users that also don't exist yet. (They will after the batch layer runs.) Etc.

stiv-yakovenko commented 5 years ago

I've looked through the logs, and I see this new, rather interesting crash:

Sep 21 19:33:36 htm-psycho-401.zxz.su oryx-run.sh[31787]: java.lang.ClassCastException
Sep 21 19:35:02 htm-psycho-401.zxz.su oryx-run.sh[31787]: 2018-09-21 19:35:02,638 INFO  ALSServingModelManager:96 ALSServingModel[
Sep 21 19:36:02 htm-psycho-401.zxz.su oryx-run.sh[31787]: 2018-09-21 19:36:02,638 INFO  ALSServingModelManager:96 ALSServingModel[
Sep 21 19:38:24 htm-psycho-401.zxz.su oryx-run.sh[31787]: 2018-09-21 19:38:24,533 INFO  ALSServingModelManager:96 ALSServingModel[
Sep 21 19:40:03 htm-psycho-401.zxz.su oryx-run.sh[31787]: 2018-09-21 19:40:03,031 INFO  ALSServingModelManager:96 ALSServingModel[
Sep 21 19:56:41 htm-psycho-401.zxz.su oryx-run.sh[31787]: 2018-09-21 19:56:41,621 INFO  ALSServingModelManager:96 ALSServingModel[
Sep 21 19:57:45 htm-psycho-401.zxz.su oryx-run.sh[31787]: 2018-09-21 19:57:45,477 INFO  ALSServingModelManager:108 Loading new mod
Sep 21 19:57:46 htm-psycho-401.zxz.su oryx-run.sh[31787]: 2018-09-21 19:57:46,526 INFO  ALSServingModelManager:122 Updating model
Sep 21 19:57:53 htm-psycho-401.zxz.su oryx-run.sh[31787]: 2018-09-21 19:57:53,545 WARN  AbstractServingModelManager:71 Error while

Strangely, I don't see a stack trace here.

srowen commented 5 years ago

That's the same error as before, right? That's still one I just don't get. It could explain this problem, yes.

stiv-yakovenko commented 5 years ago

No, this seems to be another problem:

[screenshot]

stiv-yakovenko commented 5 years ago

This is a complete mystery: the serving layer crashes with a ClassNotFoundException, while I can see the missing class in the serving layer's jar file!

[screenshot]

stiv-yakovenko commented 5 years ago

Well, this seems to be some sort of trick from the Jetty web container: it has a special classloader that loads classes from WEB-INF/lib first and only after that tries to load them from other places. But I don't know how to catch that other unnamed ClassNotFoundException. Some data is lost and not stored in Oryx...

srowen commented 5 years ago

Yeah, that class is definitely bundled. How are you running this? The binary does not use Jetty. You can't run it as a web app; well, you could repackage it that way fairly easily, but you'd have to make sure you included all the transitive dependencies, as the released JAR file does.

stiv-yakovenko commented 5 years ago

Hi! You are right, this is not Jetty but rather a spurious Tomcat/Catalina exception. I've added this checking code and was even able to see it trigger once: https://gist.github.com/stiv-yakovenko/0e32698f996f0988d188ad90073ed79e

After improving the checking code, I have not been able to reproduce the problem yet. My rough guess is that it might somehow be connected with JVM optimizations/instruction reordering.

stiv-yakovenko commented 5 years ago

Answering your question: I am running this with oryx-run.sh on Linux, with a port forwarded for debugging and the corresponding Java parameter supplied. I use IntelliJ IDEA on Windows to do the debugging.

srowen commented 5 years ago

It uses embedded Tomcat. There is no WEB-INF or web app here. All the required classes are bundled in the single .jar file, which is run as a simple Java process.

You are running it correctly then. One potential issue: what else is on the classpath? I assume you had to modify compute-classpath.sh to provide a few things for the serving layer, as you're not on a CDH cluster. It's possible that is dragging in conflicting versions of libraries, like the Kafka client?

stiv-yakovenko commented 5 years ago

To be clear, there are two separate issues here. The main issue is the ClassCastException, which has also happened to other users, as you mentioned. Something strange happens inside the vector collections, so that some keys are Objects, not Strings. To track it down, I tried to put a breakpoint on all exceptions in the serving code. That's how I captured this Kafka-related issue, but it seems to me that this is how it is expected to work: Tomcat has a custom classloader for its servlets; it tries to load classes from a /lib directory first (even if it is absent, as in your case), and then, on the second try, it loads all the jars and it works.

Now I've instrumented all accesses to the vector collection, trying to find out which one added non-String objects there. Unfortunately the problem has now gone away, even though I feed the same data into Oryx...

srowen commented 5 years ago

Tomcat does not load from any lib directory in this case. It is embedded Tomcat, not the standalone web server. It would be an issue with the classpath provided to the Java process.
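
One way to check is a tiny standalone probe, run with the same classpath as the serving layer, that reports which jar a class really comes from (a minimal sketch; the class name below is just an example):

public class WhichJar {
    public static void main(String[] args) throws ClassNotFoundException {
        String name = args.length > 0 ? args[0]
            : "org.apache.kafka.clients.consumer.KafkaConsumer"; // example class
        Class<?> c = Class.forName(name);
        java.security.CodeSource src = c.getProtectionDomain().getCodeSource();
        // A null code source means the bootstrap classloader supplied the class.
        System.out.println(name + " <- " + (src == null ? "bootstrap" : src.getLocation()));
    }
}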

stiv-yakovenko commented 5 years ago

Anyway, the data is still lost. Here is how I debug it:

public synchronized static void flushOryx(StringBuilder sb, boolean force) {
    // Buffers ingest lines and flushes them once the buffer exceeds ~100Kb
    // (or immediately when force is set). Each flushed block is also written
    // to disk so that what was sent can be compared with what Oryx knows.
    if (sb.length() == 0) return;
    if (sb.length() > 100000 || force) {
        try {
            // Optional local copy of everything ever sent.
            String dataset = Config.properties.getProperty("dataset");
            if (!StringUtil.isEmpty(dataset)) {
                try (PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(dataset, true)))) {
                    out.print(sb.toString());
                } catch (IOException e) {
                    // Best-effort debug copy; failures here are deliberately ignored.
                }
            }
            String oryx = Config.properties.getProperty("oryx");
            if (oryx != null && !oryx.isEmpty()) {
                // POST the block to the Oryx serving layer's /ingest endpoint.
                URL url = new URL(oryx + "/ingest");
                HttpURLConnection con = (HttpURLConnection) url.openConnection();
                con.setRequestMethod("POST");
                con.setRequestProperty("Content-Type", "text/plain;charset=UTF-8");
                con.setDoOutput(true);
                con.getOutputStream().write(sb.toString().getBytes("UTF-8"));
                InputStream inputStream = con.getInputStream();
                if (inputStream != null) {
                    inputStream.close();
                }
                // Save the exact block, with the HTTP response code in the file name.
                String oryxBlock = "./downloads/oryxblock." + getCurrentTimeStamp() + "." + con.getResponseCode() + ".txt";
                try (PrintWriter pw = new PrintWriter(new BufferedWriter(new FileWriter(oryxBlock)))) {
                    pw.print(sb.toString());
                }
                System.out.println("Ingested block to oryx, code=" + con.getResponseCode());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        sb.setLength(0);
    }
}

This function receives a StringBuilder holding lines for /ingest; when the data is flushed, the same block is written to a file whose name includes the HTTP response code, like this:

[screenshot]

But if I try to get a recommendation for this user, it is not found:

[screenshot]

I've observed the problem many times already, so I can reproduce it.

srowen commented 5 years ago

I think this still doesn't rule out the normal explanations in https://github.com/OryxProject/oryx/issues/354#issuecomment-423537270

stiv-yakovenko commented 5 years ago

Oh, sorry, this was an old cluster running an old version with Koloboke; retesting with updated jars now.

stiv-yakovenko commented 5 years ago

OK. After upgrading to Oryx 7.2, all exceptions are now gone from the logs. But there are still users that I successfully added to Oryx two hours ago with code 204, and these users are reported to be unknown. generation-interval-sec is 100 for the batch, speed, and serving layers:

batch {
    streaming {
      generation-interval-sec = 100
      ....
    }
    ....
}

speed {
    streaming {
      generation-interval-sec = 100
      ....
    }
    ....
}

speed {
    serving {
      generation-interval-sec = 100
      ....
    }
    ....
}

What else could I be missing?

srowen commented 5 years ago

There are still many possibilities, from the above. Are the layers running? Have they finished batches successfully? Is Kafka OK? Are those new users attached to items that are already known to the model? Etc.

stiv-yakovenko commented 5 years ago

I have isolated the problem. I have 8K data files of ~100Kb each, and I submit them to Oryx one by one with a wget command from an sh script. In the middle of processing I can see one of the users (with /recommend/8b6a3d7b68a9374c376f5e4e9f39c2c9). But in the middle of this data upload something happens, and /recommend/8b6a3d7b68a9374c376f5e4e9f39c2c9 gives 404, same as /knownItems. My disk is not full, and there are no exceptions in the Oryx logs. Later, 10-20 minutes after the data upload has finished, /recommend/8b6a3d7b68a9374c376f5e4e9f39c2c9 starts working again. I can share the data files with you, so you can easily observe it yourself.

srowen commented 5 years ago

Hm, everything makes sense there except why the user appears and then disappears. When it becomes known to the speed layer, it should stay known until a new model comes that also has the user.

The logic there is kind of complicated, as the model has to track IDs that have arrived but won't be known to the next model, and save them, but also retire IDs that won't appear in the next model. I reviewed the logic and it looks right, but who knows if I missed a corner case.

I don't have capacity to investigate much or debug, but you can have a look at ALSServingModelManager and its handling of 'recent' IDs.
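
Purely for illustration, the bookkeeping described above amounts to something like this toy sketch (hypothetical names; not the actual Oryx code):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy model of the 'recent' ID handling described above; NOT the real Oryx code.
// An ID seen by the speed layer survives one new batch model that lacks it,
// and is retired if a second consecutive model still lacks it.
class RecentIdTracker {
    private final Map<String, Integer> missedModels = new HashMap<>();

    void onSpeedUpdate(String id) {
        missedModels.put(id, 0); // freshly seen by the speed layer
    }

    void onNewBatchModel(Set<String> idsInModel) {
        missedModels.entrySet().removeIf(e -> {
            if (idsInModel.contains(e.getKey())) {
                return true; // in the model now; no longer needs tracking
            }
            e.setValue(e.getValue() + 1);
            return e.getValue() >= 2; // missed two models in a row: retire
        });
    }

    boolean isKnown(String id, Set<String> idsInModel) {
        return idsInModel.contains(id) || missedModels.containsKey(id);
    }
}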

stiv-yakovenko commented 5 years ago

Unfortunately I don't understand this code. We have this in production and have no other options. Maybe you can at least confirm it on your side? The files are only ~500M compressed.

srowen commented 5 years ago

I would make the speed layer interval relatively low and the batch layer interval relatively high (see the sketch below); 100 seconds for each doesn't quite make sense, and there's no such setting for the serving layer. I'm not able to run this right now (no easy access to a cluster). I'm still not clear whether your user interactions are with items that are new or not, which changes behavior: a pair with a new user and a new item won't cause either of them to appear in the speed layer. Otherwise I might just accept this behavior if the user is in fact finally appearing promptly, even if you see it appear earlier and then go away for some reason.
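
A sketch of that shape of configuration, in the same format as the config above, with purely illustrative values:

batch {
    streaming {
      generation-interval-sec = 1800   # long: full model rebuilds are expensive
    }
}

speed {
    streaming {
      generation-interval-sec = 60     # short: incremental updates are cheap
    }
}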

stiv-yakovenko commented 5 years ago

The user that is lost appears in multiple files, so this is very likely an update. We can't accept this behavior, because it prevents us from running an acceptance test of the system. We created a special user who consumes certain types of content, then tried to get recommendations for him, and we get a 404. We also see this for many other users. I can give you ssh access to the cluster.

srowen commented 5 years ago

As I say, I'm still not sure the test is valid, nor your settings; for example, your batch/speed generation times don't make sense. If you can narrow it down to a test case I can run, I'll look. Or if you can reason through a scenario where the update doesn't happen as imagined, I'll look at that too.

stiv-yakovenko commented 5 years ago

I'll try 10 sec for the speed layer and 100 for the batch layer; are these reasonable intervals?

srowen commented 5 years ago

Yeah, that's good. I haven't thought it through, but what could be happening is that two new batch models arrive at the serving layer before the new user makes it into a third batch model. The user would be known to the speed layer, get kept past one model that doesn't know about the new user, but then get removed after the second one didn't know it either. Then it comes back with the third.

That is 'correct' behavior, but of course it's weird to produce two batch models quickly relative to the speed layer, because the former cycle should be much longer than the latter.

stiv-yakovenko commented 5 years ago

It's the same story with the 10- and 100-second settings. But now I don't see this user even after some time has passed. The user is present in the dataset.

srowen commented 5 years ago

Hm. What about the issue of user-item pairs that are both new? Those won't register. I don't know if it explains things.

Can you summarize the data for the user you are looking at? What is the pattern you are sending? Maybe we can reason about what happens.

The best case would be a test case that can show the situation, but this may be hard to do. I will read the code again to think about the handling of new IDs.

stiv-yakovenko commented 5 years ago

Small datasets cause crashes themselves, as I've already reported, so I don't think reducing the dataset will help. The dataset is 800M in size, so I can't really find any pattern; pairs with the problematic user ID occur throughout the data. What do you mean by summarizing?

srowen commented 5 years ago

What crashes are you referring to? I was hoping this was a synthetic data set that was easy to characterize.

stiv-yakovenko commented 5 years ago

I made a small dataset like this, and it caused numerical crashes in one of the layers:

u0,i0,1,881250941
u0,i1,1,881250942
u0,i2,1,881250943
u1,i2,1,881250944
u1,i1,1,881250945
u1,i3,1,881250946

something like division by zero, or numerical instability

srowen commented 5 years ago

Oh, yeah, that's not a crash though. It can't compute a model of the rank you give; it can with different or more data, or if you reduce the rank (see the sketch below).
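
A sketch assuming the standard ALS hyperparameter key, in the same format as the configs above (the value is only illustrative):

als {
    hyperparams {
      features = 5   # a lower rank that a tiny dataset can still support
    }
}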

Back to this issue: the most likely way this happens is if multiple models are built and published after a new user arrives. When a user U arrives, a model may already be building, and U won't be in it, but it will definitely be in the next model. The code accounts for that.

However, if model jobs get backed up because the batch interval is low but there's not enough capacity to keep up, then there may be many models built after U arrives. Then U won't survive after the second new model comes by without U.

While there are all kinds of ways one could imagine dealing with this, really, it's not supposed to get backed up. Are you seeing scheduling delay in the batch layer jobs? That is, does a batch not finish before a new one starts? If that's the case, increase the batch interval to a time it can keep up with.

stiv-yakovenko commented 5 years ago

I don't know how to monitor scheduling delay in batch layer jobs.

srowen commented 5 years ago

Just look at the Spark UI's streaming tab. How long are the batches taking? Is it longer than the batch interval? I suspect this because your data is largish, the interval small, and you're running on a single machine, IIRC.

stiv-yakovenko commented 5 years ago

I set the batch time to 1000 and now the user is finally added, but it took a very long time for the model to be created; for ~15 minutes I was getting error 503. Are these the timings you were asking about?

[screenshot]

What value is safe to use instead of 1000? Isn't it possible to tune it automatically, or at least warn the user if the setting is wrong?

srowen commented 5 years ago

The more data you have and the less resource there is to process it, the longer it will take. I guess you need 30 minutes for your batch interval, to be safe. Or less data, or more compute.

Yeah, I think it would be possible to log a warning if the batches are taking longer than the configured interval (sketched below). I'm also wondering if I can modify the code to account for this situation anyway and hold on to 'recent' IDs longer.
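
A generic sketch of that check (not actual Oryx or Spark code, just the shape of the warning):

// Time one batch and warn when it overruns the configured interval.
class BatchIntervalWatchdog {
    private final long intervalMillis;

    BatchIntervalWatchdog(long intervalSec) {
        this.intervalMillis = intervalSec * 1000L;
    }

    void runBatch(Runnable batch) {
        long start = System.currentTimeMillis();
        batch.run();
        long took = System.currentTimeMillis() - start;
        if (took > intervalMillis) {
            System.err.printf("WARN batch took %d ms > interval %d ms; work is backing up%n",
                took, intervalMillis);
        }
    }
}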

stiv-yakovenko commented 5 years ago

Now some of the user-item pairs are lost. For example, the dataset contains these pairs:

11111111111111111111111111111111,p858957,5
11111111111111111111111111111111,p688674,5
11111111111111111111111111111111,p904089,5
11111111111111111111111111111111,p692953,5
11111111111111111111111111111111,p692953,5
11111111111111111111111111111111,p696203,5
11111111111111111111111111111111,p696654,5
11111111111111111111111111111111,p698359,5
11111111111111111111111111111111,p694012,5
11111111111111111111111111111111,p903498,5
11111111111111111111111111111111,p698359,5
11111111111111111111111111111111,p698359,5
11111111111111111111111111111111,p698359,5
11111111111111111111111111111111,p698359,5
11111111111111111111111111111111,p694850,5
11111111111111111111111111111111,p694850,5
11111111111111111111111111111111,p690520,5
11111111111111111111111111111111,p871235,5
11111111111111111111111111111111,p871235,5
11111111111111111111111111111111,p905533,5

but /knownItems/11111111111111111111111111111111 returns only

p905533
p871235

stiv-yakovenko commented 5 years ago

streaming { generation-interval-sec = 1000 }
streaming { generation-interval-sec = 10 }

srowen commented 5 years ago

Same answers as above. There are many reasons this could be the case, and I see no reason to believe it's different here.

stiv-yakovenko commented 5 years ago

Is there a way I can diagnose this? There are again no exceptions in the logs:

[screenshot]

Spark doesn't indicate any failed jobs, and all job timings are under 1000s:

[screenshot]

Two hours have passed; isn't that enough for the data to get processed?

srowen commented 5 years ago

It should be plenty of time if your batch interval is only 1000 seconds. As for reasons why not:

It's a somewhat complex distributed system. I think the behavior is pretty correct given all the integration tests that check the behavior of updates, but there are many reasons any particular data point can be surprising. It's hard to debug remotely.

stiv-yakovenko commented 5 years ago

Well, the tests were not able to catch Koloboke losing data, and data is still lost after Koloboke was removed. The recommender itself is working, and it's recommending something, but based on incomplete data (some user-item pairs are lost). I see no errors in the layers' logs nor in Spark. I have only the topics created by oryx-run.sh. I am running a single-PC cluster with 30Gb of RAM and enough free disk space.

stiv-yakovenko commented 5 years ago

One more scenario with another problem. I've saved all the data I supply to Oryx into text files. Then I reset Oryx (I have a batch file for that; everything is reset, all possible temp files are deleted including Kafka tmp files, HDFS is reformatted, etc.). Now I push these data files to Oryx one by one, which takes ~15 min to complete. Then Oryx starts calculating with CPU usage from 100% to 500%; it finishes in ~5h, and then CPU usage is 0%. But the serving layer reports 503. The only error I've found in the logs is a Java out-of-memory error for the batch layer, but the batch layer is still alive. Each text file is under 100Kb.

srowen commented 5 years ago

It's really hard to say. There are a lot of moving pieces here and a lot of ways the test scenario might not be what it seems. The safest thing is to delete the data dirs on HDFS and use new Kafka topics. If you ran out of memory, the batch didn't complete processing; you won't see a model, I presume, in the output, and so you have nothing to serve. Again, I think you either just need more memory and to run on an actual cluster, or else you have some conflicting memory settings somewhere (e.g. YARN thinks it can allocate more than the machine has). I don't know from here.

stiv-yakovenko commented 5 years ago

I've reduced the block sizes to 200Kb (previously they were 1Mb, not 100Kb; sorry for the misinformation). No crashes in the logs, and only half of the data is in Oryx (I've written a special utility to verify whether data has been lost). I want to find out at which stage data is lost. Which piece of code is responsible for adding data to HDFS? How is the data stored on HDFS, and is it possible to check whether it's there?

stiv-yakovenko commented 5 years ago

For each experiment I delete all HDFS/Kafka data files, reformat HDFS, recreate the topics, and redeploy the Spark apps, so each experiment is clean.

srowen commented 5 years ago

Nothing here suggests data is lost. You showed above that the model did not build because there is not enough memory.

It's not productive to continue the conversation in this way. You need to:

Otherwise, I've provided here all I can guess about your setup and your issues, and it needs to be followed up. You can also refer to the integration tests, which demonstrate pretty well that, with Kafka, there is no data loss and the micro-level behavior is correct. There can always be bugs or weird corner cases, but I am unable to do any more with this info.

stiv-yakovenko commented 5 years ago

I've created a simple test; you can run it yourself:

package org.stivyakovenko;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;

public class OryxHighLoad {
    static int N = 1000000; // users
    static int n = 20;      // items per user
    static String url = "http://htm-psycho-402.zxz.su:8080";

    // POSTs the buffered lines to the given endpoint.
    static void post(String url, StringBuilder sb) throws IOException {
        URL u = new URL(url);
        HttpURLConnection con = (HttpURLConnection) u.openConnection();
        con.setRequestProperty("Content-Type", "text/plain;charset=UTF-8");
        con.setRequestMethod("POST");
        con.setDoOutput(true);
        con.getOutputStream().write(sb.toString().getBytes("UTF-8"));
        con.getInputStream().close(); // force the request; discard the response
    }

    // GETs a URL and returns the body, or null on any I/O error.
    public static String get(String urlStr) {
        try {
            URL url = new URL(urlStr);
            HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
            urlConnection.connect();
            try (InputStream inputStream = urlConnection.getInputStream();
                 java.util.Scanner s1 = new java.util.Scanner(inputStream).useDelimiter("\\A")) {
                return s1.hasNext() ? s1.next() : "";
            }
        } catch (IOException e) {
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Phase 1: ingest N users x n items in ~100Kb blocks.
        StringBuilder sb = new StringBuilder();
        for (int u = 0; u < N; u++) {
            if (u % 1000 == 0) System.out.println(u + "/" + N);
            for (int i = 0; i < n; i++) {
                sb.append("u").append(u).append(",i").append(i).append(",5\n");
                if (sb.length() > 100000) {
                    post(url + "/ingest", sb);
                    sb.setLength(0);
                }
            }
        }
        post(url + "/ingest", sb); // flush the remainder
        System.out.println("wait and press enter to verify data");
        System.in.read();

        // Phase 2: query /knownItems for every user and count missing pairs.
        int lost = 0;
        for (int u = 0; u < N; u++) {
            String res = get(url + "/knownItems/u" + u);
            Set<String> s = new HashSet<>();
            if (res != null) {
                for (String p0 : res.split("\n")) {
                    String p = p0.trim();
                    if (!p.isEmpty()) {
                        s.add(p);
                    }
                }
            }
            for (int i = 0; i < n; i++) {
                if (!s.contains("i" + i)) {
                    lost++;
                }
            }
        }
        System.out.println("Lost records:" + lost + " of " + N * n);
    }
}

This simple test ingests ~1Gb of data: 1M users with 20 items each. It then gives you a chance to wait as long as you wish; when you press ENTER, it checks how many user-item pairs are lost. I'd be grateful if you would at least try it on your Oryx. For me it loses 10% of the data, i.e. 2M pairs. There are no crashes in the log files nor in the Spark GUI.

srowen commented 5 years ago

Among other things, this doesn't check whether the model is actually fully loaded. As I say, there are really a number of reasons this could happen, and they're not errors.
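
As an aside, assuming the serving layer's /ready endpoint (which returns 503 until a model is loaded), the manual ENTER step could be replaced with a poll that reuses the test's get() helper:

// Sketch: wait until /ready stops returning an error before verifying.
// get() above returns null on any non-2xx response or I/O failure.
static void waitUntilReady(String baseUrl) throws InterruptedException {
    while (get(baseUrl + "/ready") == null) {
        System.out.println("model not ready yet; retrying in 10s");
        Thread.sleep(10_000);
    }
}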

stiv-yakovenko commented 5 years ago

I've added code to run the second step endlessly, polling Oryx forever; here is the output:

Lost records:20000000 of 20000000
Lost records:13598400 of 20000000
Lost records:13598400 of 20000000
Lost records:13598400 of 20000000
Lost records:13598400 of 20000000
Lost records:13598400 of 20000000

i.e. ~13.6M records are lost and never appear in /knownItems. There are no crashes in the log and no incomplete jobs in the Spark UI...

srowen commented 5 years ago

Again: nothing about this suggests anything was lost. You have to understand how the modeling process works to reason about which items exist in the model after a given input. Not every item in the input ends up in the model; see the discussion above.

Off the top of my head: if this is non-implicit ALS then having every rating be 5 is a problem.

There could be a problem here. I do not have time or inclination to investigate this one unless it's narrowed down further for me, I'm afraid.