JuliaDataReaders / DataReaders.jl

A Julia library to get remote data via Requests.jl and get DataFrame (from DataFrames.jl) or TimeArray (from TimeSeries.jl)
MIT License
10 stars 7 forks source link

A Julia Dukascopy tick data reader #14

Closed femtotrader closed 7 years ago

femtotrader commented 7 years ago

Dukascopy provides tick data. For example, http://www.dukascopy.com/datafeed/EURUSD/2016/02/14/20h_ticks.bi5 contains the ticks for 2016-02-14 (datetime.datetime(2016, 2, 14)) from 08 PM to 09 PM. This is LZMA-compressed data.

see also http://datafeed.dukascopy.com/datafeed/EURUSD/2016/02/14/06h_ticks.bi5

Python can download these files, uncompress them and output a Pandas DataFrame; see https://github.com/pydata/pandas-datareader/issues/235

It would be great to have such a feature in Julia.

LZMA decompression with Julia is a prerequisite - see discussion https://groups.google.com/forum/#!topic/julia-users/G9Pqe5svS3c

see https://github.com/yuyichao/LibArchive.jl

WIP: Uncompress 20h_ticks.bi5 to 20h_ticks

cp 20h_ticks.bi5 20h_ticks.xz
xz --decompress --format=lzma 20h_ticks.xz

Read with Python uncompressed 20h_ticks file

It's a structured binary file containing an array of fixed-size records.

see discussion https://groups.google.com/forum/#!topic/julia-users/wQUs4RnJta8

import numpy as np
import pandas as pd
import datetime
# Instrument symbol; characters from index 3 onward are compared with "JPY" below.
symb = "EURUSD"
# Start of the hour covered by the "20h_ticks" chunk (2016-02-14, 08 PM).
# The per-record "Date" value is a millisecond offset that is added to this
# start, so the hour must be included here; with midnight the ticks of the
# 20h file would be stamped 00:00-01:00 instead of 20:00-21:00.
dt_chunk = datetime.datetime(2016, 2, 14, 20)
# One record = three big-endian uint32 values followed by two big-endian float32 values.
record_dtype = np.dtype([
    ('Date', '>u4'),
    ('Ask', '>u4'),
    ('Bid', '>u4'),
    ('AskVolume', '>f4'),
    ('BidVolume', '>f4'),
])

# Read the whole uncompressed file as an array of records.
data = np.fromfile("20h_ticks", dtype=record_dtype)
columns = ["Date", "Ask", "Bid", "AskVolume", "BidVolume"]
df = pd.DataFrame(data, columns=columns)
# Prices are stored as integers: 3 implied decimals when the symbol ends
# with "JPY" after its first three characters, 5 otherwise.
p_digits = 3 if symb[3:] == "JPY" else 5
for p in ["Ask", "Bid"]:
    df[p] = df[p] / 10**p_digits
# Turn the millisecond offsets into absolute timestamps and index by them.
df["Date"] = dt_chunk + pd.to_timedelta(df["Date"], unit="ms")
df = df.set_index("Date")

a first step is to read this uncompressed file with Julia

import Base: start, done, next

# Instrument symbol; later passed to the TickIter constructor.
symb = "EURUSD"

# Date and hour of the chunk to read.
year, month, day, hour = 2016, 2, 14, 20

# Name of the uncompressed tick file for that hour (two-digit hour + "h_ticks").
filename = @sprintf "%02dh_ticks" hour
println(filename)

# Raw tick record: three UInt32 values followed by two Float32 values
# (20 bytes), in the order in which `next` reads them from the stream.
immutable TickRawRecordType
  Date::UInt32        # added to the chunk start as milliseconds in `convert`
  Ask::UInt32         # integer price; divided by 10^p_digits in `convert`
  Bid::UInt32         # integer price; divided by 10^p_digits in `convert`
  AskVolume::Float32
  BidVolume::Float32
end

# Decoded tick record: absolute timestamp plus prices and volumes.
#
# The numeric fields are declared as concrete Float64 instead of the abstract
# AbstractFloat: `convert` always produces Float64 values, and concrete field
# types let the record be stored inline (isbits) instead of boxing every field.
immutable TickRecordType
  Date::DateTime
  Ask::Float64
  Bid::Float64
  AskVolume::Float64
  BidVolume::Float64
end

