sasha-polev / aerospark

Aerospike Spark Connector
Apache License 2.0
35 stars, 38 forks

collect doesn't get the right results from tmpTbl #13

Closed: sasi21033 closed this issue 8 years ago

sasi21033 commented 8 years ago

Hi, I'm facing an issue in my environment: my DataFrame doesn't reflect the current state of my Aerospike bin. I have the following code:

    SparkContextFunctions sparkContextFunctions = new SparkContextFunctions(spark_context);
    String aeroSpikeAddress = host + port;
    String query = "select * from bin";
    dataFrame = sparkContextFunctions.aeroSInput(aeroSpikeAddress, query, sqlCtx, 6);
    dataFrame.registerTempTable("testTbl");
    dataFrame.persist(StorageLevel.MEMORY_ONLY());

After creating the temp table, I do the following:

    dataFrame.where(whereClause).collect();

The first time, I get the same values as in the Aerospike bin, but after I add or delete a row, the results no longer match. If I drop the table and create it again, I get the correct state. Thanks, Sasi

sasha-polev commented 8 years ago

Hi Sasi

This is the expected behaviour of a cached DataFrame.

Thanks, Sasha

This message contains confidential information. If you are not the intended recipient, please notify the sender and delete the message immediately. One Point Consulting Ltd is a limited liability company registered in England and Wales (registered number 05516457) and whose registered office is at Business Environment, 1 Olympic Way, Wembley, London, HA90NP, UK.

sasi21033 commented 8 years ago

Hi Sasha,

If the bin is updated, shouldn't the RDD be updated too? Do I need to create a new DataFrame for every call?

Thanks, Sasi

sasha-polev commented 8 years ago

Do not use persist.

Thanks, Sasha

sasi21033 commented 8 years ago

Without persist it will use memory and disk, but I want memory only. Thanks, Sasi

sasi21033 commented 8 years ago

OK, I'll use cache().

sasha-polev commented 8 years ago

Did you try?

Unless you use cache or persist, it should always recompute (re-read from Aerospike) on each action such as collect().
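To illustrate the difference, here is a minimal plain-Java analogy (not Spark or aerospark code). The hypothetical LazyTable re-reads its source on every collect() unless persist() or cache() was called; in that case it materializes once and keeps returning that snapshot. Here cache() is modelled as a plain alias for persist(); real storage levels are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java analogy, not Spark code: a "table" that re-reads its source on
// every collect() unless persist()/cache() was called first.
public class LazyVsCached {

    static final class LazyTable {
        private final Supplier<List<String>> source; // reads the backing store
        private List<String> snapshot;               // filled on first collect() after persist()
        private boolean persisted;

        LazyTable(Supplier<List<String>> source) {
            this.source = source;
        }

        // Only marks the table for caching; nothing is read until collect().
        LazyTable persist() {
            persisted = true;
            return this;
        }

        // Modelled as a plain alias for persist() with the default level.
        LazyTable cache() {
            return persist();
        }

        List<String> collect() {
            if (!persisted) {
                return new ArrayList<>(source.get()); // recompute on every call
            }
            if (snapshot == null) {
                snapshot = new ArrayList<>(source.get()); // materialize once
            }
            return new ArrayList<>(snapshot); // later calls see the old data
        }
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>(Arrays.asList("a", "b"));
        LazyTable live = new LazyTable(() -> store);
        LazyTable cached = new LazyTable(() -> store).persist();

        System.out.println(live.collect() + " " + cached.collect()); // [a, b] [a, b]
        store.add("c"); // a row is added to the backing store
        System.out.println(live.collect() + " " + cached.collect()); // [a, b, c] [a, b]
    }
}
```

After a row is added, live.collect() reflects it while cached.collect() keeps returning the first snapshot, which matches the stale results in the original report.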

Thanks, Sasha

sasha-polev commented 8 years ago

No, do not use cache either; it is the default version of persist.

Thanks, Sasha

sasi21033 commented 8 years ago

OK, thanks. I'll run my tests without persist. Is there a way to find out how the table is stored, e.g. MEMORY_ONLY or MEMORY_AND_DISK?

Thanks, Sasi

sasha-polev commented 8 years ago

The Spark UI has some info on cached RDDs (see the Storage tab). I'm not at my PC right now.

For your purpose it should not be cached at all.

Thanks, Sasha

sasi21033 commented 8 years ago

Hey Sasha, I couldn't find any documentation about aeroInput and aeroSInput. What is the real difference between those two methods (apart from the return value)?

Thanks, Sasi

sasha-polev commented 8 years ago

Hi Sasi

aeroSInput is what you use with the Spark SQL/DataFrame subsystem. It also has a schema applied.

aeroInput gives you the basic AerospikeRDD object; it has no schema applied and no Spark SQL dependency. In a way it is the most basic form of object in Spark, and thus has minimal functionality. You can still apply a schema later, as it is based on an RDD of Rows.
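The idea of applying a schema later can be sketched without Spark. In the Spark 1.x Java API this is done with SQLContext.createDataFrame(JavaRDD<Row>, StructType); the plain-Java analogy below only models the idea, using untyped Object[] rows and a hypothetical Field/apply helper that pairs each positional value with a column name and checks its type.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy (not the aerospark or Spark API): rows arrive as untyped
// Object[] values, like a schemaless RDD of Rows, and a schema is applied later.
public class ApplySchemaLater {

    // One column of a hypothetical schema: a name plus the expected Java type.
    static final class Field {
        final String name;
        final Class<?> type;

        Field(String name, Class<?> type) {
            this.name = name;
            this.type = type;
        }
    }

    // Pair each positional value with its column name, checking the type.
    static Map<String, Object> apply(List<Field> schema, Object[] row) {
        if (row.length != schema.size()) {
            throw new IllegalArgumentException("row has " + row.length
                    + " values but schema has " + schema.size() + " fields");
        }
        Map<String, Object> named = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            Field f = schema.get(i);
            Object v = row[i];
            if (v != null && !f.type.isInstance(v)) {
                throw new IllegalArgumentException(f.name + " expected "
                        + f.type.getSimpleName() + " but got " + v.getClass().getSimpleName());
            }
            named.put(f.name, v);
        }
        return named;
    }

    public static void main(String[] args) {
        List<Field> schema = Arrays.asList(
                new Field("name", String.class),
                new Field("count", Long.class));
        Object[] raw = {"abc", 42L}; // what a schemaless row might hold
        System.out.println(apply(schema, raw)); // {name=abc, count=42}
    }
}
```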

Thanks, Sasha

sasi21033 commented 8 years ago

Hi Sasha,

Removing persist made it work, thanks! A quick question: I don't know whether the following problem lies in the connector or in the driver. I'm running on JBoss, so my Spark driver runs inside JBoss. Sometimes I get a DataFrame with N rows, but the objects inside are empty, e.g. Rows.size == N while Rows.get(0).getString(0) returns an empty result.

If I restart JBoss, the DataFrame goes back to normal.

Do you know what I should check? Thanks, Sasi

sasha-polev commented 8 years ago

Hi Sasi

Good that things work for you.

I am not sure where to start looking for your problem. It may be empty bins in Aerospike, or something that happens during a transformation.

If the values change over time on a Rows object assigned to a variable, it may be that the objects the Rows collection refers to are being de-allocated by some process.
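One way to narrow this down is to tell a missing value apart from an empty string in the collected rows. With Spark's Row you would call isNullAt(i) before getString(i); the sketch below is a hypothetical plain-Java helper over Object[] rows that counts missing, empty, and present values in one column, so a run that returns "empty" rows can be compared with what aql shows.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical diagnostic helper (not part of aerospark): classify one column
// of collected rows so different kinds of "empty" results can be told apart.
public class EmptyValueCheck {

    enum Kind { MISSING, EMPTY_STRING, PRESENT }

    static Kind classify(Object value) {
        if (value == null) {
            return Kind.MISSING; // e.g. bin absent on this record
        }
        if (value instanceof String && ((String) value).isEmpty()) {
            return Kind.EMPTY_STRING;
        }
        return Kind.PRESENT;
    }

    // Count each kind for column `col` across all rows, indexed by Kind.ordinal().
    static int[] summarize(List<Object[]> rows, int col) {
        int[] counts = new int[Kind.values().length];
        for (Object[] row : rows) {
            counts[classify(row[col]).ordinal()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[]{"alice"}, new Object[]{null}, new Object[]{""});
        int[] c = summarize(rows, 0);
        System.out.println("missing=" + c[0] + " empty=" + c[1] + " present=" + c[2]);
        // missing=1 empty=1 present=1
    }
}
```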

Thanks, Sasha

sasi21033 commented 8 years ago

Hi Sasha,

Let me split my answer into steps:

1. My Aerospike contains data; when I query the bin with aql, I get values.
2. I have the following method:

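    DataFrame subscribersDataFrame = null;
    try {
        // Reuse the temp table if it has already been registered
        subscribersDataFrame = getSqlContext(getSparkContext()).table("test");
    } catch (Exception e) {
        // Table not registered yet: fall through and create it below
    }
    if (subscribersDataFrame == null) {
        SparkContextFunctions sparkContextFunctions = new SparkContextFunctions(getSparkContext());
        // Last argument was cut off in the original post; 6 is assumed, as in the first snippet
        subscribersDataFrame = sparkContextFunctions.aeroSInput(aeroSpikeAddress, query, getSqlContext(getSparkContext()), 6);
        subscribersDataFrame.registerTempTable("test");
    }
    return subscribersDataFrame;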

The code above runs only when someone calls the doQuery method.
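A minimal sketch of the same get-or-create logic without exception-driven control flow, as a plain-Java analogy. The TableRegistry class is hypothetical and stands in for Spark's temp-table catalog; in Spark 1.x you could instead check whether the name appears in SQLContext.tableNames() before calling table(). ConcurrentHashMap.computeIfAbsent builds each entry at most once, which matters if several JBoss request threads call doQuery at the same time.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java analogy of "get the temp table if registered, otherwise create and
// register it". The registry stands in for Spark's temp-table catalog; it is a
// hypothetical helper, not a Spark or aerospark class.
public class TableRegistry<T> {

    private final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();

    // Returns the registered table, building it at most once per name even when
    // several request threads race on first use.
    public T getOrCreate(String name, Supplier<T> factory) {
        return tables.computeIfAbsent(name, n -> factory.get());
    }

    public static void main(String[] args) {
        TableRegistry<String> registry = new TableRegistry<>();
        AtomicInteger builds = new AtomicInteger();
        Supplier<String> factory = () -> "table#" + builds.incrementAndGet();

        System.out.println(registry.getOrCreate("test", factory)); // table#1
        System.out.println(registry.getOrCreate("test", factory)); // table#1 (reused)
        System.out.println(builds.get());                          // 1
    }
}
```

Because an unpersisted DataFrame is lazy, holding on to the registered reference this way should still re-read Aerospike on each action.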

    List<Row> rows = Arrays.asList(subscribersDataFrame.limit(maxResults).collect());

This code works as intended, but sometimes collect() returns Rows whose values are empty: rows.get(0).getString(0) returns an empty value.

I tried creating a new DataFrame when I got an empty value, but it didn't help.

My workaround is to restart JBoss; after that, the DataFrame is back to normal.

One scenario I considered is that Aerospike finishes loading after Spark creates the DataFrame, which causes these empty values.

I hope you can help me with this, because I didn't find any related issue in the Spark JIRA or any documentation about it anywhere else. It seems I'm the only one running Spark and Aerospark inside JBoss :).

Thanks, Sasi

sasi21033 commented 8 years ago

Closing this issue and opening a new one.