jucardi / go-streams

Stream Collections for Go, inspired by Java 8 Streams and .NET LINQ
MIT License

This library provides stream-style operations, such as filtering, sorting and mapping, over Go arrays and collections.


(Nov 2022) Important Update: This library has been redesigned to support Golang Generics and is not backwards compatible with the previous version. It also requires Go 1.18 or later. If you need the older version without generics, or a version compatible with an older Go release, you can get the latest stable version without generics using Golang Modules by running the following command:

go get github.com/jucardi/go-streams@v1.0.3

Quick Start

To keep up to date with the most recent version:

go get github.com/jucardi/go-streams

Using Golang Modules

go get github.com/jucardi/go-streams/v2@latest

For the latest version without Golang Generics (v1.0.3):

go get github.com/jucardi/go-streams@v1.0.3

Quick Overview

Streams facilitate operations on arrays, iterables and collections, such as filtering, sorting, mapping, foreach, and parallel operations on the items they contain.

A quick example:

Given the following array

var fruitArray = []string{"peach", "apple", "pear", "plum", "pineapple", "banana", "kiwi", "orange"}

Let's obtain an array of only the elements that start with the letter "p"

fruitsThatStartWithP := streams.

    // Creates a stream from the given array
    From[string](fruitArray).

    // Adds a filter for strings that start with 'p'
    Filter(func(v string) bool {
        return strings.HasPrefix(v, "p")
    }).

    // Sorts alphabetically
    Sort(strings.Compare).

    // Converts back to an array
    ToArray()

The resulting array will be {"peach", "pear", "pineapple", "plum"}

Here we use an array of strings as the source of the stream and apply a filter operation. The filter is a function that receives a single element of the collection and returns a boolean indicating whether the element should remain in the stream. The remaining elements are then sorted alphabetically and converted back to an array.
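For comparison, the same filter-and-sort result can be produced with the standard library alone. The sketch below does not use go-streams; filterAndSort is a hypothetical helper written only for this illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// filterAndSort is a hypothetical helper (not part of go-streams). It keeps
// the items accepted by keep and returns them sorted alphabetically, leaving
// the input slice untouched.
func filterAndSort(items []string, keep func(string) bool) []string {
	result := make([]string, 0, len(items))
	for _, item := range items {
		if keep(item) {
			result = append(result, item)
		}
	}
	sort.Strings(result)
	return result
}

func main() {
	fruitArray := []string{"peach", "apple", "pear", "plum", "pineapple", "banana", "kiwi", "orange"}
	startsWithP := func(v string) bool { return strings.HasPrefix(v, "p") }
	fmt.Println(filterAndSort(fruitArray, startsWithP)) // [peach pear pineapple plum]
}
```

The stream version expresses the same steps as a chain, which tends to read better as more operations are added.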

Now let's do a simple foreach operation

streams.
    From[string](fruitArray).
    Filter(func(v string) bool {
        return strings.HasPrefix(v, "p")
    }).
    ForEach(func(v string) {
        println(v)
    })

In this example, once the stream has processed the filter, it performs a foreach operation on the result. This operation prints the following output to the console:

peach
pear
plum
pineapple

About go-streams

The characteristics of a Stream are inspired by the stream features provided by Java 8. The following characteristics apply to go-streams.

Currently Streams can be obtained in two ways:

NOTE: There is a global function From[T comparable] that accepts any value and creates the stream depending on whether the value is an array or a collection. However, it panics if the input is neither.

Stream operations and pipelines

Stream operations are divided into intermediate and terminal operations, and are combined to form stream pipelines. A stream pipeline consists of a source (such as an iterable, a collection, an array, a generator function, or an I/O channel), followed by zero or more intermediate operations such as Filter(), Except() or Sort(), and a terminal operation such as ForEach() or First().

Intermediate operations return a stream and are always lazy: calling an intermediate operation such as Filter() or Sort() does not perform any action. Instead, it registers the action in the stream, which executes all filtering and sorting criteria at once when traversed. Traversal of the pipeline source does not begin until the terminal operation of the pipeline, such as First(), Last(), ToArray() or ForEach(), is executed.

Terminal operations, such as ForEach(), First(), Last(), ParallelForEach() or ToArray(), may traverse the stream to produce a result or a side effect. After the terminal operation is performed, the stream pipeline is considered consumed and can no longer be used; if you need to traverse the same data source again, you must return to the data source to get a new stream. In almost all cases, terminal operations are eager, completing their traversal of the data source and processing of the pipeline before returning.
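The difference between lazy intermediate operations and eager terminal operations can be sketched with a tiny stand-in type. This is not how go-streams is implemented; lazyStream, from and passes are hypothetical names, and the sketch does not model the "consumed after use" rule. It only shows that registering a filter runs nothing until a terminal operation traverses the source.

```go
package main

import "fmt"

// lazyStream is a hypothetical, simplified stand-in for a stream. It stores
// the source and the registered predicates, and does no work until a
// terminal operation is called.
type lazyStream[T any] struct {
	source  []T
	filters []func(T) bool
}

// from creates a lazyStream over the given slice.
func from[T any](source []T) *lazyStream[T] {
	return &lazyStream[T]{source: source}
}

// Filter is intermediate: it only registers the predicate and returns the stream.
func (s *lazyStream[T]) Filter(keep func(T) bool) *lazyStream[T] {
	s.filters = append(s.filters, keep)
	return s
}

// passes reports whether item is accepted by every registered predicate.
func (s *lazyStream[T]) passes(item T) bool {
	for _, keep := range s.filters {
		if !keep(item) {
			return false
		}
	}
	return true
}

// ToArray is terminal: it traverses the source once, applying all filters.
func (s *lazyStream[T]) ToArray() []T {
	var out []T
	for _, item := range s.source {
		if s.passes(item) {
			out = append(out, item)
		}
	}
	return out
}

func main() {
	calls := 0
	stream := from([]int{1, 2, 3, 4, 5, 6}).Filter(func(v int) bool {
		calls++
		return v%2 == 0
	})
	fmt.Println(calls)            // 0: nothing has been traversed yet
	fmt.Println(stream.ToArray()) // [2 4 6]
	fmt.Println(calls)            // 6: the predicate ran once per element
}
```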

Intermediate operations

Terminal operations

Single item returns
Collection returns
Boolean returns
IThen[T] returns

IThen[T] is a handler where functions can be registered and triggered if the stream result meets a certain condition.

For example:

streams.
    FromArray[T](array).
    Filter(filterHandler).
    Filter(anotherFilterHandle).
    Sort(sorterHandler).
    IfEmpty().
    Then(func(resultStream IStream[T]) {
        // Do something if empty
    }).
    Else(func(resultStream IStream[T]) {
        // Do something if not empty
    })

Void returns
Int returns

Mapping functions

In previous versions of this library, there was a Map() function that converted the elements of the stream from their initial type into something else (e.g. an array of strings to integers).

This library was updated to use Golang Generics to avoid having to work with interface{} and reflect. However, Go does not allow methods (functions defined on structs or declared in interfaces) to introduce their own type parameters, so Map cannot be invoked as a method of the stream because it has no way to accept the target type of the conversion.

For this reason, Map became a package-level function, and new mapping functions were added to simplify some mapping invocations.
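The language restriction can be seen in a small standalone sketch. The box type and mapBox function below are hypothetical and exist only for this illustration; they are not part of go-streams. The commented-out method shows the form that Go rejects, and mapBox shows the package-level alternative.

```go
package main

import (
	"fmt"
	"strconv"
)

// box is a hypothetical generic container used only for this illustration.
type box[T any] struct {
	items []T
}

// Not allowed in Go: a method cannot declare its own type parameter, so a
// target type R cannot be introduced here.
//
//	func (b box[T]) Map[R any](f func(T) R) box[R] { ... }

// mapBox is the package-level alternative: both the source type T and the
// target type R are type parameters of the function itself.
func mapBox[T, R any](b box[T], f func(T) R) box[R] {
	out := make([]R, 0, len(b.items))
	for _, item := range b.items {
		out = append(out, f(item))
	}
	return box[R]{items: out}
}

func main() {
	numbers := box[int]{items: []int{1, 5, 8}}
	asStrings := mapBox(numbers, strconv.Itoa) // T=int and R=string are inferred
	fmt.Printf("%q\n", asStrings.items)        // ["1" "5" "8"]
}
```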

The current mapping functions built into this package are:

Examples:

1. Mapping from a stream

// Given the following source array of number strings
sourceArray := []string{"1", "5", "8", "100", "23", "6", "abc"}

mappedList := streams.Map[string, int](
    // The source stream for the mapping, with any stream functions that need to run
    // before mapping. In this example, all items that cannot be parsed to an int
    // are filtered out.
    streams.
        From[string](sourceArray).
        Filter(func(x string) bool {
            _, err := strconv.Atoi(x)
            return err == nil
        }),
    // The mapping function
    func(item string) int {
        ret, _ := strconv.Atoi(item)
        return ret
    },
)

2. Using the mapping result as a stream to continue processing

// Given the following source array of number strings
sourceArray := []string{"1", "5", "8", "100", "23", "6", "abc"}

mappedList := streams.Map[string, int](
    sourceArray,                       // Using the array as the source instead of a stream
    func(item string) int {            // The mapping function
        ret, _ := strconv.Atoi(item)
        return ret                     // For any element that cannot be converted to int, returns the zero value (0)
    },
).
    ToStream().                        // Obtain a stream from the resulting list
    Filter(func(x int) bool {          // Apply any stream functions to the new stream
        return x > 0
    }).
    ToList()

3. Mapping using the MapNonComparable function

// Given the following struct
type SomeStruct struct {
    Name       string
    Score      int
    StringFn   func() string
}

// And given the following array
arr := []SomeStruct{
    {
        Name: "abcd",
        StringFn: func() string {
            return "something"
        },
    },
    {
        Score: 123,
        StringFn: func() string {
            return "something else"
        },
    },
}

// Because the structure has a `func` field, the structure `SomeStruct` is not a `comparable` type,
// so that's where the func `MapNonComparable` comes in handy. This example transforms the source array
// into an array of functions []func() string using the function in the `StringFn` field

funcs := streams.
    MapNonComparable[SomeStruct, func() string](
        arr,
        func(x SomeStruct) func() string {
            return x.StringFn
        })

4. Using the MapToPtr function

// Using the same struct and array as the example above

// Given the following struct
type SomeStruct struct {
    Name       string
    Score      int
    StringFn   func() string
}

// And given the following array
arr := []SomeStruct{
    {
        Name: "abcd",
        StringFn: func() string {
            return "something"
        },
    },
    {
        Score: 123,
        StringFn: func() string {
            return "something else"
        },
    },
}

// Say in this case, we would like to obtain an array of functions []func() string contained in the
// source structure, but only for those structures that have a `Score` of 50 or higher.
//
// streams.From[SomeStruct] will not work because `SomeStruct` is not `comparable`, so we can do the
// following:

// Convert `arr` from []SomeStruct to []*SomeStruct
newArr := streams.MapToPtr[SomeStruct](arr)

// Now, `newArr` can be used in streams because *SomeStruct is `comparable`
funcs := streams.
    Map[*SomeStruct, func() string](
        streams.                               // The source stream for mapping
            From[*SomeStruct](newArr).
            Filter(func(x *SomeStruct) bool {  // The filtering in the source stream
                return x.Score >= 50
            }),
        func(x *SomeStruct) func() string {    // The mapping function
            return x.StringFn
        },
    )
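The reason a pointer conversion helps can be checked with plain Go. In the sketch below, countDistinct and toPtrs are hypothetical helpers, not go-streams functions; toPtrs only mirrors the idea behind MapToPtr as described in the example. A struct with a func field does not satisfy the comparable constraint, while a pointer to it does, because pointers compare by address.

```go
package main

import "fmt"

// SomeStruct has a func field, so it is not a comparable type.
type SomeStruct struct {
	Name     string
	Score    int
	StringFn func() string
}

// countDistinct is a hypothetical helper that requires a comparable type
// because it uses T as a map key.
func countDistinct[T comparable](items []T) int {
	seen := make(map[T]struct{}, len(items))
	for _, item := range items {
		seen[item] = struct{}{}
	}
	return len(seen)
}

// toPtrs is a hypothetical helper that returns pointers to the elements of
// items. The pointers refer to the original slice elements, not to copies.
func toPtrs[T any](items []T) []*T {
	out := make([]*T, len(items))
	for i := range items {
		out[i] = &items[i]
	}
	return out
}

func main() {
	arr := []SomeStruct{{Name: "abcd"}, {Score: 123}}

	// countDistinct(arr) // does not compile: SomeStruct is not comparable

	ptrs := toPtrs(arr)
	fmt.Println(countDistinct(ptrs)) // 2: each element has a distinct address
}
```

Because pointers compare by address, two structs with identical field values still count as different elements after the conversion.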

Parallelism

Streams support parallel operations. Once parallelism is enabled, intermediate operations are processed in parallel. Note that parallelism for ForEach operations is handled separately: ForEach is a terminal operation, and executing it in parallel cannot preserve the order produced by the intermediate operations.

Enabling parallelism for intermediate operations

There are multiple ways of enabling parallelism in a stream.

stream := streams.
    FromArray[T](array, 8).      // Creates the stream from the given array, the second (variadic) argument indicates 
                                 // the amount of threads to be used when executing intermediate operations
    Filter(filterHandler).
    Filter(anotherFilterHandle).
    Sort(sorterHandler)

stream := streams.
    FromArray[T](array).
    Filter(filterHandler, 8).    // Adds a filter to the stream, the second (variadic) argument indicates the amount
                                 // of threads to be used when executing intermediate operations
    Except(exceptHandler).
    Sort(sorterHandler)

stream := streams.
    FromArray[T](array).
    Filter(filterHandler).
    Except(exceptHandler, 8).    // Adds an except to the stream, the second (variadic) argument indicates the amount
                                 // of threads to be used when executing intermediate operations
    Sort(sorterHandler)

stream := streams.
    FromArray(array).
    SetThreads(8).           // Sets the amount of threads to be used when executing intermediate operations
    Filter(filterHandler).
    Except(exceptHandler).
    Sort(sorterHandler)

stream := streams.
    FromArray(array).
    Filter(filterHandler).
    Except(exceptHandler).
    Sort(sorterHandler).
    SetThreads(8)            // Can be added at any point before invoking a terminal operation

When setting the number of threads with any of the options above, keep in mind that it is capped at the number of CPUs available on the host machine.

Any number equal to or lower than 0 can be provided to use the maximum number of threads allowed by the CPU cap.
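The capping rule described above can be written as a small pure function. This is a sketch of the stated behavior, not the library's code; clampThreads is a hypothetical name, and the CPU count is passed in as a parameter so the rule is easy to check.

```go
package main

import (
	"fmt"
	"runtime"
)

// clampThreads is a hypothetical helper illustrating the rule in the text:
// a request of 0 or less means "use every CPU", and a request above the CPU
// count is capped at the CPU count.
func clampThreads(requested, cpus int) int {
	if requested <= 0 || requested > cpus {
		return cpus
	}
	return requested
}

func main() {
	cpus := runtime.NumCPU()
	fmt.Println(clampThreads(8, cpus)) // 8, or fewer on a host with under 8 CPUs
	fmt.Println(clampThreads(0, cpus)) // the number of CPUs on this host
}
```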

Executing a For Each in parallel

By default, the ForEach function does not run in parallel, regardless of whether threads were previously assigned for intermediate operations. The reason for this design is that intermediate operations produce a result that may have been sorted by a provided sorting function. When a ForEach runs in parallel, the order in which the foreach handler is run for each element of the resulting stream cannot be guaranteed.

Use ParallelForEach only when the order of execution does not matter when processing the elements of the stream.
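To see why ordering is lost, consider a minimal worker-pool foreach built from the standard library. This is an illustrative sketch under assumed names (parallelForEach and sumParallel are hypothetical), not the implementation used by go-streams. Elements are handed to whichever worker is free, so handlers may run in any order; only order-independent work, such as a sum, gives a stable result.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelForEach is a hypothetical sketch: it runs handler for every item
// using the given number of worker goroutines and waits for all of them to
// finish. The order in which handlers run is not defined.
func parallelForEach[T any](items []T, workers int, handler func(T)) {
	if workers < 1 {
		workers = 1
	}
	jobs := make(chan T)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range jobs {
				handler(item)
			}
		}()
	}
	for _, item := range items {
		jobs <- item
	}
	close(jobs)
	wg.Wait()
}

// sumParallel adds up the items with parallelForEach. A mutex guards the
// shared total because handlers run concurrently.
func sumParallel(items []int, workers int) int {
	var mu sync.Mutex
	total := 0
	parallelForEach(items, workers, func(v int) {
		mu.Lock()
		defer mu.Unlock()
		total += v
	})
	return total
}

func main() {
	// The sum does not depend on the order in which the handlers ran.
	fmt.Println(sumParallel([]int{1, 2, 3, 4, 5}, 3)) // 15
}
```

Note that any state shared between handlers needs its own synchronization, as the mutex in sumParallel shows.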

The function ParallelForEach(f func(x T), threads int, skipWait ...bool) receives three arguments.

Example:

streams.
    FromArray[T](array, 8).   // Setting 8 cores for intermediate operations, such as filtering.
    Filter(filterHandler).
    Except(exceptHandler).
    Sort(sorterHandler).
    ParallelForEach(func(x T) {

        // foreach logic here.

    }, 0)                     // Setting the maximum amount of cores available for the ParallelForEach process.