# Turn a raw record into a TickRecordType: the raw Date is added to `dt_chunk`
# as milliseconds, prices are divided by 10^p_digits, volumes are widened to Float64.
function convert(raw_rec::TickRawRecordType, dt_chunk, p_digits)
    timestamp = Base.Dates.Millisecond(raw_rec.Date) + dt_chunk
    scale = 10^p_digits
    ask = raw_rec.Ask / scale
    bid = raw_rec.Bid / scale
    return TickRecordType(
        timestamp,
        ask,
        bid,
        Float64(raw_rec.AskVolume),
        Float64(raw_rec.BidVolume),
    )
end

# Number of implied decimal digits in the integer prices of `symb`:
# 3 when everything from the 4th character on equals "JPY", otherwise 5.
price_digits(symb) = symb[4:end] == "JPY" ? 3 : 5

# Iterator over the tick records of an open file.
type TickIter
    stream::IOStream     # source of the raw records
    ondone::Function     # callback invoked by `done` once the stream is at EOF
    dt_chunk::DateTime   # start of the hour the chunk belongs to
    p_digits::UInt       # implied decimal digits of the prices

    # Build the iterator from the date parts and the symbol; `ondone`
    # defaults to a no-op.
    function TickIter(fh, year, month, day, hour, symb; ondone=()->nothing)
        chunk_start = DateTime(year, month, day, hour)
        n_digits = price_digits(symb)
        return new(fh, ondone, chunk_start, n_digits)
    end

end

#=
    function TickIter(filename::AbstractString, year, month, day, hour, symb)
        fh = open(filename)
        ondone = ()->close(fh)
        new(fh, ondone, DateTime(year, month, day, hour), price_digits(symb))
    end
=#

# Iteration protocol: rewind the stream to offset 0. The state is always `nothing`.
start(itr::TickIter) = (seek(itr.stream, 0); nothing)

# Iteration protocol: read one raw record (five big-endian fields, in order)
# from the stream and return the converted record with the `nothing` state.
function next(itr::TickIter, nada)
    io = itr.stream
    date_ms = ntoh(read(io, UInt32))
    ask     = ntoh(read(io, UInt32))
    bid     = ntoh(read(io, UInt32))
    ask_vol = ntoh(read(io, Float32))
    bid_vol = ntoh(read(io, Float32))
    raw = TickRawRecordType(date_ms, ask, bid, ask_vol, bid_vol)
    return convert(raw, itr.dt_chunk, itr.p_digits), nothing
end

# Iteration protocol: finished when the stream is at EOF; in that case the
# `ondone` callback is invoked before reporting `true`.
function done(itr::TickIter, nada)
    finished = eof(itr.stream)
    if finished
        itr.ondone()
    end
    return finished
end

# Open the uncompressed tick file; the handle is closed explicitly at the end
# of the script.
fh = open(filename)
#seek(fh, 0)

# Build the iterator over the records of `fh`.
itr = TickIter(fh, year, month, day, hour, symb)

#itr = TickIter(filename, year, month, day, hour, symb)

#=
state = start(itr)
while(!done(itr, state))
    rec, state = next(itr, state)
    println(rec)
end

println(repeat("=", 10))

seek(fh, 0)
itr = TickIter(fh, year, month, day, hour, symb)
state = start(itr)
while(!done(itr, state))
    rec, state = next(itr, state)
    println(rec)
end
=#

# First pass: print every record.
for rec in itr
    println(rec)
end

println(repeat("=", 50))

# Second pass over the same iterator: `start` seeks the stream back to
# offset 0, so the records are read again from the beginning.
for rec in itr
    println(rec)
end

# Materialize all records, then extract their timestamps.
arr = collect(itr)
a_Dates = map(rec->rec.Date, arr)

close(fh)

ToDo:

See also

using LibArchive

# Raw tick record layout used to reinterpret the decompressed bytes:
# three UInt32 values followed by two Float32 values (20 bytes).
# The script applies `ntoh` to each field when it reads it.
immutable TickRawRecordType
  Date::UInt32        # added to the chunk start as milliseconds
  Ask::UInt32
  Bid::UInt32
  AskVolume::Float32
  BidVolume::Float32
end

# Implied decimal digits of the integer prices: 3 when the symbol, from its
# 4th character on, is exactly "JPY"; 5 in every other case.
function price_digits(symb)
    symb[4:end] == "JPY" && return 3
    return 5
end

symb = "EURUSD"
p_digits = price_digits(symb)
year, month, day, hour = 2016, 2, 14, 20

