coduin / epiphany-bsp

BSP implementation for the Parallella; the world's smallest supercomputer
https://jwbuurlage.github.io/epiphany-bsp/
GNU General Public License v3.0
27 stars 14 forks source link

Data Streams Problem #38

Open naeemb123 opened 7 years ago

naeemb123 commented 7 years ago

Hi,

I am wondering if it is possible to create 3 upstreams and 3 downstreams, and once they have been processed by the cores, prepare another set of 3 upstreams and 3 downstreams.

I am creating an image processing program which requires large data to be processed in parallel but the cores can only handle a certain amount of data. How can I send data back and forth to the cores until all data can be processed?

Is there a way of doing this ?

Tombana commented 7 years ago

Hi, You can create as many streams as you want, as long as there is enough memory for them. From the Epiphany cores, you can call ebsp_host_sync. This will trigger a callback on the host side and in this callback you can read out any processed data and fill up the existing streams with new data.

naeemb123 commented 7 years ago

Hi Tombana,

Thank you for your help! One question, when calling the function ebsp_set_sync_callback() on the host, is this before the ebsp_spmd() or after ?

naeemb123 commented 7 years ago

Also, after using 'ebsp_host_sync()' from the Epiphany cores, do you have to call ebsp_reset_down_cursor(0); to reset the cursor of the stream or does this happen automatically when you put new data into the existing stream?

naeemb123 commented 7 years ago

When you said the following " This will trigger a callback on the host side and in this callback you can read out any processed data and fill up the existing streams with new data."

How do you refill the existing down stream with new data on the host side? As far as the documentation goes, you can only set the data upon creating a down stream e.g. ebsp_create_down_stream(DATA,.....);

Essentially I will be creating new streams rather than fill existing streams right ?

Tombana commented 7 years ago
naeemb123 commented 7 years ago

Thank You Tombana, Really appreciate the help. I will experiment with the new function and give you feedback on this new feature.

Many Thanks, Naeem

naeemb123 commented 7 years ago

Just some questions for clarification:

  1. When you create a generic stream e.g. ebsp_stream_create on the Host side, without placing any intitial data onto it. Its regarded as an upstream right ? And vice versa when you create a generic stream with data, then its regarded as a downstream ?

  2. When you've created these streams assuming that the above question is correct. From the epiphany core side, will you open up streams, read from them, pass them back up to the Host, using the same functions as you would use before ? e.g. ebsp_open_up_stream(), ebsp_move_chunk_up, ebsp_open_down_stream(), ebsp_move_chunk_down() etc. Will these functions know which generic stream is an upstream and which stream is a down stream ?

Tombana commented 7 years ago
  1. The new streams are both up and down at the same time. You can fill them with data when you create them on the host, but you do not have to. If you do not add data, it will simply have zero tokens in it.

I forgot to mention: the indices of the stream are now global indices, instead of indices PER core.

  1. On the Epiphany side, you can use ebsp_stream_open and ebsp_stream_close to open and close streams. Then you can use ebsp_stream_move_down and ebsp_stream_move_up to move tokens from or into the streams.

With these new streams, the old functions (ebsp_move_chunk_down and so on) do not work properly anymore.

In include/e_bsp.h there is documentation for each function, given in the comments above the function.

naeemb123 commented 7 years ago

Thank you.

Im not entirely sure what you meant by global indices. Will this change how I read data from a generic stream in the cores ?

Tombana commented 7 years ago

By global indices I meant that when an Epiphany core refers to a stream with stream_id = 3 then it is the 4th (zero-based indices) stream that was created on the host. Different cores can open the same stream if they want (though not at the same time). In the old system, stream_id = 3 meant that it was the 4th stream created for that specific processor, and then you could not access a stream that was made for another core. In the old system, the stream_id index was local for each core, and in the new system it is global, meaning shared by all cores.

naeemb123 commented 7 years ago

