fortran-lang / stdlib

Fortran Standard Library
https://stdlib.fortran-lang.org
MIT License
1.09k stars 166 forks source link

Selection algorithms #471

Open jvdp1 opened 3 years ago

jvdp1 commented 3 years ago

In #426, selection algorithms were mentioned. See e.g., these comment, comment, and comment.

Description

To be completed

Prior Art

To be completed

leonfoks commented 3 years ago

First stab. Also, @jvdp1 should we add the proposal label?

FWIW. I would be up for doing this, I have yet to contribute directly to stdlib because I have not had the mental capacity to overcome my learning curve with fypp, but this is an easy enough example to get my toes wet and stuck in. Once we have this, I can work on an implementation of Shewchuks adaptive floating point predicates (exact computation of the point inside/on/outside circle problem) and then KdTrees.

Description Hoare's O(n) quick select algorithm to find the kth sorted entry in an unsorted array, without performing the full sort. We can develop two types of algorithm based on this. quick_select and arg_quick_select (or ord_quick_select, but former matches numpy argmin, argpartition, argsort etc.).

Benefits Can be used to quickly find the median, or perhaps the value of the kth percentile in an array. The quick select is O(n) and in-place so is faster than first sorting the array and then choosing the kth value. This will be very useful for KdTrees where at each iteration, the dimension with the highest variance is split at its median value to create nicely balanced trees. A fast median (quick_select) here is key, and also a fast arg_median (and hence arg_quick_select).

API I'll put what I have in Coretran, this will change to match the stdlib format. Also, I don't know how to use fypp, or the syntax, but ill try to use suitable placeholders for possible changeable declarations. The implementations will go in a submodule. By in the code below, I mean real64, int64 etc. (or whatever stdlib is using as those parameters)

interface select
    !!Use an in-place quick select on an array of numbers
    module subroutine quick_select_<array_type>_<k_type>(array, k, out, left, right)
      !! Interfaced with select()
      <array_type>, intent(inout) :: array(:) 
        !! Array to choose kth smallest from
      <k_type>, intent(in) :: k
        !! kth smallest element, where 1 <= k <= size(array)
      <array_type> :: out
        !! Return the value of the kth element
      <k_type>, intent(in), optional :: left
        !! If array has already been partially sorted, use this as the starting left index
      <k_type>, intent(in), optional :: right
        !! If array has already been partially sorted, use this as the starting right index
    end subroutine
end interface

interface arg_select
    !!Use an in-place quick select on an array of numbers using an index into the array.
    module subroutine arg_quick_select_(array, indx, k, out, left, right)
      !! Interfaced with argSelect()
      <array_type>, intent(in) :: array(:) 
        !! Array to choose kth smallest from
      <k_type>, intent(inout) :: indx(size(array))
        !! Index into array.
      <k_type>, intent(in) :: k
        !! kth smallest element, where 1 <= k <= size(array)
      <array_type> :: out
        !! Return the value of the kth element
      <k_type>, intent(in), optional :: left
        !! If array has already been partially sorted, use this as the starting left index
      <k_type>, intent(in), optional :: right
        !! If array has already been partially sorted, use this as the starting right index
    end subroutine
end interface

Example

program select
use m_select, only: select

real(r64), allocatable :: d1D(:)
real(64) :: value
integer(i64), allocatable :: i1D(:)
integer(i64) :: ivalue

allocate(d1D(10000))
d1D = Assign values
k = (1 + size(d1D)) / 2
value = select(d1D, k)
write(*,'(a)') 'kth element? '//str(value)

allocate(i1D(10000))
i1D = Assign integers
ivalue = select(i1D, k)
write(*,'(a)') 'kth element? '//str(ivalue)

end program

Current implementations I have an implementation in Coretran, and during a discussion that started here @gareth-nx brought up the Matlab version with a permissive license, @gareth-nx did a quick translation and ill attach that below.

Timing I timed the matlab version and my coretran version using the same arrays and compile flags. Timings are below and are very close with coretran being surprisingly faster for sorted arrays but maybe this is just timing fluctuations.

It might be easiest at this point to simply translate the matlab code, and add the arg_select variant which should be easy enough.

Left column is array length, right is time in seconds. Timing was done around a loop of 10 quick selections and the average taken. The array used had samples from a random normal.

==========================
Testing : Selection
==========================
Double precision quick select
16777216  1.694E-01
8388608  9.460E-02
4194304  5.000E-02
2097152  2.530E-02
1048576  1.030E-02
524288  3.500E-03

