schveiguy / iopipe

D language library for modular io
Boost Software License 1.0
77 stars 6 forks

An example with file IO? #22

Open biocyberman opened 6 years ago

biocyberman commented 6 years ago

Hi Steve

I don't know whether this would go against the "pipe" intention of iopipe, so I just want to ask if it is possible: can you make an example with file input and output for the new iopipe version (0.1.2)? I tried but failed with std.io:

module fastaq.fastaq;
import iopipe.textpipe;
import iopipe.bufpipe;
import std.getopt;
import fastaq.fasta;
import std.io;
import std.typecons;
import std.stdio: stderr, writeln, writef, writefln;

string fastaIn = "myseq.fasta";
auto fhIn = File(fastaIn);
fhIn
  .refCounted    // error here: passing fhIn makes a copy, but File is not copyable
  .bufd
  .runWithEncoding!filterFastaRange(filterRegex, filterField)
  .each!writeln(fhOut);
Error: struct `std.io.file.File` is not copyable because it is annotated with `@disable`
schveiguy commented 6 years ago

The issue is that you are making a copy of fhIn when passing it to refCounted.

You need to pass it at construction, or move it (this is outlined in the README of std.io):

auto fhIn = File(fastaIn).refCounted;
// or
auto fhIn = File(fastaIn);
import std.algorithm: move;
fhIn.move.refCounted
   .bufd
   ...

One thing we need to figure out in std.io is how to make this less annoying. People expect to be able to use their streams without having to worry about such things. I have considered making bufd or rbufd automatically use refCounted when it detects it can't copy the original. Maybe that's the easiest solution.
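
For illustration, here is a rough sketch of that idea (this is not iopipe's actual code; autoBufd and its detection logic are invented for this example):

import std.algorithm.mutation : move;
import std.traits : isCopyable;
import std.typecons : refCounted;
import iopipe.bufpipe : bufd;

// hypothetical helper: buffer a device, ref-counting it only when it can't be copied
auto autoBufd(Dev)(Dev dev)
{
    static if (isCopyable!Dev)
        return dev.bufd;                   // copyable devices can be buffered directly
    else
        return dev.move.refCounted.bufd;   // non-copyable devices (e.g. std.io's File) get wrapped
}

// usage (call with an rvalue so the device is moved in):
// auto pipe = File("myseq.fasta").autoBufd;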

schveiguy commented 6 years ago

I should still make an example. I did update the README, but actually I need to make ALL the examples stop using openDev and use std.io directly instead, since I didn't support openDev on Windows.
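
In the meantime, a minimal sketch of what such a file-IO example might look like (untested, pieced together from this thread; it reads a file line by line):

import iopipe.bufpipe;            // bufd
import iopipe.textpipe;           // assumeText, byLineRange
import std.io;                    // File (a non-copyable device)
import std.typecons;              // refCounted
import std.stdio : writeln;

void main()
{
    foreach (line; File("myseq.fasta")
                       .refCounted     // File can't be copied, so ref-count it
                       .bufd           // buffer the raw ubyte stream
                       .assumeText     // reinterpret the buffer window as char[]
                       .byLineRange)   // iterate the text one line at a time
        writeln(line);
}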

biocyberman commented 6 years ago

Thanks for pointing out the seemingly subtle difference in the usage of refCounted and move between my incorrect way and the correct way. Going further, my code fails because it cannot parse the chain the way it parses normal multi-line text input in the unittest. The unittest in fasta.d itself still passes after upgrading the iopipe version, so I am clueless about why the following functions would provoke something like:

cannot implicitly convert expression `entry.entryid` of type `char[]` to `string`
// an example program that uses fasta.d with the new iopipe version 
module fastaq.fastaq;
import iopipe.textpipe;
import iopipe.bufpipe;
import std.getopt;
import fastaq.fasta;
import std.io;
import std.typecons;
import std.stdio: stderr, writeln, writef, writefln;

// Determine field type
FieldType getFieldType(string field)
{
  import std.string;
  field = toLower(field);
  if (startsWith(field, "id")) return FieldType.id;
  else if (startsWith(field, "field")) return FieldType.fields;
  else if (startsWith(field, "def")) return FieldType.defline;
  else if (startsWith(field, "seq")) return FieldType.seq;
  else throw new Exception ("Unknown field type");

}

// filter fasta
auto filterFastaRange(Chain)(Chain chain, string re, FieldType ft)
{
 import std.algorithm : filter;
 auto r = chain.fastaRange;
 auto f = r.filter!(e =>filterEntry(e, re, ft));
 return f;
}

// run filter
auto runFilter(string[] args)
{
  import std.algorithm : filter, each;
  string fastaIn;
  string fastaOut;
  string filterRegex;
  FieldType filterField;

  auto helpInfo = getopt(args,
                         std.getopt.config.required,
                         "i", &fastaIn,
                         std.getopt.config.required,
                         "o", &fastaOut,
                         std.getopt.config.required,
                         "f", &filterRegex,
                         std.getopt.config.required,
                         "t", &filterField,
                         );
  if(helpInfo.helpWanted)
    {
      defaultGetoptPrinter("usage", helpInfo.options);
      return 1;
    }

  auto fhIn = File(fastaIn).refCounted;
  auto fhOut = File(fastaOut).refCounted;

  fhIn
    .bufd
    .runEncoded!filterFastaRange(filterRegex, filterField).each!writeln(fhOut);
  return 0;
}

int main(string[] args)
{
  import std.traits; // Gets EnumMembers
  enum Commands { filter }
  Commands command;

  auto helpInfo = getopt(args,
                         std.getopt.config.passThrough,
                         "command", &command,
                         );
  if(helpInfo.helpWanted)
    {
      defaultGetoptPrinter("usage", helpInfo.options);
      return 1;
    }

  if(args.length == 0)
    {
      stderr.writeln("Please choose a command to run");
      foreach (immutable cmd; [EnumMembers!Commands])
        stderr.writef("%s ", cmd);
      stderr.writeln();
      return 1;
    }
  switch (command)
    {
    case Commands.filter:
      runFilter(args);
      break;
    default:
      writeln("Unknown command: ", command);
    }
  return 0;
}

Compile error:

dlang_dmd  && dub --root=../../ build :fastaq
Building package fastaq:fastaq in /home/user/repositories/github/fastaq/
Performing "debug" build using /home/user/dlang/dmd-2.080.0/linux/bin64/dmd for x86_64.
io 0.2.1: target for configuration "library" is up to date.
iopipe 0.1.2: target for configuration "library" is up to date.
fastaq:fastaq ~dev: building configuration "application"...
../../source/fasta/fasta.d(293,13): Error: cannot implicitly convert expression `entry.entryid` of type `char[]` to `string`
../../source/fasta/fasta.d(303,13): Error: cannot implicitly convert expression `entry.defline` of type `char[]` to `string`
../../source/fasta/fasta.d(306,13): Error: cannot implicitly convert expression `entry.sequence` of type `char[]` to `string`
fastaq.d(30,36): Error: template instance `fastaq.fasta.filterEntry!(FastaConcreteToken!(char[]))` error instantiating
/home/user/dlang/dmd-2.080.0/linux/bin64/../../src/phobos/std/algorithm/iteration.d(1162,38):        instantiated from here: `__lambda4!(FastaConcreteToken!(char[]))`
/home/user/dlang/dmd-2.080.0/linux/bin64/../../src/phobos/std/algorithm/iteration.d(1120,16):        instantiated from here: `FilterResult!(__lambda4, FastaRange!(Result))`
fastaq.d(30,12):        instantiated from here: `filter!(FastaRange!(Result))`
/home/user/.dub/packages/iopipe-0.1.2/iopipe/source/iopipe/textpipe.d(1098,20):        ... (2 instantiations, -v to show) ...
/home/user/.dub/packages/iopipe-0.1.2/iopipe/source/iopipe/textpipe.d(1101,37):        instantiated from here: `runWithEncoding!(forwarder, true, BufferedInputSource!(AllocatedBuffer!(ubyte, GCNoPointerAllocator, 8192LU), RefCounted!(File, cast(RefCountedAutoInitialize)0), 8192LU), string, FieldType)`
fastaq.d(64,33):        instantiated from here: `runEncoded!(filterFastaRange, BufferedInputSource!(AllocatedBuffer!(ubyte, GCNoPointerAllocator, 8192LU), RefCounted!(File, cast(RefCountedAutoInitialize)0), 8192LU), string, FieldType)`
../../source/fasta/fasta.d(86,12): Error: cannot implicitly convert expression `strip(this.defline) ~ "\x0a" ~ strip(this.sequence) ~ "\x0a"` of type `char[]` to `string`
../../source/fasta/fasta.d(67,5): Error: template instance `fastaq.fasta.FastaConcreteToken!(char[])` error instantiating
../../source/fasta/fasta.d(232,34):        instantiated from here: `value!(char[])`
../../source/fasta/fasta.d(244,17):        instantiated from here: `FastaRange!(Result)`
fastaq.d(29,16):        instantiated from here: `fastaRange!(ArrayCastPipe!(BufferedInputSource!(AllocatedBuffer!(ubyte, GCNoPointerAllocator, 8192LU), RefCounted!(File, cast(RefCountedAutoInitialize)0), 8192LU), char))`
/home/user/.dub/packages/iopipe-0.1.2/iopipe/source/iopipe/textpipe.d(1098,20):        ... (2 instantiations, -v to show) ...
/home/user/.dub/packages/iopipe-0.1.2/iopipe/source/iopipe/textpipe.d(1101,37):        instantiated from here: `runWithEncoding!(forwarder, true, BufferedInputSource!(AllocatedBuffer!(ubyte, GCNoPointerAllocator, 8192LU), RefCounted!(File, cast(RefCountedAutoInitialize)0), 8192LU), string, FieldType)`
fastaq.d(64,33):        instantiated from here: `runEncoded!(filterFastaRange, BufferedInputSource!(AllocatedBuffer!(ubyte, GCNoPointerAllocator, 8192LU), RefCounted!(File, cast(RefCountedAutoInitialize)0), 8192LU), string, FieldType)`

For convenience, here is the current fasta.d I am working on:

/**
 * proof of concept for using iopipe to parse fasta data
 *
 * Format:
 * >Entry1_ID header field1|header field2|...
 * CAGATATCTTTGATGTCCTGATTGGAAGGACCGTTGGCCCCCCACCCTTAGGCAG
 * TGTATACTCTTCCATAAACGAGCTATTAGTTATGAGGTCCGTAGATTGAAAAGGG
 * TGACGGAATTCGGCCGAACGGGAAAGACGGACATCTAGGTATCCTGAGCACGGTT
 * GCGCGTCCGTATCAAGCTCCTCTTTATAGGCCCCG
 * >Entry2_ID header field1|header field4|...
 * GTTACTGTTGGTCGTAGAGCCCAGAACGGGTTGGGCAGATGTACGACAATATCGCT
 * TAGTCACCCTTGGGCCACGGTCCGCTACCTTACAGGAATTGAGA
 *
 * >Entry3_ID header field1|header field2|...
 * GGCAGTACGATCGCACGCCCCACGTGAACGATTGGTAAACCCTGTGGCCTGTGAGC
 * GACAAAAGCTTTAATGGGAAATACGCGCCCATAACTTGGTGCGA
 *
 * Some characteristics:
 *
 * - Entry_ID is >[[:alphanumeric:]], where '>' marks the entry start.
 * - Headers may contain annotation information separated by some delimiter (e.g. '|' in this case).
 * - The entry ID and header form a single line, which does not contain newline characters.
 * - Sequence under the header line is [ATCGN\n]* (Perl regex).
 * - A fasta file can be plain-text or gzip compressed.
 */

/**
 * TODO
 * Implement
 * filter (i.e. by pattern on header),
 * writer
 * reverse complement
 *
 */

module fastaq.fasta;

import iopipe.traits;
import iopipe.textpipe;
private import std.traits;
private import std.range.primitives;
private import std.algorithm : find, splitter, filter;
private import std.conv: to;
private import std.string : stripLeft, stripRight, strip;
import fastaq.common.utils;

struct FastaToken
{
  BufRef defline;
  size_t endPos;
  BufRef entryid;
  BufRef[] fields;
  BufRef sequence;

  void release(size_t elements)
  {
    endPos -= elements;
    defline.release(elements);
    entryid.release(elements);
    sequence.release(elements);
    foreach(ref f; fields) f.release(elements);
  }

  auto value(B)(B buf)
  {
    FastaConcreteToken!B result;
    result.defline = defline.value(buf);
    result.entryid = entryid.value(buf);
    result.fields = new B[fields.length];
    foreach(i, ref f; fields)
      result.fields[i] = f.value(buf);
    result.sequence = sequence.value(buf);
    return result;
  }
}

struct FastaConcreteToken(R)
{
  R defline;
  R entryid;
  R[] fields;
  R sequence;
  string toString()
  {
    return defline.strip ~ "\n" ~ sequence.strip ~  "\n" ;
  }
}

auto tokenParser(Chain, char header = '>', char fieldsep = '|')(Chain c) if (isIopipe!Chain && isSomeChar!(ElementEncodingType!(WindowType!Chain)))
{
  auto lines = c.byLine;
  alias ChainType = typeof(lines);
  static struct Result
  {
    ChainType chain;
    size_t pos;
    alias chain this;

    FastaToken nextToken()
    {
      if(pos == chain.window.length)
        // reaches the end of the stream
        return FastaToken.init;
      // pos must start with a start identifier
      assert(chain.window[pos] == header);
      // the header is the current line
      FastaToken result;
      result.defline = BufRef(pos, chain.window.length - pos - 1);
      auto fields = chain.window[pos .. $].stripRight.splitter(fieldsep);
      if(!fields.empty)
        {
          auto firstElemSize = fields.front.length;
          auto firstField = fields.front.find(' ');
          result.entryid = BufRef(pos + 1, firstElemSize - firstField.length - 1);
          if(firstField.length > 0)
            {
              firstField = firstField.stripLeft;
              result.fields ~= BufRef(pos + (firstElemSize - firstField.length), firstField.length);
            }
          pos += firstElemSize;

          fields.popFront;
          pos += 1; // skip newline or |
          foreach(f; fields)
            {
              if(!f.empty)
                result.fields ~= BufRef(pos, f.length);
              pos += f.length + 1;
            }
        }

      // parse all the sequence
      auto seqStart = pos;
      while(chain.extend(0) != 0)
        {
          if(chain.window[pos] == header)
            break;
          pos = chain.window.length;
        }

      auto seqData = chain.window[seqStart .. pos].stripLeft;
      seqStart = pos - seqData.length;
      seqData = seqData.stripRight;
      result.sequence = BufRef(seqStart, seqData.length);
      result.endPos = pos;
      return result;
    }

    void release(size_t elements)
    {
      pos -= elements;
      chain.release(elements);
    }
  }

  // prime the lines item
  lines.extend(0);
  while (lines.window.strip.empty)
    {
      lines.release(lines.window.length);
      lines.extend(0);
    }
  return Result(lines);
}

/// Input data for unittests
version(unittest){
  immutable auto input = "\n" ~ ">EntryId1 field1|field2|field3\n" ~
    "ACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGT\n" ~
    "ACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGT\n" ~
    "ACGTACGTACGTACGTACGTACG \n" ~
    "\n" ~
    ">EntryId2 field3|field4|length > 3\n" ~
    " ACGT \n" ~
    " ACG \n";
  import std.stdio;
}

unittest
{
    auto tokenizer = input.tokenParser;
    auto item1 = tokenizer.nextToken;
    assert(item1.entryid.value(tokenizer.window) == "EntryId1");
    assert(item1.fields.length == 3);
    assert(item1.fields[0].value(tokenizer.window) == "field1");
    assert(item1.fields[1].value(tokenizer.window) == "field2");
    assert(item1.fields[2].value(tokenizer.window) == "field3");
    assert(item1.defline.value(tokenizer.window) == ">EntryId1 field1|field2|field3");
    auto seq = item1.sequence.value(tokenizer.window);
    assert(seq[0] == 'A');
    assert(seq[$-1] == 'G');
    import std.range: cycle;
    import std.ascii: isWhite;
    import std.algorithm: filter, startsWith;
    assert(cycle("ACGT").startsWith(seq.filter!(a => !a.isWhite)));

    auto item2 = tokenizer.nextToken;

    assert(item2.entryid.value(tokenizer.window) == "EntryId2");
    assert(item2.fields.length == 3);
    assert(item2.fields[0].value(tokenizer.window) == "field3");
    auto field4 = item2.fields[1].value(tokenizer.window);
    assert(field4 == "field4", "got: " ~  field4);
    auto fieldspecial = item2.fields[2].value(tokenizer.window);
    assert(fieldspecial == "length > 3", "Expect 'length > 3' got: " ~ fieldspecial);
    seq = item2.sequence.value(tokenizer.window);
    assert(seq.filter!(a => !a.isWhite).to!string == "ACGTACG", "Expected: ACGTACG, got: " ~ seq);

    auto item3 = tokenizer.nextToken;
    assert(item3.entryid.length == 0);

    tokenizer.release(item1.endPos);
    item2.release(item1.endPos);

    auto concrete = item2.value(tokenizer.window);

    assert(concrete.entryid == "EntryId2");
    assert(concrete.fields.length == 3);
    assert(concrete.fields[0] == "field3");
    assert(concrete.fields[1] == "field4");
    seq = concrete.sequence;
    assert(seq.filter!(a => !a.isWhite).to!string == "ACGTACG", "Expected: ACGTACG, got: " ~ seq);

}

struct FastaRange(Chain)
{
  private Chain chain; // the source parser
  private FastaToken tok; // the buffer reference

  auto front() { return tok.value(chain.window); }
  bool empty() { return tok.endPos == 0; }
  void popFront()
  {
    chain.release(tok.endPos);
    tok = chain.nextToken;
  }
}

auto fastaRange(Chain)(Chain chain)
{
  auto tokenizer = chain.tokenParser;
  auto result = FastaRange!(typeof(tokenizer))(tokenizer);
  result.popFront(); // prime the range, this properly stores the token and advances the iopipe
  return result;
}

/// Range unittest
unittest{
  auto tokenizer2 = input.tokenParser;
  alias ChainType = typeof(tokenizer2);
  assert(isIopipe!ChainType, "ChainType is not an Iopipe");
  auto r = FastaRange!ChainType(tokenizer2);
  r.popFront;
  writeln(r.front.entryid);
  auto tkz2i1 = r.front;
  assert(r.front.entryid == "EntryId1", "Got: " ~ r.front.entryid);
  r.popFront;
  assert(r.front.entryid == "EntryId2", "Got: " ~ r.front.entryid);

  auto r2 = input.fastaRange;
  assert(r2.front.entryid == "EntryId1", "Got: " ~ r2.front.entryid);
}

/// Utility functions
auto countEntries(R)(R fastaRange)
{
  size_t i;
  foreach(e; fastaRange) i++;
  return i;
}

enum FieldType { id, fields, defline, seq}
/**
 * Params:
 *     entry = a FastaConcreteToken item
 *     regex = a regex string used for matching
 *     field = type of the piece of information that `regex` is matched with
 */
bool filterEntry(T)(T entry, string re, FieldType field = FieldType.id)
{

  import std.regex;
  string str;
  auto rx = regex(re);
  switch (field)
    {
    default:
      throw new Exception("Unknown field type");
    case FieldType.id:
      str = entry.entryid;
      break;
    case FieldType.fields:
      foreach(f; entry.fields)
        {
          if (!matchFirst(f, rx).empty)
            return true;
        }
      return false;
    case FieldType.defline:
      str = entry.defline;
      break;
    case FieldType.seq:
      str = entry.sequence;
      break;

    }
  auto c = matchFirst(str, rx);
  return !c.empty;

}

unittest{
  auto r2 = input.fastaRange;
  assert(r2.countEntries == 2);
  // exact match
  assert(filterEntry(r2.front, "EntryId1"));
  // partial match
  assert(filterEntry(r2.front, "EntryId"));
  // no match
  assert(!filterEntry(r2.front, "EntryID"));

  // match field1;
  assert(filterEntry(r2.front, "field1", FieldType.fields));
  // no matching Field1
  assert(!filterEntry(r2.front, "Field1", FieldType.fields));

  // Filtering
  auto f = r2.filter!(e =>filterEntry(e, "field4", FieldType.fields));
  writeln("f Type: ", typeof(f).stringof);
}
schveiguy commented 6 years ago

Haven't done a thorough dive, but it looks like a const/immutable issue. Note that when reading data from a file, it's going to come in as a char[] buffer (as it needs to be overwritten as the file is read), whereas the string version will come in as a string, which is immutable(char)[].

You can probably replicate the error in unittests without using your file input by changing input.tokenParser to input.dup.tokenParser. Indeed, we have to fix this bug.
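
Something like this in the existing unittest, if I understand the suggestion (my guess at the change):

// input.dup has type char[], the same element type a file read produces,
// so the char[] -> string conversion errors should show up without any file IO
auto tokenizer = input.dup.tokenParser;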

biocyberman commented 6 years ago

That's exactly it. Casting all over the place doesn't seem right, so I am thinking the window method should be updated to handle this.

biocyberman commented 6 years ago

Alternatively, going through lines with byLineRange may not have this problem, as far as I can see from the latest example in iopipe's README file. I am tempted to rewrite tokenParser using byLineRange instead of byLine. Would this avoid using the window method and pos?

schveiguy commented 6 years ago

I don't think that's the right way. byLineRange will release previous lines, so you won't have the 4-line buffer to extract your token.

The tokenParser should work with a char[] buffer; it's probably an easy fix. When I get some time, I will look into it.

schveiguy commented 6 years ago

OK, I had a few minutes to debug this.

The two problems are:

bool filterEntry(T)(T entry, string re, FieldType field = FieldType.id)
{

  import std.regex;
  string str; // << problem 1
  auto rx = regex(re);
  switch (field)
    {
    default:
      throw new Exception("Unknown field type");
    case FieldType.id:
      str = entry.entryid;
      break;
...
}

struct FastaConcreteToken(R)
{
  ...
  string toString()
  {
    return defline.strip ~ "\n" ~ sequence.strip ~  "\n" ; // << problem 2
  }

The first problem: you are assuming the type of the input window is "string". This is somewhat easy to fix:

typeof(entry.entryid) str;

You can solve it in other ways, for example by extracting the window parameter from the template, but this is the most straightforward way.
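
For example, one of those other ways might look like this (just a sketch along the same lines, not tested): constrain the template to the concrete token type and reuse its window type directly.

bool filterEntry(Window)(FastaConcreteToken!Window entry, string re,
                         FieldType field = FieldType.id)
{
  import std.regex;
  Window str;            // char[] for file-backed input, string for string input
  auto rx = regex(re);
  // ... the rest of the function stays the same
}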

The second problem is trickier. toString MUST return a string. So here, we have to use std.format.format:

  string toString()
  {
      import std.format : format;
      return format("%s\n%s\n", defline.strip, sequence.strip);
  }

Or alternatively, if you don't want to incur unnecessary allocations, toString can take a sink delegate that accepts a const(char)[] (see the documentation for formatValue):

  void toString(scope void delegate(const(char)[]) sink)
  {
      static if(is(R : const(char[])))
      {
          sink(defline.strip);
          sink("\n");
          sink(sequence.strip);
          sink("\n");
      }
      else
          static assert(0, "implement me for " ~ R.stringof ~ "!");
  }

Note, I took the easy way out and avoided dealing with other character widths (wchar/dchar) 😉
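
For reference, with either version the token prints through toString, e.g. (reusing the concrete token from the unittest above):

auto concrete = item2.value(tokenizer.window);
writeln(concrete);   // std.format picks up toString: stripped defline, then sequence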

biocyberman commented 6 years ago

Some progress so far. However, I bumped into another problem, so I think it would be more convenient if you could check the code directly. I created a Cloud9 IDE instance and sent you an email. Could you log in and check?