mechatroner / RBQL

🦜RBQL - Rainbow Query Language: SQL-like query engine for (not only) CSV file processing. Supports SQL queries with Python and JavaScript expressions.
https://rbql.org
MIT License
281 stars 13 forks source link

Add a string (or stream) API for CSV #47

Open agladysh opened 1 year ago

agladysh commented 1 year ago

I was looking for an easy way to handle CSV as strings (not files) today, and had to hack together a solution based on query_csv(), below.

That could be a documentation issue (or even lack of attention on my end), but it seems that there are low-level (browser) APIs and a high-level (node) API, but no middle ground.

It might be nice to expose some friendly API to support middle ground use cases. The boilerplate is rather long now.

Or, for my case, just support CSV strings, if that is in scope of the library.

If I miss something and there is an easier solution, please do share. :-)

Sorry for the typescript, if you're interested, I can try to submit a pure-JS PR, provided you share your preferences on the API design.

interface QueryFileOptions {
  /**
   * Whether the CSV file has a header row.
   * Default false.
   */
  hasHeader: boolean;

  /**
   * Comment prefix to treat lines as comments.
   * Lines starting with this prefix will be ignored.
   */
  commentPrefix: string;

  /**
   * Input CSV encoding. Default 'utf-8'.
   */
  encoding: 'utf-8' | 'latin-1' | 'binary';

  /**
   * Input CSV field delimiter. Default ','.
   */
  inputDelimiter: string;

  /**
   * Input CSV parsing policy. Default 'quoted_rfc'.
   */
  inputPolicy: 'simple' | 'quoted' | 'quoted_rfc';

  /**
   * Output CSV field delimiter. Default `inputDelimiter`.
   */
  outputDelimiter: string;

  /**
   * Output CSV formatting policy. Default `inputPolicy`.
   */
  outputPolicy: 'simple' | 'quoted' | 'quoted_rfc';
}

function loadOptions(query: string, partialOptions?: Partial<QueryFileOptions>): QueryFileOptions {
  const defaultOptions: Partial<QueryFileOptions> = {
    hasHeader: true,
    encoding: 'utf-8',
    inputDelimiter: ',',
    inputPolicy: 'quoted_rfc'
  };

  const options = { ...defaultOptions, ...partialOptions };
  options.outputDelimiter ??= options.inputDelimiter;
  options.outputPolicy ??= options.inputPolicy;

  if (options.encoding === 'latin-1') {
    options.encoding = 'binary';
  }

  if (options.inputDelimiter === '"' && options.inputPolicy === 'quoted') {
    throw new RbqlIOHandlingError('Double quote delimiter is incompatible with "quoted" policy');
  }

  if (!isAscii(query) && options.encoding === 'binary') {
    throw new RbqlIOHandlingError(
      'To use non-ascii characters in query enable UTF-8 encoding instead of latin-1/binary'
    );
  }
  if ((!isAscii(options.inputDelimiter!) || !isAscii(options.outputDelimiter!)) && options.encoding === 'binary') {
    throw new RbqlIOHandlingError(
      'To use non-ascii characters in query enable UTF-8 encoding instead of latin-1/binary'
    );
  }

  return options as QueryFileOptions;
}

// TODO: Extend for several tables?
class JoinRegistry implements RBQLJoinTableRegistry {
  private recordIterator: RBQLInputIterator;

  constructor(recordIterator: RBQLInputIterator) {
    this.recordIterator = recordIterator;
  }

  get_iterator_by_table_id(tableId: string) {
    if (tableId !== 'b') {
      throw new RbqlIOHandlingError(`Unable to find join table "${tableId}"`);
    }

    return this.recordIterator;
  }

  // TODO: Implement?
  get_warnings(_warnings: string[]) {  }
}

async function queryImpl(
  query: string,
  inputStream: Readable,
  outputStream: Writable,
  options: QueryFileOptions,
  joinStream?: Readable
): Promise<void> {
  const inputIterator = new CSVRecordIterator(
    inputStream,
    null,
    options.encoding,
    options.inputDelimiter,
    options.inputPolicy,
    options.hasHeader,
    null,
    'a',
    'a'
  );

  const outputWriter = new CSVWriter(
    outputStream,
    true,
    options.encoding,
    options.outputDelimiter,
    options.outputPolicy
  );

  const joinRegistry = joinStream && new JoinRegistry(
    new CSVRecordIterator(
      joinStream,
      null,
      options.encoding,
      options.inputDelimiter,
      options.inputPolicy,
      options.hasHeader,
      null,
      'b',
      'b'
    )
  );

  // TODO: Handle warnings
  const warnings: string[] = [ ];
  // Promise passthrough, no await.
  return rbql.query(query, inputIterator, outputWriter, warnings, joinRegistry);
}

  async function query(
    data: string,
    query: string,
    joinData?: string,
    partialOptions?: Partial<QueryFileOptions>
  ): Promise<string> {
    const options = loadOptions(query, partialOptions);

    const inputStream = Readable.from(data, { objectMode: false });
    const outputStream = new WritableStreamBuffer();
    const joinStream = joinData ? Readable.from(joinData, { objectMode: false }) : undefined;

    await queryImpl(query, inputStream, outputStream, options, joinStream);

    // TODO: Get rid of this ||.
    return outputStream.getContentsAsString(options.encoding) || '(Error!)';
  }
}
mechatroner commented 1 year ago

Thanks, I like this idea, the only thing I would change is that I think streams can be more efficient than strings, and functionality-wise they are more basic/generic, so we actually need a stream API, similar to what your internal queryImpl does, and as your snippet shows it is fairly trivial to convert a string into a stream, so the API caller can easily do this outside the function. IMO it should look something like this: async function query_csv_stream(query_text, input_stream, input_delim, input_policy, output_stream, output_delim, output_policy, output_warnings, join_stream=null, with_headers=false, comment_prefix=null, user_init_code='', options=null)