Already sorted double precision quick select
16777216  1.030E-02
8388608  4.500E-03
4194304  2.800E-03
2097152  1.000E-03
1048576  6.000E-04
524288  2.000E-04

==========================
Testing : Matlab Selection
==========================
Double precision quick select
16777216  1.579E-01
8388608  8.530E-02
4194304  4.260E-02
2097152  2.130E-02
1048576  9.300E-03
524288  4.100E-03
Already sorted double precision quick select
16777216  1.310E-02
8388608  6.200E-03
4194304  3.600E-03
2097152  1.500E-03
1048576  1.200E-03
524288  4.000E-04

Matlab translated version

module qselect_mod
!
! This code is a translation of a matlab implementation "qselect" by Manolis Lourakis
!   https://www.mathworks.com/matlabcentral/fileexchange/68947-qselect 
! Below is the licence of qselect
!
!
! Copyright (c) 2018, Manolis Lourakis
! All rights reserved.
! 
! Redistribution and use in source and binary forms, with or without
! modification, are permitted provided that the following conditions are met:
! 
! * Redistributions of source code must retain the above copyright notice, this
!   list of conditions and the following disclaimer.
! 
! * Redistributions in binary form must reproduce the above copyright notice,
!   this list of conditions and the following disclaimer in the documentation
!   and/or other materials provided with the distribution
! * Neither the name of Foundation for Research and Technology - Hellas nor the names of its
!   contributors may be used to endorse or promote products derived from this
!   software without specific prior written permission.
! THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
! AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
! IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
! DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
! FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
! DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
! SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
! CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
! OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
! OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
!

    use iso_c_binding, only: C_DOUBLE, C_INT
    implicit none

    integer, parameter :: dp = C_DOUBLE, ip = C_INT

    contains

    function qselect(a, k) result(kth_largest)
! qselect - select the k-th smallest out of n numbers
! Implements Hoare s Quickselect algorithm (https://en.wikipedia.org/wiki/Quickselect)
! with the median-of-3 pivot strategy. Operates in-place, avoiding sorting and recursion
!
!    kth = qselect(a, k)
!
!    a      - array of n elements
!    k      - specifies the desired k-th smallest element
!             the k-th *largest* element can be found by passing length(a)+1-k
!
! Returns
!
!    kth    - the sought element
! Manolis Lourakis 2007-18
! Institute of Computer Science, Foundation for Research & Technology - Hellas
! Heraklion, Crete, Greece
! Sep  2018  - Original version. (v. 1.0)
!
        real(dp), intent(inout) :: a(:)
        integer(ip), intent(in) :: k
        real(dp) :: kth_largest

        integer(ip) :: l, r, s, i, j, k_local
        real(dp) :: temp, pivot

        l = 1_ip
        r = size(a)
        if(k < 1 .or. k > r) stop "in qselect, k must be between 1 and size(a)";
        k_local = k

        do while(.true.)
            s = (l+r)/2_ip ! Deliberate integer division
            if(a(s) < a(r)) then
                temp = a(s); a(s) = a(r); a(r) = temp
            end if
            if(a(s) < a(l)) then
                temp = a(s); a(s) = a(l); a(l) = temp
            end if
            if(a(r) < a(l)) then
                temp = a(r); a(r) = a(l); a(l) = temp; 
            end if
            pivot = a(r) ! Median
            i = l

            do j = l, r-1
                if(a(j) <= pivot) then
                    temp = a(i); a(i) = a(j); a(j) = temp
                    i = i+1_ip
                end if
            end do
            temp = a(r); a(r) = a(i); a(i) = temp
            s = i-l+1
            if(k_local < s) then
                r = i-1_ip
            else if(k_local > s) then
                l=i+1_ip; k_local=k_local-s;
            else
                kth_largest = a(i)
                return
            end if
        end do
    end function

end module
gareth-nx commented 3 years ago

@leonfoks I like the subroutine interfaces you propose. IMO it is good to make this a subroutine as in your interfaces, rather than a function as in the matlab translation -- because there is some input argument modification (which I personally avoid for functions in Fortran, although others may have different views).

In addition to supporting various real/integer types, one could follow the approach of the sorting routines, and develop versions for character(len=*) and type(string_type). However on first thought, I'm not sure they would be especially useful? The obvious applications of quick-select are 'median', 'percentiles' etc which probably only need numeric types. If such special cases are of interest, one could also consider enable arg_quick_select_ to take a comparison function as an input argument in place of the array (something like C's qsort).

For simplicity it may be best to focus on intrinsic numeric types for the first pull request.

jvdp1 commented 3 years ago