So if I have different sets of data for each core, And I pass data using a for loop and calling ebsp_stream_create(..) inside the for loop, specifying the processor id, and specifying the processors unique set of data, When opening the stream from the cores, would all the streams contains the same id ? or would each core have different stream id's ? e.g. core 0 = stream_id 0, core 1 = stream_id 1.. etc to access their set of data

naeemb123 commented 7 years ago

I tried using: ebsp_host_sync(), ebsp_stream_seek(..) after reading in all the chunks. but it gives me the following error: 'BSP ERROR: Stream 0 has 8 space left, token of size 80 can not be moved up.'

for loop twice{ for loop chunks{ for loop chunk_size/sizeof(..){

         ...
     }
         ..
}
    ebsp_host_sync();
   ebsp_stream_seek(1,INT_MIN); 

}

Also when using ebsp_stream_move_up(); when printing the data received from the epiphany cores on the host side. an extra two elements with values 0 are added in the first two elements of each chunk.

e.g. chunk_size = 16 sizeof(float) chunk = 6; data = malloc(chunkchunk_size);

//create generic stream for upstream //create generic stream with data for downstream

//after ebsp_smpd();

//print upstream upstream[core][0] = 0.0 upstream[core][1] = 0.0 upstream[core][2-18] = correct output upstream[core][19] = 0.0 upstream[core][20] = 0.0

Not sure whether the data is being streamed back up correctly or not. I can attach my dummy test code here if it helps.

naeemb123 commented 7 years ago

If it makes it easier Tombana, If you can attach a simple sample code to show how to refill existing streams with new data using the new functions from the new version, that would make it clear how well the new version works?

Im working on a project for the University Of Glasgow as my dissertation. I hope to complete this project using your library.

Best Regards

Tombana commented 7 years ago

When opening the stream from the cores, would all the streams contains the same id ? or would each core have different stream id's ? e.g. core 0 = stream_id 0, core 1 = stream_id 1.. etc to access their set of data

The second statement is correct. When you create 16 streams, the first core can use stream_id = 0, the second core can use stream_id = 1 and so on.

I tried using: ebsp_host_sync(), ebsp_stream_seek(..) after reading in all the chunks. but it gives me the following error: 'BSP ERROR: Stream 0 has 8 space left, token of size 80 can not be moved up.'

When creating the streams on the host, you have to make sure they are large enough to accomodate for all chunks you plan on moving up into the stream from the core.

Regarding the first two elements of every chunk: you are correct. The first two elements of every chunk are used internally by EBSP. You can ignore them. They were not supposed to be outputted to the host, this is a known bug in EBSP. Untill we fix the bug I would recommend simply ignoring the first two elements of every chunk.

naeemb123 commented 7 years ago

"When creating the streams on the host, you have to make sure they are large enough to accomodate for all chunks you plan on moving up into the stream from the core." - Great, Thanks that error is fixed.

Im still not sure how to refill an existing stream: I tried the following:

float downstream = (float )malloc(float ) bsp_nprocs());

//loop to create generic down stream to send data to each core for loop downstream[coreID] = (float*)ebsp_stream_create(...CoreData);

(you previously mentioned the ebsp_stream_create() function returns a pointer to external memory you can rewrite to, so I tried the following)

//callback ebsp_set_sync_callback(callback);

//callbackfunction void callback() { printf("I am running") <<-- This works //write new data to the generic downstream memory for each core for loop each core for loop chunk*chunk_size/sizeof(float) times downstream[core][i] = newDATA; <<--re writing data to external memory }

But this gives the following error: BSP ERROR: Stream contained token larger (1084227584) than maximum token size (64) for stream (truncated)

All I want to do is replace the old data with new values. How would I achieve this?

Regards

Tombana commented 7 years ago

Thank you for noticing this. This is another bug that we were meaning to fix. There will be a better solution in the future, but for now, you can fill the streams using this function:

void fill_stream(void* stream, void* source, int total_size, int token_size) {
    unsigned dst_cursor = (unsigned)stream;
    unsigned src_cursor = (unsigned)source;

        int current_chunksize = token_size;
        int last_chunksize = 0;
        for (int nbytes_left = total_size; nbytes_left > 0;
             nbytes_left -= token_size) {
            if (nbytes_left < token_size)
                current_chunksize = nbytes_left;

            (*(int*)dst_cursor) = last_chunksize; // write prev header
            dst_cursor += sizeof(int);
            (*(int*)dst_cursor) = current_chunksize; // write next header
            dst_cursor += sizeof(int);

            memcpy((void*)dst_cursor, (void*)src_cursor, current_chunksize);

            dst_cursor += current_chunksize;
            src_cursor += current_chunksize;

            last_chunksize = current_chunksize;
        }
        // Write a terminating header
        (*(int*)dst_cursor) = current_chunksize; // write terminating header (prev)
        dst_cursor += sizeof(int);
        (*(int*)dst_cursor) = 0; // write terminating header (next)
        dst_cursor += sizeof(int);
}

Copy and paste this function to your project, and then use it like this in your callback:

// for each core
fill_stream(downstream[core], newDATA,  size_of_newDATA,  size_of_one_token);

The fill_stream function will then divide your data into chunks.

Unfortunately I have not had time to test this yet, so there might be errors.

naeemb123 commented 7 years ago

I tested the refill_stream function, it did not throw any errors but it did not update the data either. When I output the upstream data it gives me the same values as it did in the first iteration for the first set of values. Why might this be ? Here is my code:

HOST:

include

include

include

include

float downstream; float upstream;

float data; float data2; float *data3;

int chunk; int chunk_size;

int n;

void fill_stream(void stream, void source, int total_size, int token_size) { unsigned dst_cursor = (unsigned)stream; unsigned src_cursor = (unsigned)source;

    int current_chunksize = token_size;
    int last_chunksize = 0;
    for (int nbytes_left = total_size; nbytes_left > 0;
         nbytes_left -= token_size) {
        if (nbytes_left < token_size)
            current_chunksize = nbytes_left;

        (*(int*)dst_cursor) = last_chunksize; // write prev header
        dst_cursor += sizeof(int);
        (*(int*)dst_cursor) = current_chunksize; // write next header
        dst_cursor += sizeof(int);

        memcpy((void*)dst_cursor, (void*)src_cursor, current_chunksize);

        dst_cursor += current_chunksize;
        src_cursor += current_chunksize;

        last_chunksize = current_chunksize;
    }
    // Write a terminating header
    (*(int*)dst_cursor) = current_chunksize; // write terminating header (prev)
    dst_cursor += sizeof(int);
    (*(int*)dst_cursor) = 0; // write terminating header (next)
    dst_cursor += sizeof(int);

}

void callback(){ printf("i am running\n"); int j=0; int padding;

    //OUTPUT UPSTREAM RESULTS (DISCARDING FIRST TWO VALUES OF EACH CHUNK)
    for (int s=0; s < n; ++s){
        printf("coming from processor: %i\n",s);
        padding=2;
        for (int i=2; i < chunk * (chunk_size / sizeof(float)) + 12; i++){
            if (i == padding + 16){
            i++;
        padding += chunk_size/sizeof(float)+2;
        }
        else{
        printf("%d: upstream: %.2f\n",j,upstream[s][i]);
            j++;
            }
       }
    }
int c=0;
    for (int i=0; i<chunk * (chunk_size / sizeof(float)); i++){
        data[c] = 1.0;
        data2[c] = 2.0;
        data3[c] = 3.0;
        c++;
    }
    for (int core=0; core<n; core++){
    if (core == 0)
        fill_stream(downstream[core],data,chunk*chunk_size,chunk_size);
    if (core == 1)
    fill_stream(downstream[core],data2,chunk*chunk_size,chunk_size);
        if (core == 2)
            fill_stream(downstream[core],data3,chunk*chunk_size,chunk_size);
}

}