# Decompress the .bi5 file through LibArchive (raw format, any filter)
# and read its single entry into a byte array.
filename = @sprintf "%02dh_ticks.bi5" hour
reader = LibArchive.Reader(filename)
LibArchive.support_format_raw(reader)
LibArchive.support_filter_all(reader)
entry = LibArchive.next_header(reader)
arr = read(reader)
close(reader)
# View the bytes as an array of raw records (fields still need `ntoh`).
arr = reinterpret(TickRawRecordType, arr)

# Absolute timestamps: chunk start plus the per-record millisecond offset.
a_Dates = map(rec->DateTime(year, month, day, hour) + Base.Dates.Millisecond(ntoh(rec.Date)), arr)

# Numeric matrix with one column per field after Date.
Nrows, Ncols = length(arr), length(fieldnames(TickRawRecordType)) - 1
data = zeros(Nrows, Ncols)

# Fill column by column; field j + 1 of the record goes to column j.
for j in 1:Ncols
    for i in 1:Nrows
        data[i, j] = ntoh(getfield(arr[i], j + 1))
    end
end
# The first two columns (Ask, Bid) are integer prices: apply the decimal scale.
for j in 1:2
    data[1:end,j] = data[1:end,j] / 10^p_digits
end

println(a_Dates)
println(data)

using DataFrames
# Build the DataFrame from named, typed columns. Concatenating `a_Dates`
# (DateTime) with `data` (Float64) into one matrix would promote every
# element to Any and yield untyped columns.
df = DataFrame(
    Date=a_Dates,
    Ask=data[:, 1],
    Bid=data[:, 2],
    AskVolume=data[:, 3],
    BidVolume=data[:, 4],
)
println(df)

using TimeSeries: TimeArray
# Same data as a TimeArray: timestamps, value matrix, column names.
columns = ["Ask", "Bid", "AskVolume", "BidVolume"]
ta = TimeArray(a_Dates, data, columns)
println(ta)
stevengj commented 7 years ago

Define

# Sketch: parametrize the iterator on the stream type S (any subtype of IO);
# the remaining fields are elided.
type TickIter{S<:IO}
    stream::S
    ...
end

so that stream can be any IO type, not just IOStream (files).

femtotrader commented 7 years ago

Thanks for this tip

so here is my code

import Base: start, done, next

# Instrument symbol; later passed to the TickIter constructor.
symb = "EURUSD"

# Date and hour of the chunk to read.
year, month, day, hour = 2016, 2, 14, 20

# Name of the uncompressed tick file for that hour (two-digit hour + "h_ticks").
filename = @sprintf "%02dh_ticks" hour
println(filename)

# Raw tick record: three UInt32 values followed by two Float32 values
# (20 bytes), in the order in which `next` reads them from the stream.
immutable TickRawRecordType
  Date::UInt32        # added to the chunk start as milliseconds in `convert`
  Ask::UInt32         # integer price; divided by 10^p_digits in `convert`
  Bid::UInt32         # integer price; divided by 10^p_digits in `convert`
  AskVolume::Float32
  BidVolume::Float32
end

# Decoded tick record: absolute timestamp plus prices and volumes.
#
# The numeric fields are concrete Float64 rather than the abstract
# AbstractFloat: `convert` only ever produces Float64 values, and concrete
# field types allow the record to be stored inline (isbits) without boxing.
immutable TickRecordType
  Date::DateTime
  Ask::Float64
  Bid::Float64
  AskVolume::Float64
  BidVolume::Float64
end

# Decode a raw record: millisecond offset -> absolute timestamp relative to
# `dt_chunk`, integer prices -> prices scaled by 10^p_digits, volumes -> Float64.
function convert(raw_rec::TickRawRecordType, dt_chunk, p_digits)
    to_price(raw) = raw / 10^p_digits
    return TickRecordType(
        Base.Dates.Millisecond(raw_rec.Date) + dt_chunk,
        to_price(raw_rec.Ask),
        to_price(raw_rec.Bid),
        Float64(raw_rec.AskVolume),
        Float64(raw_rec.BidVolume),
    )
end

# Implied decimal digits of the integer prices, chosen from the part of the
# symbol that starts at its 4th character: "JPY" -> 3, anything else -> 5.
function price_digits(symb)
    quote_part = symb[4:end]
    return quote_part == "JPY" ? 3 : 5
end