First stab. Also, @jvdp1 should we add the proposal label?

Yes, indeed @leonfoks . I was a bit short in time yesterday to start it, but wanted to open an issue to not forget about it. Thank you for adding this information. It is a great start IMO.

Re: character(len=*) and type(string_type), I think it could be easily implemented once we have a clean version for integers and reals, similarly to stdlib sort.

jvdp1 commented 3 years ago

FWIW. I would be up for doing this, I have yet to contribute directly to stdlib because I have not had the mental capacity to overcome my learning curve with fypp, but this is an easy enough example to get my toes wet and stuck in.

Really happy to read this. Regarding fypp, the community can help you, if needed. stdlib_sorting module could be a nice example too. Anyway, don't hesitate to ask if needed.

The quick select is O(n) and in-place so is faster than first sorting the array and then choosing the kth value.

I think it would be good to propose both: a subroutine which is in-place, and a function with all argument intent(in) (keeping the array unchanged).

The implementations will go in a submodule. By in the code below, I mean real64, int64 etc. (or whatever stdlib is using as those parameters)

It sounds good to me.


interface select
    !!Use an in-place quick select on an array of numbers
    module subroutine quick_select_<array_type>_<k_type>(array, k, out, left, right)
      !! Interfaced with select()
      <array_type>, intent(inout) :: array(:) 
        !! Array to choose kth smallest from
      <k_type>, intent(in) :: k
        !! kth smallest element, where 1 <= k <= size(array)
      <array_type> :: out
        !! Return the value of the kth element
      <k_type>, intent(in), optional :: left
        !! If array has already been partially sorted, use this as the starting left index
      <k_type>, intent(in), optional :: right
        !! If array has already been partially sorted, use this as the starting right index
    end subroutine
end interface

Are left and right there to help the search?

leonfoks commented 3 years ago

Anyway, don't hesitate to ask if needed

Great! Thanks @jvdp1

I think it would be good to propose both: a subroutine which is in-place, and a function with all argument intent(in) (keeping the array unchanged).

We can implement the function with intent(in), we would just duplicate the incoming array.

Are left and right there to help the search?

Short answer: Yes.

Long answer and a bit of a ramble:

Yes, they make it a little faster when building a KdTree. Each time we split along a given dimension, the quick select puts all values lower than the median to the left and all values greater than to the right. The kdtree build is recursive, so the next time we split a dimension, we keep track of the left and right indices and only quick select over the smaller range. From a median or percentile standpoint it's hidden to the user, but from a development/speed up standpoint (in future algorithms) it will be good to have. Doing this allowed me to keep the co-ordinates of the KdTree in-place throughout building it, rather than passing sub arrays to the median algorithm, although my understanding at the time might have been lacking about how contiguous subarrays are passed to subroutines and whether an implicit copy happens or not. Since the number of points for building a kdtree can be huge I thought this step might be important and also why i originally suggested a subroutine with intent(inout).

I can make sure this is explained in the docstrings if needed!

Beliavsky commented 3 years ago

Often I want to know the locations of the largest k elements, not just the single location of the k'th largest element. Can this be accommodated? Related suggestions are in issue #405

Beliavsky commented 3 years ago

Often you want to select elements of several ranks, for example for a 100-element array the elements [1,25,50,75,100]. Ideally a selection code would allow the user to specify the ranks of the elements desired and either use a selection algorithm to find them individually or sort the entire array, depending on what is expected to be faster.

gareth-nx commented 3 years ago

@Beliavsky

Although I haven't checked comprehensively, I think it is true that, following a call to qselect(a, k) above, a is modified such that a(1:(k-1)) <= a(k) <= a((k+1):size(a)).

So if you were looking for the largest 10 elements (say), you could call qselect(a,size(a) - 10 + 1) and then look at a( (size(a) - 10+1): size(a) ) -- while the latter might not be sorted, it would have the 10 largest.

So there seems to be a clear path to k-largest or k-smallest type routines.

leonfoks commented 3 years ago

Thanks @gareth-nx, that should be the case.

For returning several elements at different k indices, that is a good use case for the optional left, right arguments in my interface/subroutine above. We could adapt the code / have a wrapper to simply continue searching for the other elements. I would hazard a guess that a center out approach might be fastest, i.e. the order would be 50-25-1-75-100. Or maybe it doesn't matter at all. Regardless, using the left and right indices from a previous run will help.

@Beliavsky arg_quick_select will return the indices into the original unmodified array that correspond to the k largest. And all the above discussions apply to that sub of course.

gareth-nx commented 3 years ago