int main(int argc, char** argv) { // Initialize the BSP system bsp_init("e_hello.elf", argc, argv);

// Initialize the Epiphany system and load the binary

//n = bsp_nprocs();
n = 3;
bsp_begin(n);

chunk_size = sizeof(float) * 16;
chunk = 6;

upstream = (float **)malloc(sizeof(float*)*n);

downstream = (float **)malloc(sizeof(float*)*n);

data = (float*)malloc(chunk*chunk_size);
data2 = (float*)malloc(chunk*chunk_size);
data3 = (float*)malloc(chunk*chunk_size);

 //fill data
int c=0;
for (int i=0; i<chunk * (chunk_size / sizeof(float)); i++){
 data[c] = 20.0;
 data2[c] = 50.0;
 data3[c] = 100.0;
 c++;
}

//create upstreams for 3 cores (n=3)
for (int s=0; s<n; s++){
upstream[s] = (float*)ebsp_stream_create(s, (chunk*chunk_size)*2, chunk_size,0);
}

//create downstreams for 3 cores (n=3)
for (int s=0; s<n; ++s){
    if (s == 0)
downstream[s] = (float*)ebsp_stream_create(s,(chunk*chunk_size),chunk_size,data);
    if (s == 1)    
    downstream[s] = (float*)ebsp_stream_create(s,(chunk*chunk_size),chunk_size,data2);
    if (s == 2)
        downstream[s] = (float*)ebsp_stream_create(s,(chunk*chunk_size),chunk_size,data3);
 }

ebsp_set_sync_callback(callback);
// Run the program on the Epiphany cores
ebsp_spmd();

// Finalize
bsp_end();

return 0;

}

EPIPHANY:

include

include

int main() { bsp_begin();

int n = bsp_nprocs();
int p = bsp_pid();

ebsp_message("Hello world from core %d/%d", p, n);

int chunk_size = ebsp_stream_open(0);
int chunks = 6;

float *upstream = ebsp_malloc(chunk_size);
float *curUpstream = upstream;

float *data;

ebsp_stream_open(1);

for (int x=0; x<2; x++){ //runs twice to test if new data is read for (int i=0; i < chunks; ++i){ ebsp_stream_move_down(1,(void*)&data,0); for (int j=0; j < chunk_size/sizeof(float); ++j){ curUpstream[j] = data[j] 2.0; } ebsp_stream_move_up(0,curUpstream, chunk_size,0); }

ebsp_host_sync();
ebsp_stream_seek(1,INT_MIN);
//ebsp_reset_down_cursor(1);

}

ebsp_stream_close(0);
ebsp_stream_close(1);

bsp_end();

return 0;

}

naeemb123 commented 7 years ago

Also just to note incase it helps your team. When you said global indices for streams, this didn't seem the case as it threw errors when I opened streams i.e.

//host side

for loop 16 times ebsp_stream_create(coreid,...) <<-- upstreams

for loop 16 times ebsp_stream_create(coreid,...) <<-- downstreams

//epiphany side ebsp_stream_open(processor_id); <<--upstream

ebps_stream_open(processor_id + 16) <<--downstream, as the first 16 streams are used as upstreams for each core

But when I used it as local i.e. ebsp_stream_open(0) <-open upstream ebsp_stream_open(1) <- open downstream

it produced the right results when passing down different data to each core

Tombana commented 7 years ago

Apologies for the delayed response. Thank you for that last post. You are indeed correct about the global versus local indices. When you create two times 16 streams, you should open them with ebsp_stream_open(0); and ebsp_stream_open(1); like you said. I got confused with another Epiphany project where the streaming indices are different.

I am not sure why the streams are not working. Could you try exchanging the lines ebsp_host_sync(); and ebsp_stream_seek(1,INT_MIN); ? Try to first seek and then sync with the host:

ebsp_stream_seek(1,INT_MIN);
ebsp_host_sync();
naeemb123 commented 7 years ago

Hi Tombana. Hope you enjoyed the new year holidays.

I tried exchanging the two lines to seek first then sync, but it still does not update the stream with new data. Is this due to a problem with the new library version?