Closed: hefehr closed this 1 month ago
Hi @hefehr, I have several concerns about this implementation.
Do you know what this would do if the destination file system is a distributed file system like beegfs?
I am afraid it would be too much metadata access to keep checking the filesystem space for every gulp of every file written, and infinitely rechecking it every 10 seconds if there is no space. If we are implementing this in the code at all, what we should do is try/except - i.e. attempt to write, and if there is an IO error, check the message or the disk space to understand if it is due to lack of space. Only then, wait_for_space(), with warning messages displayed for each check. There should also be a max_time after which we come out of this and crash.
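For illustration, a minimal sketch of that try-then-wait shape (hypothetical names such as wait_for_space; assumes std::filesystem is usable on the target system):

#include <chrono>
#include <cstddef>
#include <filesystem>
#include <stdexcept>
#include <thread>

// Hypothetical sketch: only poll the filesystem after a failed write,
// and give up after max_time instead of waiting forever.
void wait_for_space(std::filesystem::path const& dir,
                    std::size_t required_bytes,
                    std::chrono::seconds max_time)
{
    auto const deadline = std::chrono::steady_clock::now() + max_time;
    while (std::filesystem::space(dir).available < required_bytes) {
        if (std::chrono::steady_clock::now() >= deadline) {
            throw std::runtime_error("timed out waiting for disk space");
        }
        // a warning message per check would go here
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }
}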
How would you handle the case where multiple processes check this and try to write at the same time? For me, this logic sounds like something that needs to be outside the program, i.e. never start the program if you do not have sufficient space to write out the entire dataset.
@ewanbarr any other comments?
If we are implementing this in the code at all, what we should do is try/except - i.e. attempt to write, and if there is an IO error, check the message or the disk space to understand if it is due to lack of space. Only then, wait_for_space(), with warning messages displayed for each check. There should also be a max_time after which we come out of this and crash.
This sounds reasonable. I am currently not sure how POSIX-conformant BeegFS is and whether it answers space requests. How should a max_time be implemented? As a constant, or as something that comes in as an argument?
For me, this logic sounds like something that needs to be outside the program, i.e. never start the program if you do not have sufficient space to write out the entire dataset.
That is going to be difficult, since a RAM disk is too small right from the beginning, and we need to constantly delete files while the beamformer is running. If the deleting (and file transferring) program can't keep up, we don't want the beamformer to crash.
It seems that write is not throwing an exception if the disk is filled entirely. The badbit is set, though. There is the error number ENOSPC, which could be collected and translated into English, but that is already too late. If a write process has been interrupted and I start waiting for enough space, I need to close the file, delete it and reopen it again. Just repeating the write process would result in unpredictable behavior. In some tests, I saw that the file was suddenly twice as big as it was supposed to be.
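For reference, a minimal sketch of detecting that failure mode with a std::ofstream (hypothetical; whether errno carries ENOSPC after a stream failure is platform-dependent):

#include <cerrno>
#include <cstring>
#include <fstream>
#include <iostream>

// Hypothetical sketch: ofstream::write does not throw by default;
// the badbit must be inspected, and errno may carry ENOSPC.
bool write_chunk(std::ofstream& out, char const* ptr, std::streamsize nbytes)
{
    out.write(ptr, nbytes);
    out.flush();
    if (out.bad()) {                  // badbit set, no exception thrown
        if (errno == ENOSPC) {
            std::cerr << "write failed: " << std::strerror(errno) << "\n";
        }
        return false;                 // caller must close, delete and reopen the file
    }
    return true;
}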
Would it be possible to keep the filesystem size test in place and make it optional, disabled by default?
If we are implementing this in the code at all, what we should do is try/except - i.e. attempt to write, and if there is an IO error, check the message or the disk space to understand if it is due to lack of space. Only then, wait_for_space(), with warning messages displayed for each check. There should also be a max_time after which we come out of this and crash.
This sounds reasonable. I am currently not sure how POSIX-conformant BeegFS is and whether it answers space requests. How should a max_time be implemented? As a constant, or as something that comes in as an argument?
That is up to you. Maybe a max_iterations, or a check to see if the filesystem space has been the same for the last X iterations and is not being actively freed up.
I really don't want to do this on the beegfs side, so this needs to be OFF by default and switched on only via a flag.
You could implement this as something like --wait X:Y:Z, where it would wait for a total of X iterations, or Y iterations where the disk space has not changed, while sleeping for Z seconds for every iteration.
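A possible shape for parsing that flag value (the WaitConfig struct and field names are hypothetical):

#include <cstddef>
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical sketch: parse a "--wait X:Y:Z" value into its three fields.
struct WaitConfig
{
    std::size_t max_iterations;      // X: total iterations before giving up
    std::size_t max_idle_iterations; // Y: iterations with unchanged disk space
    std::size_t sleep_seconds;       // Z: sleep between checks
};

WaitConfig parse_wait(std::string const& value)
{
    WaitConfig cfg{};
    std::istringstream ss(value);
    char sep1 = 0, sep2 = 0;
    if (!(ss >> cfg.max_iterations >> sep1 >> cfg.max_idle_iterations >>
          sep2 >> cfg.sleep_seconds) ||
        sep1 != ':' || sep2 != ':') {
        throw std::invalid_argument("--wait expects X:Y:Z");
    }
    return cfg;
}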
For me, this logic sounds like something that needs to be outside the program, i.e. never start the program if you do not have sufficient space to write out the entire dataset.
That is going to be difficult, since a RAM disk is too small right from the beginning, and we need to constantly delete files while the beamformer is running. If the deleting (and file transferring) program can't keep up, we don't want the beamformer to crash.
I still don't understand how you will eliminate the race condition where a number of processes would check this, start writing and clash with each other.
If we are implementing this in the code at all, what we should do is try/except - i.e. attempt to write, and if there is an IO error, check the message or the disk space to understand if it is due to lack of space. Only then, wait_for_space(), with warning messages displayed for each check. There should also be a max_time after which we come out of this and crash.
This sounds reasonable. I am currently not sure how POSIX-conformant BeegFS is and whether it answers space requests. How should a max_time be implemented? As a constant, or as something that comes in as an argument?
That is up to you. Maybe a max_iterations, or a check to see if the filesystem space has been the same for the last X iterations and is not being actively freed up. I really don't want to do this on the beegfs side, so this needs to be OFF by default and switched on only via a flag.
You could implement this as something like --wait X:Y:Z, where it would wait for a total of X iterations, or Y iterations where the disk space has not changed, while sleeping for Z seconds for every iteration.
I am in favor of this solution.
For me, this logic sounds like something that needs to be outside the program, i.e. never start the program if you do not have sufficient space to write out the entire dataset.
That is going to be difficult, since a RAM disk is too small right from the beginning, and we need to constantly delete files while the beamformer is running. If the deleting (and file transferring) program can't keep up, we don't want the beamformer to crash.
I still don't understand how you will eliminate the race condition where a number of processes would check this, start writing and clash with each other.
That is indeed a problem. I could diminish the problem by demanding as much available space on the file system as the maximum number of concurrent write processes times the max write size, which is hopefully homogeneous across all write processes .... (Is it?) This wouldn't solve the problem entirely, though, since some external programs could in principle write to the file system as well. What about adding the number to the argument? --wait MaxIteration:SleepTime:MinAvailableSpace, where MinAvailableSpace could be given in bytes (or GB or so) or in the number of write cycles still possible before hitting the hard limit. I'd prefer the first option. If MaxIteration == 0, it could mean that the process waits forever. I would leave out the number of iterations where the disk space has not changed, since I expect the disk space not to change for quite some iterations.
The problem is that the program which sends away the files might get stuck in an rsync process because the GPU node is down or sluggish. In this case, we plan to start erasing the affected TDB files on the beamformer. But during that time the available space will stay constant.
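A sketch of those proposed semantics (hypothetical names; MaxIteration == 0 meaning "wait forever" as suggested above):

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <stdexcept>
#include <thread>

// Hypothetical sketch of --wait MaxIteration:SleepTime:MinAvailableSpace:
// max_iterations == 0 means wait forever; min_available_bytes is in bytes.
void wait_for_min_space(std::filesystem::path const& dir,
                        std::size_t max_iterations,
                        std::chrono::seconds sleep_time,
                        std::uintmax_t min_available_bytes)
{
    for (std::size_t i = 0; max_iterations == 0 || i < max_iterations; ++i) {
        if (std::filesystem::space(dir).available >= min_available_bytes) {
            return;
        }
        std::this_thread::sleep_for(sleep_time);
    }
    throw std::runtime_error("gave up waiting for free disk space");
}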
Ok, I added the wait-for-space option. I added a WaitStruct into the PipelineConfig class and pass the config into the MultiFileWriter class, and from there into the FileOutputStream class. I needed to pass the config into File::write, but the constructor is certainly being called somewhere, so _config could also be defined in the File class. I was thinking of passing just the wait config struct through, but I don't actually know where a global struct definition should be placed.
Hi Henning,
The FileOutputStream and File classes cannot take the pipeline_config object. They are supposed to be generic classes and are used for a number of file writes. Some of these are outside the beamformer. For example, the skycleaver (the code that does filterbanking, which you guys aren't using) also uses the same stream but with a different config. So we need to come up with a solution that works for all kinds of calls to this class. I will have a look and come up with some suggestions.
Could it be done in the MultiFileWriter class around _file_streams.at(stream_idx)->write(....) in lines 251 and 256 of MultiFileWrite.cu? In this way, we do not need to pass pipeline_config into FileOutputStream.
Hmmm, I am not sure whether it works. The create_stream method in MultiFileWriter fills the _file_streams map. From that point on, FileOutputStream takes over, and I can only control the filesystem size in this class from that point on.
Catching up on this. I think this feature is pretty dangerous and potentially limits our ability to make future changes.
My main issue is thread safety. Currently there is nothing that prevents us putting all file writers in different threads. There is also no synchronisation required between those threads.
Here you would necessarily need some kind of synchronisation barrier for all writers, to ensure that once the file system has been checked for space, that space does not disappear during writing. Additionally, you cannot guarantee the filesystem space anyway, as other processes may use it (although I assume you will do this by tightly controlling your execution environment).
If you want to add this feature regardless, it should be done via a pre-write callback that is passed to the FileOutputStream class. That way you can inject shared state (and if needed synchronisation) into all writers transparently.
Note, as MultiFileWriter does take the pipeline config, the decision to inject the pre_write_callback to the FileStream can be made there. This would be best done by adding a new constructor to FileStream to take the callback.
Callback should be of the form std::function<void(char const*, std::size_t)>.
void FileStream::write(char const* ptr, std::size_t bytes)
{
    BOOST_LOG_TRIVIAL(debug) << "Writing " << bytes << " bytes to file stream";
    if(_current_file) {
        // CALLBACK WOULD BE EXECUTED HERE
        std::size_t bytes_written = _current_file->write(ptr, bytes);
        _total_bytes_written += bytes_written;
        if(bytes_written < bytes) {
            new_file();
            write(ptr + bytes_written, bytes - bytes_written);
        }
    } else {
        new_file();
        write(ptr, bytes);
    }
}
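For illustration, a hypothetical callback matching that signature, capturing the target directory and blocking until enough space is free (the names here are assumptions, not the actual API):

#include <chrono>
#include <cstddef>
#include <filesystem>
#include <functional>
#include <thread>

using PreWriteCallback = std::function<void(char const*, std::size_t)>;

// Hypothetical sketch: build a pre-write callback that waits for space.
PreWriteCallback make_wait_callback(std::filesystem::path dir)
{
    return [dir](char const* /*ptr*/, std::size_t nbytes) {
        while (std::filesystem::space(dir).available < nbytes) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    };
}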
Note, as MultiFileWriter does take the pipeline config, the decision to inject the pre_write_callback to the FileStream can be made there. This would be best done by adding a new constructor to FileStream to take the callback.
Callback should be of the form std::function<void(char const*, std::size_t)>.

void FileStream::write(char const* ptr, std::size_t bytes)
{
    BOOST_LOG_TRIVIAL(debug) << "Writing " << bytes << " bytes to file stream";
    if(_current_file) {
        // CALLBACK WOULD BE EXECUTED HERE
        std::size_t bytes_written = _current_file->write(ptr, bytes);
        _total_bytes_written += bytes_written;
        if(bytes_written < bytes) {
            new_file();
            write(ptr + bytes_written, bytes - bytes_written);
        }
    } else {
        new_file();
        write(ptr, bytes);
    }
}
I leave the FileOutputStream class untouched now. The wait method is in the MultiFileWrite class.
Ok, doing it in the MultiFileWrite::operator()() call is fine. I would still prefer this was done via a callback rather than putting this logic directly in the class.
There is still the potential runtime error here that the data type passed to MultiFileWrite::operator()() is larger than min_free_space, in which case the system will fail on write anyway. Size checking the write seems sensible.
Ok, doing it in the MultiFileWrite::operator()() call is fine. I would still prefer this was done via a callback rather than putting this logic directly in the class.
I can do that.
There is still the potential runtime error here that the data type passed to MultiFileWrite::operator()() is larger than min_free_space, in which case the system will fail on write anyway. Size checking the write seems sensible.
Using the debug flag tells me that the file sizes are absurdly high, like 10 PB. I think this is the default of max_output_filesize.
Note, as MultiFileWriter does take the pipeline config, the decision to inject the pre_write_callback to the FileStream can be made there. This would be best done by adding a new constructor to FileStream to take the callback. Callback should be of the form std::function<void(char const*, std::size_t)>.

void FileStream::write(char const* ptr, std::size_t bytes)
{
    BOOST_LOG_TRIVIAL(debug) << "Writing " << bytes << " bytes to file stream";
    if(_current_file) {
        // CALLBACK WOULD BE EXECUTED HERE
        std::size_t bytes_written = _current_file->write(ptr, bytes);
        _total_bytes_written += bytes_written;
        if(bytes_written < bytes) {
            new_file();
            write(ptr + bytes_written, bytes - bytes_written);
        }
    } else {
        new_file();
        write(ptr, bytes);
    }
}
Since we want to use callback functions in the MultiFileWriter class, I need to ask something here. AFAIK we need to include the callback function in the argument list of the function. If this is true, we need to modify the existing methods anyway. If we use a callback function, to which class should it belong?
You have two options. Either create a new constructor, in addition to the current one, that takes the callback, or add setter/getter methods for the callback. The former case is something like:
#include <functional>
#include <iostream>

class MultiFileWriter
{
  public:
    using PreWriteCallback = std::function<void(std::size_t)>;

    // Original constructor
    MultiFileWriter()
        : _pre_write_callback([](std::size_t){})
    {}

    // New constructor
    MultiFileWriter(PreWriteCallback&& callback)
        : _pre_write_callback(callback)
    {}

    void operator()(std::size_t nbytes)
    {
        _pre_write_callback(nbytes);
        // do other stuff
    }

  private:
    PreWriteCallback _pre_write_callback;
};

int main()
{
    MultiFileWriter mfw{};
    mfw(10);
    // vs
    MultiFileWriter mfw2{[](std::size_t n){ std::cout << n << "\n"; }};
    mfw2(10);
    return 0;
}
The other option is:
#include <functional>

class MultiFileWriter
{
  public:
    using PreWriteCallback = std::function<void(std::size_t)>;

    // Original constructor
    MultiFileWriter()
        : _pre_write_callback([](std::size_t){})
    {}

    void set_pre_write_callback(PreWriteCallback&& callback)
    {
        _pre_write_callback = callback;
    }

    void operator()(std::size_t nbytes)
    {
        _pre_write_callback(nbytes);
        // do other stuff
    }

  private:
    PreWriteCallback _pre_write_callback;
};
In both cases the callback should be copyable and moveable, and if it requires shared state then the state should be shared via the copy constructor (e.g. by having std::shared_ptr members).
As a note, the default {} no-op is not needed. The truthiness of the callback can just be checked in the operator and if it isn't set then it isn't called.
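That variant would look something like this (a sketch, not the actual class):

#include <cstddef>
#include <functional>

class MultiFileWriter
{
  public:
    using PreWriteCallback = std::function<void(std::size_t)>;

    MultiFileWriter() = default; // callback left empty (falsy)

    void operator()(std::size_t nbytes)
    {
        if (_pre_write_callback) { // truthiness check replaces the no-op default
            _pre_write_callback(nbytes);
        }
        // do other stuff
    }

  private:
    PreWriteCallback _pre_write_callback;
};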
On the point of the size, all the data types passed to operator() have a size() and a ::value_type so you can do:
...
bool MultiFileWriter<VectorType>::operator()(VectorType const& stream_data,
                                             std::size_t stream_idx)
{
    std::size_t bytes_to_write =
        stream_data.size() * sizeof(typename VectorType::value_type);
    ...
}
Ok, implemented callback functions.
Ok that is looking pretty good. I thought that the logic below would have been hidden inside the callback, but I can see that that would make stack construction of the writer instances a little awkward.
if (_config.get_wait_config().is_enabled) {
    try {
        _pre_write_callback(data_size, _config);
    } catch (std::runtime_error& e) {
        std::cout << "Wait loop exception: " << e.what() << std::endl;
        throw;
    }
}
That could be solved by using unique_ptrs, e.g.:
using StatsWriterType = skyweaver::MultiFileWriter<skyweaver::FPAStatsD<skyweaver::Statistics>>;

std::unique_ptr<StatsWriterType> stats_handler;
if (_config.get_wait_config().is_enabled)
{
    stats_handler = std::make_unique<StatsWriterType>(config, "stats", pre_write_callback);
} else {
    stats_handler = std::make_unique<StatsWriterType>(config, "stats");
}
Then just dereference later when it is used.
Currently, the way this is implemented, the only callback that can be passed is the one for waiting, because it will only be executed when the _config.get_wait_config().is_enabled condition is true.
At this point I defer to @vivekvenkris though on what he wants here as the proposed solution is fine and if he doesn't have other strong use cases for the pre_write_callback then we can just go with what is here.
I just caught up to this thread. Just FYI, pipeline_config is no longer explicitly a member of the MultiFileWriter class, as I have the same problem of not being able to use it generically. There is now a MultiFileWriterConfig in the skycleaver branch -- this is why I originally mentioned that you pull into that branch instead. Anyway, once we accept the pull request here, I will make the required changes when merging skycleaver into pipeline_dev. The constructor that takes pipeline_config still exists, so this is a trivial change.
I will give specific comments on the changes per file now.
pre_write_callback: Shall I only change it for the tag stats, or for all other objects as well (ib and twice cb)?
I just caught up to this thread. Just FYI, pipeline_config is no longer explicitly a member of the MultiFileWriter class, as I have the same problem of not being able to use it generically. There is now a MultiFileWriterConfig in the skycleaver branch -- this is why I originally mentioned that you pull into that branch instead. Anyway, once we accept the pull request here, I will make the required changes when merging skycleaver into pipeline_dev. The constructor that takes pipeline_config still exists, so this is a trivial change. I will give specific comments on the changes per file now.
I should have asked beforehand.
pre_write_callback: Shall I only change it for the tag stats, or for all other objects as well (ib and twice cb)?
Hi Henning,
I think it should be for everything. So something like:
if(_config.get_wait_config().is_enabled) {
    // call constructor for ib, cb and stats handlers with callback
}
else {
    // call constructors without this callback
}
Also, on further thought, I realised that in the skycleaver branch we have additional callbacks that tell what kind of stream the MultiFileWriter needs to produce: DADA vs Filterbank streams. So the constructors are different from what they are in pipeline_dev. If it is not too much work, can you port these changes there, so that we don't have several merge conflicts?
You can't do the constructor call in the if-else statement as it will go out of scope without a copy or move. You can do conditional constructor calls with an immediately invoked lambda. e.g.
struct Test
{
    Test() {}
    Test(int) {}
};

int main()
{
    int x = 3;
    Test t = [&]() {
        if (x == 3)
        {
            return Test{3};
        } else {
            return Test{};
        }
    }();
    return 0;
}
Return value optimisation is guaranteed on the lambda since C++17, so no move or copy.
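Applied to the writer construction above, that pattern could look like this (hypothetical, reusing the names from the unique_ptr example):

// Sketch: conditionally construct the writer with or without the wait
// callback via an immediately invoked lambda; guaranteed RVO, no unique_ptr.
auto stats_handler = [&]() {
    if (_config.get_wait_config().is_enabled) {
        return StatsWriterType(config, "stats", pre_write_callback);
    }
    return StatsWriterType(config, "stats");
}();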
I created a new pull request into skycleaver and am closing this one. It is code-wise very similar.
This doesn't add much overhead and helps us write the output to the ramdisk, which needs to be cleaned frequently. If the cleaning process can't keep up, the beamformer waits.
There are two open questions:
Answering one or both questions with yes requires some additional code changes to the CLI argument list.