Motivated by this discussion, I modified the matlab translation of quick_select to a subroutine, added left/right arguments, arg_quick_select, and a bunch of tests.

Among other things, these confirm the statements about partial sorting in our discussion above.

Copied below in case this is useful for future work (even if not the implementation, maybe some of the tests would be).

module quick_select_mod
!
! This code was modified from a matlab implementation of "qselect" by Manolis Lourakis
!   https://www.mathworks.com/matlabcentral/fileexchange/68947-qselect
! Below is the licence of qselect
!
! Copyright (c) 2018, Manolis Lourakis
! All rights reserved.
!
! Redistribution and use in source and binary forms, with or without
! modification, are permitted provided that the following conditions are met:
!
! * Redistributions of source code must retain the above copyright notice, this
!   list of conditions and the following disclaimer.
!
! * Redistributions in binary form must reproduce the above copyright notice,
!   this list of conditions and the following disclaimer in the documentation
!   and/or other materials provided with the distribution
! * Neither the name of Foundation for Research and Technology - Hellas nor the names of its
!   contributors may be used to endorse or promote products derived from this
!   software without specific prior written permission.
! THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
! AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
! IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
! DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
! FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
! DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
! SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
! CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
! OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
! OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
!

    use iso_c_binding, only: C_DOUBLE, C_LONG
    implicit none

    integer, parameter :: dp = C_DOUBLE, ip = C_LONG

    contains

    subroutine quick_select(a, k, kth_smallest, left, right)
        !! quick_select - select the k-th smallest entry in a(:).
        !!
        !! Implements Hoare s Quickselect algorithm
        !!     (https://en.wikipedia.org/wiki/Quickselect)
        !! with the median-of-3 pivot strategy.
        !! Operates in-place, avoiding sorting and recursion.
        !!
        !! Modified from a translation of "qselect" by Manolis Lourakis
        !!   https://www.mathworks.com/matlabcentral/fileexchange/68947-qselect
        !!
        real(dp), intent(inout) :: a(:)
            !! Array in which we seek the kth-smallest entry.
            !! On output it will be partially sorted such that
            !! a(1:(k-1)) <= a(k) <= a((k+1):size(a)).
        integer(ip), intent(in) :: k
            !! We want the kth smallest entry. E.G. "k=1" corresponds to
            !! "min(a)", and "k=size(a)" corresponds to "max(a)"
        real(dp), intent(out) :: kth_smallest
            !! On output contains the kth-smallest value of a(:)
        integer(ip), intent(in), optional :: left, right
            !! If we know that:
            !!    the kth-smallest entry of a is in a(left:right)
            !! and also that:
            !!    a(1:(left-1)) <= a(left)
            !! and:
            !!    a(left:right)) <= a((right+1):size(a))
            !! then one or both bounds can be specified to reduce the search time.
            !! These constraints are available if we have previously called the
            !! subroutine with a different k (due to the way that a(:) becomes
            !! partially sorted, see documentation for a(:)).

        integer(ip) :: l, r, s, i, j, k_local
        real(dp) :: pivot

        l = 1_ip
        if(present(left)) l = left
        r = size(a)
        if(present(right)) r = right

        if(k < 1_ip .or. k > size(a) .or. l > r .or. l < 1_ip .or. &
            r > size(a)) then
            stop "quick_select must have 1 <= k <= size(a), and 1 <= left <= right <= size(a)";
        end if

        k_local = k - l + 1

        do while(.true.)
            s = (l+r)/2_ip ! Deliberate integer division
            if(a(s) < a(r)) call swap(a(s), a(r))
            if(a(s) < a(l)) call swap(a(s), a(l))
            if(a(r) < a(l)) call swap(a(r), a(l))
            pivot = a(r) ! Median

            i = l
            do j = l, r-1
                if(a(j) <= pivot) then
                    call swap(a(i), a(j))
                    i = i+1_ip
                end if
            end do
            call swap(a(r), a(i))
            s = i-l+1
            if(k_local < s) then
                r = i-1_ip
            else if(k_local > s) then
                l=i+1_ip; k_local=k_local-s;
            else
                kth_smallest = a(i)
                return
            end if
            !print*, a, i
        end do

        contains
            subroutine swap(a, b)
                real(dp), intent(inout) :: a, b
                real(dp) :: tmp
                tmp = a; a = b; b = tmp
            end subroutine
    end subroutine

    subroutine arg_quick_select(a, indx, k, kth_smallest, left, right)
        !! arg_quick_select - find the index of the k-th smallest entry in a(:)
        !!
        !! Implements Hoare s Quickselect algorithm
        !!    https://en.wikipedia.org/wiki/Quickselect)
        !! with the median-of-3 pivot strategy.
        !! Operates in-place, avoiding sorting and recursion.
        !!
        real(dp), intent(in) :: a(:)
            !! Array in which we seek the kth-smallest entry.
        integer(ip), intent(inout) :: indx(:)
            !! Array of indices into a(:). Must contain each integer
            !! from 1:size(a) exactly once. On output it will be partially
            !! "sorted" such that
            !! a(indx(1:(k-1))) <= a(indx(k)) <= a(indx( (k+1):size(a) )).
        integer(ip), intent(in) :: k
            !! We want index of the kth smallest entry. E.G. "k=1" corresponds
            !! to the index holding "min(a)", and "k=size(a)" corresponds to
            !! the index holding "max(a)"
        integer(ip), intent(out) :: kth_smallest
            !! On output contains the index with the kth-smallest value of a(:)
        integer(ip), intent(in), optional :: left, right
            !! If we know that:
            !!    the kth-smallest entry of a is in a(indx(left:right))
            !! and also that:
            !!    a(indx(1:(left-1))) <= a(indx(left))
            !! and:
            !!    a(indx(left:right))) <= a(indx((right+1):size(a)))
            !! then one or both bounds can be specified to reduce the search
            !! time. These constraints are available if we have previously
            !! called the subroutine with a different k (due to the way that
            !! indx(:) becomes partially sorted, see documentation for indx(:)).

        integer(ip) :: l, r, s, i, j, k_local
        real(dp) :: pivot

        l = 1_ip
        if(present(left)) l = left
        r = size(a)
        if(present(right)) r = right

        if(size(a) /= size(indx)) then
            stop "arg_quick_select must have size(a) == size(indx)"
        end if

        if(k < 1_ip .or. k > size(a) .or. l > r .or. l < 1_ip .or. &
            r > size(a)) then
            stop "arg_quick_select must have 1 <= k <= size(a), and 1 <= left <= right <= size(a)";
        end if

        k_local = k - l + 1

        do while(.true.)
            s = (l+r)/2_ip ! Deliberate integer division
            if(a(indx(s)) < a(indx(r))) call swap(indx(s), indx(r))
            if(a(indx(s)) < a(indx(l))) call swap(indx(s), indx(l))
            if(a(indx(r)) < a(indx(l))) call swap(indx(r), indx(l))
            pivot = a(indx(r)) ! Median

            i = l
            do j = l, r-1
                if(a(indx(j)) <= pivot) then
                    call swap(indx(i), indx(j))
                    i = i+1_ip
                end if
            end do
            call swap(indx(r), indx(i))
            s = i-l+1
            if(k_local < s) then
                r = i-1_ip
            else if(k_local > s) then
                l=i+1_ip; k_local=k_local-s;
            else
                kth_smallest = indx(i)
                return
            end if
        end do

        contains
            subroutine swap(a, b)
                integer(ip), intent(inout) :: a, b
                integer(ip) :: tmp
                tmp = a; a = b; b = tmp
            end subroutine
    end subroutine