# Iterator over the tick records of a stream of type S (any subtype of IO).
type TickIter{S<:IO}
    stream::S            # source of the raw records
    ondone::Function     # callback invoked by `done` once the stream is at EOF
    dt_chunk::DateTime   # start of the hour the chunk belongs to
    p_digits::UInt       # implied decimal digits of the prices

    # Inner constructor; it is reached as TickIter{S}(...), with the type
    # parameter given explicitly.
    function TickIter(fh, year, month, day, hour, symb; ondone=()->nothing)
        new(fh, ondone, DateTime(year, month, day, hour), price_digits(symb))
    end

end

# Outer constructor: defining an inner constructor removes the default ones,
# so without this method a plain TickIter(fh, ...) call fails with a
# MethodError. It infers S from the type of `fh` and forwards everything,
# including the `ondone` keyword, to TickIter{S}.
TickIter{S<:IO}(fh::S, year, month, day, hour, symb; ondone=()->nothing) =
    TickIter{S}(fh, year, month, day, hour, symb; ondone=ondone)

#=
    function TickIter(filename::AbstractString, year, month, day, hour, symb)
        fh = open(filename)
        ondone = ()->close(fh)
        new(fh, ondone, DateTime(year, month, day, hour), price_digits(symb))
    end
=#

# Iteration protocol: move the stream back to offset 0 and return the
# (unused) `nothing` state.
function start(itr::TickIter)
    seek(itr.stream, 0)
    return nothing
end

# Iteration protocol: read the five big-endian fields of one record, convert
# them, and return the record together with the `nothing` state.
function next(itr::TickIter, nada)
    # Read one value of type T from the stream in network byte order.
    be(T) = ntoh(read(itr.stream, T))
    raw = TickRawRecordType(be(UInt32), be(UInt32), be(UInt32), be(Float32), be(Float32))
    return convert(raw, itr.dt_chunk, itr.p_digits), nothing
end

# Iteration protocol: `false` while data remains; at EOF the `ondone`
# callback is invoked and `true` is returned.
function done(itr::TickIter, nada)
    eof(itr.stream) || return false
    itr.ondone()
    return true
end

# Open the uncompressed tick file; the handle is closed explicitly at the end
# of the script.
fh = open(filename)
#seek(fh, 0)

# The type parameter is spelled out because the type only defines an inner
# constructor.
itr = TickIter{IOStream}(fh, year, month, day, hour, symb)

#itr = TickIter(filename, year, month, day, hour, symb)

#=
state = start(itr)
while(!done(itr, state))
    rec, state = next(itr, state)
    println(rec)
end

println(repeat("=", 10))

seek(fh, 0)
itr = TickIter(fh, year, month, day, hour, symb)
state = start(itr)
while(!done(itr, state))
    rec, state = next(itr, state)
    println(rec)
end
=#

# First pass: print every record.
for rec in itr
    println(rec)
end

println(repeat("=", 50))

# Second pass: `start` seeks the stream back to offset 0, so the same
# records are printed again.
for rec in itr
    println(rec)
end

close(fh)

I just need now to integrate with LibArchive.jl

stevengj commented 7 years ago

You shouldn't need to call TickIter{IOStream}(...) ... just call TickIter(...) it will automatically use the correct TickIter type based on the type of the arguments, no?

stevengj commented 7 years ago

LibArchive provides an IO subtype, so you can use it with anything that accepts any subtype of IO.

femtotrader commented 7 years ago

That was also what I thought... but I have to call

itr = TickIter{IOStream}(fh, year, month, day, hour, symb)

instead of

itr = TickIter(fh, year, month, day, hour, symb)

because the latter raises

ERROR: LoadError: MethodError: `convert` has no method matching convert(::Type{TickIter{S<:IO}}, ::IOStream, ::Int64, ::Int64, ::Int64, ::Int64, ::ASCIIString)
This may have arisen from a call to the constructor TickIter{S<:IO}(...),
since type constructors fall back to convert methods.
Closest candidates are:
  call{T}(::Type{T}, ::Any)
  convert{T}(::Type{T}, !Matched::T)
 in call at essentials.jl:57
 in include at /Applications/Julia-0.4.6.app/Contents/Resources/julia/lib/julia/sys.dylib
 in include_from_node1 at /Applications/Julia-0.4.6.app/Contents/Resources/julia/lib/julia/sys.dylib
 in process_options at /Applications/Julia-0.4.6.app/Contents/Resources/julia/lib/julia/sys.dylib
 in _start at /Applications/Julia-0.4.6.app/Contents/Resources/julia/lib/julia/sys.dylib
while loading /Users/scls/test.jl, in expression starting on line 96
stevengj commented 7 years ago