end module

module test_quick_select_mod
    use quick_select_mod
    implicit none

    contains

    subroutine test_quick_select()

        integer(ip), parameter :: N = 10, Nr = 1000, Nreps = 4
        real(dp) :: x(N), x_copy(N), mat(8), mat_copy(8), &
            len1(1), len2(2), kth_smallest, random_vals(Nr)
        integer(ip) :: i, p, up_rank, down_rank, mid_rank
        logical :: test1, test2, test3, any_failed

        ! x contains the numbers 1**2, 2**2, .... 10**2, with mixed-up order
        x = (/( i**2, i=1, size(x) )/)
        x(5:2:-1) = x(2:5)
        x(10:8:-1) = x(8:10)

        ! Check that the ith-ranked entry of x really is i**2
        do i = 1, size(x)
            x_copy = x
            call quick_select(x_copy, i, kth_smallest)
            print*, merge('PASS', 'FAIL', kth_smallest == i**2)
        end do

        ! Check that it works when we specify "left" and know that the array
        ! is partially sorted due to previous calls to quickselect
        x_copy = x
        do i = 1, size(x), 1
            call quick_select(x_copy, i, kth_smallest, left=i)
            print*, merge('PASS', 'FAIL', kth_smallest == i**2)
        end do

        ! Check that it works when we specify "right" and know that the array
        ! is partially sorted due to previous calls to quickselect
        x_copy = x
        do i = size(x), 1, -1
            call quick_select(x_copy, i, kth_smallest, right=i)
            print*, merge('PASS', 'FAIL', kth_smallest == i**2)
        end do

        !
        ! Variants of test that came with the matlab documentation
        !
        mat = 1.0_dp * [3, 2, 7, 4, 5, 1, 4, -1]
        mat_copy = mat
        call quick_select(mat_copy, 1_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == -1)
        mat_copy = mat
        call quick_select(mat_copy, 2_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == 1)
        mat_copy = mat
        call quick_select(mat_copy, size(mat)+1_ip-4_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == 4)
        mat_copy = mat
        call quick_select(mat_copy, 5_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == 4)
        mat_copy = mat
        call quick_select(mat_copy, 6_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == 4)
        mat_copy = mat
        call quick_select(mat_copy, 7_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == 5)

        ! Check it works for size(a) == 1
        len1(1) = -1.0_dp
        call quick_select(len1, 1_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == -1.0_dp)

        ! Check it works for size(a) == 2
        len2 = [-3.2_dp, -5.1_dp]
        call quick_select(len2, 2_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == -3.2_dp)
        len2 = [-3.2_dp, -5.1_dp]
        call quick_select(len2, 1_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == -5.1_dp)
        len2 = [-1.0_dp, -1.0_dp]
        call quick_select(len2, 1_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == -1.0_dp)
        len2 = [-1.0_dp, -1.0_dp]
        call quick_select(len2, 2_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', kth_smallest == -1.0_dp)

        !
        ! Test using random data
        !
        any_failed=.FALSE.

        ! Search for the pth-smallest rank, for all these p
        ! (avoid end-points to enable constrained search tests)
        do p = 3, Nr-2

            ! Repeat for different random samples to try to expose any errors
            do i = 1, Nreps

                ! Standard quick-select
                call random_number(random_vals)
                call quick_select(random_vals, p, kth_smallest)

                test1 = kth_smallest == random_vals(p)
                test2 = all(random_vals(1:(p-1)) <= random_vals(p))
                test3 = all(random_vals(p) <= &
                    random_vals((p+1):size(random_vals)))
                if( (.not. test1) .or. (.not. test2) .or. (.not. test3) ) &
                    any_failed = .TRUE.

                ! Constrained search above 'p', providing 'left'
                up_rank = p + (Nr-p)/2 ! Deliberate integer division
                call quick_select(random_vals, up_rank, kth_smallest, left=p)

                test1 = kth_smallest == random_vals(up_rank)
                test2 = all(random_vals(1:(up_rank-1)) <= random_vals(up_rank))
                test3 = all(random_vals(up_rank) <= &
                    random_vals((up_rank+1):size(random_vals)))
                if( (.not. test1) .or. (.not. test2) .or. (.not. test3) ) &
                    any_failed = .TRUE.

                ! Constrained search below p, providing 'right'
                down_rank = p - (p/2)
                call quick_select(random_vals, down_rank, kth_smallest, right=p)

                test1 = kth_smallest == random_vals(down_rank)
                test2 = all(random_vals(1:(down_rank-1)) <= &
                    random_vals(down_rank))
                test3 = all(random_vals(down_rank) <= &
                    random_vals((down_rank+1):size(random_vals)))
                if( (.not. test1) .or. (.not. test2) .or. (.not. test3) ) &
                    any_failed = .TRUE.

                ! Constrained search between up-ind and down-ind, proving left
                ! and right. Make 'mid_rank' either above or below p
                mid_rank = p - p/3*mod(i,2) + (Nr-p)/3*(1-mod(i,2))
                call quick_select(random_vals, mid_rank, kth_smallest, &
                    left=down_rank, right=up_rank)

                test1 = kth_smallest == random_vals(mid_rank)
                test2 = all(random_vals(1:(mid_rank-1)) <= &
                    random_vals(mid_rank))
                test3 = all(random_vals(mid_rank) <= &
                    random_vals((mid_rank+1):size(random_vals)))
                if( (.not. test1) .or. (.not. test2) .or. (.not. test3) ) &
                    any_failed = .TRUE.

            end do
        end do

        print*, merge('PASS', 'FAIL', .not. any_failed)

    end subroutine

    subroutine test_arg_quick_select

        integer(ip), parameter :: N = 10, Nr = 1000, Nreps = 4, Nm=8
        real(dp) :: x(N), mat(Nm), len1(1), len2(2)
        integer(ip) :: indx(N), indx_copy(N), &
            indx_mat(Nm), indx_mat_copy(Nm), &
            indx_len1(1), indx_len2(2), &
            indx_r(Nr)
        real(dp) :: random_vals(Nr)
        integer(ip) :: i, j, p, up_rank, down_rank, mid_rank, kth_smallest
        logical :: test1, test2, test3, any_failed

        ! Make x contain 1**2, 2**2, .... 10**2, but mix up the order
        x = (/( i**2, i=1, size(x) )/)
        x(5:2:-1) = x(2:5)
        x(10:8:-1) = x(8:10)

        indx = (/(i, i = 1, size(x))/)

        ! Check that the ith ranked entry of x equals i**2
        do i = 1, size(x)
            indx_copy = indx
            call arg_quick_select(x, indx, i, kth_smallest)
            print*, merge('PASS', 'FAIL', x(kth_smallest) == i**2)
        end do

        ! Check that it works when we specify "left" and know that the index
        ! array is partially sorted due to previous calls to arg_quick_select
        indx_copy = indx
        do i = 1, size(x), 1
            call arg_quick_select(x, indx_copy,  i, kth_smallest, left=i)
            print*, merge('PASS', 'FAIL', x(kth_smallest) == i**2)
        end do

        ! Check that it works when we specify "right" and know that the index
        ! array is partially sorted due to previous calls to arg_quick_select
        indx_copy = indx
        do i = size(x), 1, -1
            call arg_quick_select(x, indx_copy, i, kth_smallest, right=i)
            print*, merge('PASS', 'FAIL', x(kth_smallest) == i**2)
        end do

        !
        ! Variants of test that came with the matlab documentation for qselect
        !
        mat = 1.0_dp * [3, 2, 7, 4, 5, 1, 4, -1]
        indx_mat = (/( i, i = 1, size(mat) )/)

        indx_mat_copy = indx_mat
        call arg_quick_select(mat, indx_mat_copy, 1_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', mat(kth_smallest) == -1)

        indx_mat_copy = indx_mat
        call arg_quick_select(mat, indx_mat_copy, 2_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', mat(kth_smallest) == 1)

        indx_mat_copy = indx_mat
        call arg_quick_select(mat, indx_mat_copy, size(mat)+1_ip-4_ip, &
            kth_smallest)
        print*, merge('PASS', 'FAIL', mat(kth_smallest) == 4)

        indx_mat_copy = indx_mat
        call arg_quick_select(mat, indx_mat_copy, 5_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', mat(kth_smallest) == 4)

        indx_mat_copy = indx_mat
        call arg_quick_select(mat, indx_mat_copy, 6_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', mat(kth_smallest) == 4)

        indx_mat_copy = indx_mat
        call arg_quick_select(mat, indx_mat_copy, 7_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', mat(kth_smallest) == 5)

        ! Check it works for size(a) == 1
        len1(1) = -1.0_dp
        indx_len1(1) = 1_ip
        call arg_quick_select(len1, indx_len1, 1_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', len1(kth_smallest) == -1.0_dp)

        ! Check it works for size(a) == 2
        len2 = [-3.2_dp, -5.1_dp]
        indx_len2 = [1_ip, 2_ip]
        call arg_quick_select(len2, indx_len2, 2_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', len2(kth_smallest) == -3.2_dp)

        len2 = [-3.2_dp, -5.1_dp]
        indx_len2 = [1_ip, 2_ip]
        call arg_quick_select(len2, indx_len2, 1_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', len2(kth_smallest) == -5.1_dp)

        len2 = [-1.0_dp, -1.0_dp]
        indx_len2 = [1_ip, 2_ip]
        call arg_quick_select(len2, indx_len2, 1_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', len2(kth_smallest) == -1.0_dp)

        len2 = [-1.0_dp, -1.0_dp]
        indx_len2 = [1_ip, 2_ip]
        call arg_quick_select(len2, indx_len2, 2_ip, kth_smallest)
        print*, merge('PASS', 'FAIL', len2(kth_smallest) == -1.0_dp)

        !
        ! Test using random data
        !
        any_failed=.FALSE.

        ! Search for the pth-smallest, for all these p (avoid end-points to
        ! enable additional tests using "left", "right" arguments)
        do p = 3, Nr-2

            ! Repeat for many random samples to try to expose any errors
            do i = 1, Nreps

                call random_number(random_vals)

                indx_r = (/( j, j = 1, size(random_vals) )/)

                ! Standard arg_quick_select
                call arg_quick_select(random_vals, indx_r, p, kth_smallest)

                test1 = random_vals(kth_smallest) == random_vals(indx_r(p))
                test2 = all(random_vals(indx_r(1:(p-1))) <= &
                    random_vals(indx_r(p)))
                test3 = all(random_vals(indx_r(p)) <= &
                    random_vals(indx_r((p+1):size(random_vals))))
                if( (.not. test1) .or. (.not. test2) .or. (.not. test3) ) &
                    any_failed = .TRUE.

                ! Constrained search for a rank above 'p', providing 'left'
                up_rank = p + (Nr-p)/2 ! Deliberate integer division
                call arg_quick_select(random_vals, indx_r, up_rank, &
                    kth_smallest, left=p)

                test1 = random_vals(kth_smallest) == &
                    random_vals(indx_r(up_rank))
                test2 = all(random_vals(indx_r(1:(up_rank-1))) <= &
                    random_vals(indx_r(up_rank)))
                test3 = all(random_vals(indx_r(up_rank)) <= &
                    random_vals(indx_r((up_rank+1):size(random_vals))))
                if( (.not. test1) .or. (.not. test2) .or. (.not. test3) ) &
                    any_failed = .TRUE.

                ! Constrained search for a rank below p, providing 'right'
                down_rank = p - (p/2)
                call arg_quick_select(random_vals, indx_r, down_rank, &
                    kth_smallest, right=p)

                test1 = random_vals(kth_smallest) == &
                    random_vals(indx_r(down_rank))
                test2 = all(random_vals(indx_r(1:(down_rank-1))) <= &
                    random_vals(indx_r(down_rank)))
                test3 = all(random_vals(indx_r(down_rank)) <= &
                    random_vals(indx_r((down_rank+1):size(random_vals))))
                if( (.not. test1) .or. (.not. test2) .or. (.not. test3) ) &
                    any_failed = .TRUE.

                ! Constrained search for a rank between up-ind and down-ind,
                ! proving left and right. 'mid_rank' is either above or below p
                mid_rank = p - p/3*mod(i,2) + (Nr-p)/3*(1-mod(i,2))
                call arg_quick_select(random_vals, indx_r, mid_rank, &
                    kth_smallest, left=down_rank, right=up_rank)

                test1 = random_vals(kth_smallest) == &
                    random_vals(indx_r(mid_rank))
                test2 = all(random_vals(indx_r(1:(mid_rank-1))) <= &
                    random_vals(indx_r(mid_rank)))
                test3 = all(random_vals(indx_r(mid_rank)) <= &
                    random_vals(indx_r((mid_rank+1):size(random_vals))))
                if( (.not. test1) .or. (.not. test2) .or. (.not. test3) ) &
                    any_failed = .TRUE.

            end do
        end do

        print*, merge('PASS', 'FAIL', .not. any_failed)

    end subroutine

end module

program test_all

    use test_quick_select_mod
    implicit none

    call test_quick_select()
    call test_arg_quick_select()

end program
gareth-nx commented 3 years ago

I have made a pull request implementing select and arg_select, see here.

ghbrown commented 3 years ago

@gareth-nx Just came from your PR, but I'll comment here so as not to pollute the PR.

Thanks for working on this! A need for these methods have come up in my work a lot more frequently than I would have expected, so they'll be great to have.

We have median implemented, but it looks to use a full sort, making it O(n log(n)). I assume there are there plans to update median (and possibly others) it in light of the methods you're implementing? I tried skimmed the discussion here, but didn't find anything on this.

gareth-nx commented 3 years ago

Hi @ghbrown -- yes this issue was originally raised from discussions in the review of median, where I think @leonfoks pointed out that quick-select is better than sorting the full array. At that time the median pull request by @jvdp1 was well advanced, so rather than delay, the idea was to move ahead with it, and just update it when quick-select became available.

Until the comment by @leonfoks I didn't realise that these techniques existed, but turns out they are very useful!

If you or anyone else has time to review this, I'm sure it would help move things along.

gareth-nx commented 3 years ago

For future reference, note the current pull request covers all real and integer types, but it is relatively straightforward to generalise the algorithm to character(len=*) and string_type as well.

There is a version of the patch which can do that. Unfortunately it contains a minor workaround for (what seems like) a compiler bug in gfortran-9. There is no problem with more recent versions of gfortran, or with ifort. See discussion in the pull-request review.

For now I don't expect we'll pursue those cases, to keep the code clean. But it would be easy to do in future, so I'm using this comment to refer to the code.

gareth-nx commented 3 years ago

The pull request has now been approved by @jvdp1 , but we need a second review that explicitly "approves" the pull request before it can be merged.

If anyone on this list can find time to do that, it would be most appreciated. The pull request is here. Given the review process to date, it should be fairly clean. @leonfoks @Beliavsky