Oh right, because you defined your own constructor, you replaced the default one. You can just add a definition:

# Outer constructor: infer S from the type of `fh` and forward to the inner
# constructor, including its `ondone` keyword (otherwise callers of the
# unparameterized form could not pass a callback).
TickIter{S<:IO}(fh::S, year, month, day, hour, symb; ondone=()->nothing) =
    TickIter{S}(fh, year, month, day, hour, symb; ondone=ondone)
femtotrader commented 7 years ago

I've done

# Iterator over tick records read from a stream of type S (any subtype of IO).
# No inner constructor is declared, so the default TickIter{S}(fields...)
# constructor stays available to the outer constructor.
type TickIter{S<:IO}
    stream::S            # source of the raw records
    ondone::Function     # callback stored for end of iteration
    dt_chunk::DateTime   # start of the hour the chunk belongs to
    p_digits::UInt    # implied decimal digits of the prices
end
# Convenience constructor: builds the chunk start from the date parts, derives
# the price digits from `symb`, and infers S from the type of `fh`.
function TickIter{S<:IO}(fh::S, year, month, day, hour, symb; ondone=()->nothing)
    chunk_start = DateTime(year, month, day, hour)
    return TickIter{S}(fh, ondone, chunk_start, price_digits(symb))
end

it works fine now.

I have just to use LibArchive now but I can't have a look at this today

femtotrader commented 7 years ago

WIP:

import Base: start, done, next
using LibArchive

# Raw tick record: three UInt32 values followed by two Float32 values
# (20 bytes), in the order in which `next` reads them from the stream.
immutable TickRawRecordType
  Date::UInt32        # added to the chunk start as milliseconds in `convert`
  Ask::UInt32         # integer price; divided by 10^p_digits in `convert`
  Bid::UInt32         # integer price; divided by 10^p_digits in `convert`
  AskVolume::Float32
  BidVolume::Float32
end

# Decoded tick record: absolute timestamp plus prices and volumes.
#
# Fields are concrete Float64 instead of the abstract AbstractFloat:
# `convert` builds every numeric field as a Float64, and concrete field types
# make the record isbits, so arrays of records are stored inline.
immutable TickRecordType
  Date::DateTime
  Ask::Float64
  Bid::Float64
  AskVolume::Float64
  BidVolume::Float64
end

# Decode a raw record into a TickRecordType: timestamp = dt_chunk plus the raw
# Date in milliseconds; prices are widened to Float64 and divided by
# 10^p_digits; volumes are widened to Float64.
function convert(raw_rec::TickRawRecordType, dt_chunk, p_digits)
    timestamp = Base.Dates.Millisecond(raw_rec.Date) + dt_chunk
    scale = 10^p_digits
    ask = Float64(raw_rec.Ask) / scale
    bid = Float64(raw_rec.Bid) / scale
    ask_volume = Float64(raw_rec.AskVolume)
    bid_volume = Float64(raw_rec.BidVolume)
    return TickRecordType(timestamp, ask, bid, ask_volume, bid_volume)
end

# Implied decimal digits of the integer prices: 3 if the symbol from its 4th
# character on equals "JPY", else 5.
function price_digits(symb)
    is_jpy_quoted = symb[4:end] == "JPY"
    return is_jpy_quoted ? 3 : 5
end

# Iterator over tick records read from a stream of type S (any subtype of IO).
type TickIter{S<:IO}
    stream::S            # source of the raw records
    ondone::Function     # callback invoked by `done` once the stream is at EOF
    dt_chunk::DateTime   # start of the hour the chunk belongs to
    p_digits::UInt    # implied decimal digits of the prices
end
# Convenience constructor: S is inferred from `stream`; the chunk start is
# built from the date parts and the price digits are derived from `symb`.
TickIter{S<:IO}(stream::S, year, month, day, hour, symb; ondone=()->nothing) =
    TickIter{S}(stream, ondone, DateTime(year, month, day, hour), price_digits(symb))

# Iteration protocol: every iteration begins by rewinding the stream to
# offset 0; the iteration state itself is always `nothing`.
function start(itr::TickIter)
    stream = itr.stream
    seek(stream, 0)
    return nothing
end

# Iteration protocol: read one record (three UInt32 then two Float32, each
# converted from network byte order) and return it decoded, with the
# `nothing` state.
function next(itr::TickIter, nada)
    src = itr.stream
    date_ms = ntoh(read(src, UInt32))
    ask_raw = ntoh(read(src, UInt32))
    bid_raw = ntoh(read(src, UInt32))
    ask_vol = ntoh(read(src, Float32))
    bid_vol = ntoh(read(src, Float32))
    raw = TickRawRecordType(date_ms, ask_raw, bid_raw, ask_vol, bid_vol)
    decoded = convert(raw, itr.dt_chunk, itr.p_digits)
    return decoded, nothing
end

# Iteration protocol: iteration ends at EOF, at which point the `ondone`
# callback is called before `true` is returned.
function done(itr::TickIter, nada)
    if eof(itr.stream)
        itr.ondone()
        return true
    end
    return false
end

# Reader for one compressed tick file: decompresses the whole file into
# memory and exposes a TickIter over the decompressed bytes.
type TickReader
    filename::AbstractString
    reader::LibArchive.Reader   # note: already closed when the object is built
    itr::TickIter

    # dt       -- start of the hour covered by the file
    # ticker   -- instrument symbol, forwarded to TickIter
    # filename -- path of the compressed file
    function TickReader(dt::DateTime, ticker, filename)
        reader = LibArchive.Reader(filename)

        # Close the archive reader even when configuring it, reading the
        # header or reading the data throws, so it is never leaked.
        arr = try
            LibArchive.support_format_raw(reader)
            LibArchive.support_filter_all(reader)
            LibArchive.next_header(reader)
            read(reader)
        finally
            close(reader)
        end
        # Iterate over an in-memory buffer of the decompressed bytes.
        stream = IOBuffer(arr)
        itr = TickIter(stream, Dates.year(dt), Dates.month(dt), Dates.day(dt), Dates.hour(dt), ticker)
        new(filename, reader, itr)
    end
end

# Collect every record of the reader and return five parallel vectors:
# dates (DateTime) and ask, bid, ask volume, bid volume (all Float64).
function to_arrays(reader::TickReader)
    itr = reader.itr
    seek(itr.stream, 0)
    recs = collect(itr)
    dates    = DateTime[r.Date for r in recs]
    asks     = Float64[r.Ask for r in recs]
    bids     = Float64[r.Bid for r in recs]
    ask_vols = Float64[r.AskVolume for r in recs]
    bid_vols = Float64[r.BidVolume for r in recs]
    return dates, asks, bids, ask_vols, bid_vols
end

using DataFrames
# Return the ticks of `reader` as a DataFrame with the columns
# Date, Ask, Bid, AskVolume, BidVolume.
#
# The columns are passed by keyword so that each keeps its own element type
# (DateTime / Float64). Concatenating the vectors into a single matrix first
# would promote all elements to Any and produce untyped columns.
function to_dataframe(reader::TickReader)
    a_date, a_ask, a_bid, a_ask_vol, a_bid_vol = to_arrays(reader)
    return DataFrame(
        Date=a_date,
        Ask=a_ask,
        Bid=a_bid,
        AskVolume=a_ask_vol,
        BidVolume=a_bid_vol,
    )
end

using TimeSeries: TimeArray
# Return the ticks of `reader` as a TimeArray: the dates are the timestamps,
# the four Float64 vectors become the columns Ask, Bid, AskVolume, BidVolume.
function to_timearray(reader::TickReader)
    dates, ask, bid, ask_vol, bid_vol = to_arrays(reader)
    mat = hcat(ask, bid, ask_vol, bid_vol)
    return TimeArray(dates, mat, ["Ask", "Bid", "AskVolume", "BidVolume"])
end

# Demo: read one compressed tick file and print its records four ways
# (explicit iteration protocol, `for` loop, DataFrame, TimeArray).
function main()
    ticker = "EURUSD"
    dt = DateTime(2016, 2, 14, 20)
    filename = @sprintf "%02dh_ticks.bi5" Dates.hour(dt)
    println(filename)

    reader = TickReader(dt, ticker, filename)
    ticks = reader.itr
    separator = repeat("=", 10)

    # Pass 1: drive the iteration protocol by hand.
    st = start(ticks)
    while !done(ticks, st)
        tick, st = next(ticks, st)
        println(tick)
    end
    println(separator)

    # Pass 2: the same records through a `for` loop.
    for tick in ticks
        println(tick)
    end
    println(separator)

    println(to_dataframe(reader))
    println(separator)

    println(to_timearray(reader))
end

main()

This will be included in https://github.com/femtotrader/DukascopyTicksReader.jl; see https://github.com/femtotrader/DukascopyTicksReader.jl/issues/1