os-climate / data-platform-demo

Apache License 2.0

Need demo of data pipeline connected to ECB RESTful interface (exchange rates and more) #51

Closed MichaelTiemannOSC closed 1 year ago

MichaelTiemannOSC commented 2 years ago

The ECB makes datasets available via a RESTful interface: https://sdw-wsrest.ecb.europa.eu/help/

From that help page there is an example of how to format requests to download datasets such as USD/EUR exchange rates (the leading M selects Monthly frequency; Annual, Quarterly, and Daily series are also available; the trailing .A is the series-variation suffix): https://sdw-wsrest.ecb.europa.eu/service/data/EXR/M.USD.EUR.SP00.A

Ideal would be an initial implementation that could be generalized into a federating service of all publicly available ECB statistics, using features of their RESTful interface to guide data discovery and selection. Exchange rate data is needed immediately for ITR and PCAF workstreams. Later enhancements can cache data to Trino and update as needed.
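
As a concrete starting point, request URLs for the EXR dataflow can be assembled mechanically from the series key. A minimal sketch (the `exr_url` helper is hypothetical; the key layout and host come from the examples in this thread):

```python
# Sketch of building ECB SDW request URLs for the EXR dataflow.
# The FREQ.CURRENCY.DENOM.SP00.A key layout and the host are taken
# from the examples in this issue; the helper itself is illustrative.
ECB_BASE = "https://sdw-wsrest.ecb.europa.eu/service/data"

def exr_url(freq, currency, denom="EUR"):
    """Build a request URL for the ECB EXR (exchange rate) dataflow."""
    return f"{ECB_BASE}/EXR/{freq}.{currency}.{denom}.SP00.A"

# Monthly USD/EUR spot rates, as in the example above:
print(exr_url("M", "USD"))
```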

@HeatherAck @toki8 @caldeirav @erikerlandson @LeylaJavadova @mersin35

MichaelTiemannOSC commented 2 years ago

This is the link syntax to get CSV data instead of XML: https://sdw-wsrest.ecb.europa.eu/service/data/EXR/M.USD.EUR.SP00.A?format=csvdata
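
The csvdata variant is convenient to parse with the standard library. A sketch, with an inline two-row sample standing in for a live response; the column names (TIME_PERIOD, OBS_VALUE) and the rate values are assumptions here, not taken from this thread:

```python
import csv
import io

# Hypothetical sample shaped like an ECB ?format=csvdata response
# (column names and values are assumed for illustration only).
sample = """KEY,FREQ,CURRENCY,CURRENCY_DENOM,TIME_PERIOD,OBS_VALUE
EXR.M.USD.EUR.SP00.A,M,USD,EUR,2022-03,1.1016
EXR.M.USD.EUR.SP00.A,M,USD,EUR,2022-04,1.0815
"""

def parse_rates(text):
    """Map each TIME_PERIOD to its numeric OBS_VALUE."""
    return {row["TIME_PERIOD"]: float(row["OBS_VALUE"])
            for row in csv.DictReader(io.StringIO(text))}

print(parse_rates(sample))
```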

caldeirav commented 2 years ago

@bryonbaker as discussed, we can use this issue to build our first REST-based integration. We can start with daily exchange rates.

MichaelTiemannOSC commented 2 years ago

@bryonbaker please feel free to reach out to Mutlu Ersin (@mersin35) for use-case requirements and/or me for any other questions. Thanks!

bryonbaker commented 2 years ago

Thanks @MichaelTiemannOSC. I am just getting up to speed and finding my way around. Vincent gave me a run-through; let's see how much I remember. @mersin35 and Michael: what is the best channel for reaching out directly?

bryonbaker commented 2 years ago

I have a few initial questions...

  1. Are we looking for a live stream, or something that will create a historical feed?
  2. Are we starting with just FX?
  3. Will this include stocks of interest?
  4. Has there been discussion on which components we will incorporate in the solution? E.g. a service pulls feeds from markets and dumps them on a Kafka topic, or persists them in a database, etc.

caldeirav commented 2 years ago

@bryonbaker architecturally speaking, this is the pattern we want to demonstrate: https://github.com/os-climate/os_c_data_commons/blob/main/docs/add-data-pipeline-ingestion-patterns-event.md

MichaelTiemannOSC commented 2 years ago

Let's just start with FX, polish that, then extend.

I get GitHub notifications on my phone. OS-Climate email is also good for things that don't (yet) tie well to issues.

bryonbaker commented 2 years ago

Suggested core requirements - please provide feedback @caldeirav @mersin35 @MichaelTiemannOSC :

{
    "base_currency": "USD",
    "meta": {
        "effective_params": {
            "data_set": "OANDA",
            "date": "2022-04-20",
            "decimal_places": "5",
            "fields": [
                "averages",
                "highs",
                "lows",
                "midpoint"
            ],
            "quote_currencies": [
                "AUD",
                "GBP"
            ]
        },
        "request_time": "2022-04-20T01:58:29+0000",
        "skipped_currencies": []
    },
    "quotes": {
        "AUD": {
            "ask": "1.35650",
            "bid": "1.35622",
            "date": "2022-04-19T23:59:59+0000",
            "high_ask": "1.36154",
            "high_bid": "1.36127",
            "low_ask": "1.35152",
            "low_bid": "1.35124",
            "midpoint": "1.35636"
        },
        "GBP": {
            "ask": "0.76894",
            "bid": "0.76881",
            "date": "2022-04-19T23:59:59+0000",
            "high_ask": "0.77038",
            "high_bid": "0.77027",
            "low_ask": "0.76688",
            "low_bid": "0.76675",
            "midpoint": "0.76888"
        }
    }
}
MichaelTiemannOSC commented 2 years ago

I defer to the Allianz folks to weigh in with what's really needed. I see that Vincent guided toward the idea of handling quotes as an event stream; however, in the PCAF case (and many other cases), we are dealing more with reporting systems (which report annually, though in some cases monthly or quarterly), and to my eyes that is much more about open/close/high/low/weighted average for a date than bid/ask at a moment in time.

In the case of SEC DERA data, some of which is reported by foreign companies in non-USD currencies, there's a need to get FX information for their reporting dates. Out of thousands of companies reporting every quarter, that probably resolves to a handful of actually different dates, because most "as-of" dates are the end of whatever month aligns with the fiscal quarter, with most choosing the last date of the month and some choosing another date. In this case, a query for that handful of dates is all that's required, not an event stream. And if several other pipelines need their own as-of data, it would of course be nice to query existing data in Trino before calling out to the ECB API. Whether this ultimately fills in as daily data or event data is not my call, but I hope I've explained what shapes the requirements I've seen.

caldeirav commented 2 years ago

As a clarification, the use of event streams is suggested because of the need for REST API integration, not because of real-time data streaming requirements. If we rely on API calls for the source data, we need a way to persist the source data from each API call in order to have a fully reproducible pipeline in the future (i.e. another API call may not yield the same data). By persisting the result of the API calls in streams, we get an end-to-end fully reproducible data pipeline (we just need to republish the stream).

In this specific use case, we would be making API calls for reference FX rates and historizing them into Trino, but with the full ability to rebuild the data and rerun transformations from the source data.
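
The reproducibility argument can be illustrated with a toy stand-in for a Kafka topic: an append-only log of raw API responses, from which downstream tables are rebuilt by replaying. Two rebuilds always agree, even when a fresh ECB call would not. `ReplayableLog` and the rate values below are hypothetical:

```python
# Toy illustration: persist each raw API response as an immutable
# event, then rebuild downstream tables by replaying the log.
class ReplayableLog:
    def __init__(self):
        self._events = []

    def append(self, payload):
        """Persist one raw API response as an immutable event."""
        self._events.append(payload)

    def replay(self):
        """Re-deliver every event in its original order."""
        return iter(self._events)

log = ReplayableLog()
log.append({"date": "2022-04-19", "USD/EUR": 1.0788})
log.append({"date": "2022-04-20", "USD/EUR": 1.0826})

# Rebuilding a 'table' twice from the log yields identical results,
# even if a fresh API call to the source would not:
table1 = [e["USD/EUR"] for e in log.replay()]
table2 = [e["USD/EUR"] for e in log.replay()]
print(table1 == table2)
```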

bryonbaker commented 2 years ago

I thought the idea was to use Kafka as a time-series database so it can serve as an input to more than reporting, such as machine learning. I am fine with either approach, but I need clarity on the requirements so I can start designing.

caldeirav commented 2 years ago

Yes, we would indeed use Kafka as a time-series database (that is what I described as persisting the results of the API calls in streams). Multiple pipelines consuming historical rates can then do so via Trino, with full reproducibility.

MichaelTiemannOSC commented 2 years ago

@bryonbaker Could you provide a progress update on this issue at the Data Commons tech check-in meeting tomorrow?

caldeirav commented 2 years ago

@bryonbaker built a standardised interface service into which we can plug multiple price sources, feeding data into Strimzi: https://github.com/bryonbaker/markets-pricing-service

I will be testing this while we wait for the deployment of Strimzi on osc-cluster2. @redmikhail, for your information, as we have a dependency on this setup.

eoriorda commented 2 years ago

Dependent on #160, assigned to Mikhail.

MichaelTiemannOSC commented 2 years ago

Looks like https://github.com/os-climate/os_c_data_commons/issues/160 was closed, so we can now close this, yes?

caldeirav commented 2 years ago

Not yet; right now we have plugged in simulated rates for demo purposes. @bryonbaker is now going to work on integrating official rate sources.

bryonbaker commented 2 years ago

@caldeirav - in which repo and top-level directory should I build the pricing service? So you get an idea of what is being built, this is the initial version I built for the market simulator in my own repo: https://github.com/bryonbaker/markets-pricing-service

bryonbaker commented 2 years ago

@MichaelTiemann - sorry, I missed your request above ("Could you provide a progress update on this issue at the Data Commons tech check-in meeting tomorrow?"). Happy to give an update at the next one. This is my first GitHub issue for OS-Climate. What kind of update is required?

MichaelTiemannOSC commented 2 years ago

@mersin35 has been working on the this repo: https://github.com/os-climate/PCAF-sovereign-footprint

It has received a lot of positive attention, and one of the last steps is to demonstrate access to a federated source for FX rates. Ideally we can replace this approach (https://github.com/os-climate/PCAF-sovereign-footprint/blob/master/notebooks/OECD-EXCH-ingest.ipynb) with an API call to the ECB data. The goal of the update would be a warm handoff of what's working RESTfully to him (and to me; I do FX work with the SEC DERA data).

caldeirav commented 2 years ago

We should create a brand-new repo for ELT of real-time market data, including FX pricing data, as this is going to be used by multiple pipelines as reference data.

I suggest creating a new repo, e.g. market-data-ingestion-service, under the OS-Climate GitHub organization.

This will give us extraction/loading of rates straight into Trino, which is how we want to do federation in the case of API-based integration. If additional transformation of the source data is needed, DBT models and pipelines can also be added as an extension to the repo. Transformations required by specific pipelines (like PCAF) should be done in their own pipeline repos.

caldeirav commented 2 years ago

We agreed that for this use case the Trino schema will be tied to the provider, so we can have multiple FX providers streaming their raw formats into Kafka, each with its own schema. The ECB provider source would be polled on a regular cadence (daily) through some automation.
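
The daily polling step might look like the following sketch. startPeriod/endPeriod are standard SDMX 2.1 REST parameters, but their use here, and the `previous_day_url` helper itself, are assumptions rather than something agreed in this thread:

```python
from datetime import date, timedelta

# Sketch of the daily polling step: build the URL for the previous
# day's EUR reference rate for one currency. The startPeriod/endPeriod
# query parameters come from the SDMX 2.1 REST convention (assumed).
def previous_day_url(currency, today):
    """URL for the previous day's daily EUR reference rate."""
    day = (today - timedelta(days=1)).isoformat()
    return (f"https://sdw-wsrest.ecb.europa.eu/service/data/EXR/"
            f"D.{currency}.EUR.SP00.A"
            f"?startPeriod={day}&endPeriod={day}&format=csvdata")

print(previous_day_url("AUD", date(2022, 8, 30)))
```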

eoriorda commented 2 years ago

Need to check with @bryonbaker to see if he is ready to push the ECB integration.

bryonbaker commented 2 years ago

I am still in dev. I am retrieving the FX details and getting them ready to marshal onto Kafka. I need a few more days before it is ready in dev.

bryonbaker commented 2 years ago

I am reading the FX data from ECB now. The JSON format is very complex. Please review and comment: are you going to be able to handle this? Here is an example of yesterday's FX for AUD against EUR: https://gist.github.com/bryonbaker/24b4df73d8cafe6e26b342094b681bd2

toki8 commented 2 years ago


Hi Bryon,

thanks for your work. Mutlu Ersin, who is working on this piece, is currently on vacation and back next week.

Best Regards, Thomas

Thomas Kirchherr Head of Sales and Account Management


bryonbaker commented 2 years ago

I discussed this with Vincent and here is the plan of approach...

Originally the idea was to take the entire message from the data source and put it on the topic, so data pipelines could pick up all the data and do whatever they want with it: prune, combine, join with other data sources, and so on. But when the message is as complicated as this, that approach is not going to work well.

So I will extract the salient data that we want for OS Climate and put that in a new message structure, with a well-defined Trino schema.

Next Steps:

  1. I will identify the attributes to extract, propose the schema in this issue, and get agreement from the team.
  2. Implement and publish.

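
As an illustration of step 1, the extraction target could be modeled as a flat record. The field names below mirror the sample messages posted later in this thread; the `FxObservation` name and the observation values are hypothetical:

```python
from dataclasses import dataclass, asdict
import json

# Sketch of the 'salient data' extraction target: a flat record
# pulled out of the complex ECB SDMX-JSON payload. Field names follow
# the flat sample messages shown later in this thread.
@dataclass
class FxObservation:
    key: str
    freq: str
    currency: str
    currency_denom: str
    obs_value: float
    obs_status: str

obs = FxObservation(
    key="EXR.D.AUD.EUR.SP00.A", freq="D", currency="AUD",
    currency_denom="EUR", obs_value=1.4458, obs_status="A")
print(json.dumps(asdict(obs)))
```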
bryonbaker commented 2 years ago

@MichaelTiemann - I am almost there. A few questions:

Thanks

MichaelTiemann commented 2 years ago

I think 1990 is far enough back.

bryonbaker commented 2 years ago

@MichaelTiemann - thanks, will do. And the other two questions: which currencies, and which base currency (EUR)? Thanks.

MichaelTiemannOSC commented 2 years ago

I presume that we can change things as needed. But in the meantime, the pint demo for Iceberg (https://github.com/os-climate/data-platform-demo/blob/iceberg/notebooks/pint-demo.ipynb) defines these currencies:

ureg.define("USD = [currency] = $")
ureg.define("EUR = [currency_EUR] = €")
ureg.define("GBP = [currency_GBP] = £")
ureg.define("JPY = [currency_JPY] = ¥")
ureg.define("KRW = [currency_KRW] = ₩")
ureg.define("UAH = [currency_UAH] = ₴")
ureg.define("THB = [currency_THB] = ฿")
ureg.define("TRY = [currency_TRY] = ₺")
ureg.define("INR = [currency_INR] = ₨ = ₹")
ureg.define("MXN = [currency_MXN] = ₱")
ureg.define("ILS = [currency_ILS] = ₪")
ureg.define("AUD = [currency_AUD] = ₳")
ureg.define("RUB = [currency_RUB] = ₽")

We could start with those and update as needed. As for base currency, let's start with EUR and change if needed. I can fix the pint demo to adapt ;-)
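
Since the ECB quotes every series against EUR, cross rates between any two listed currencies can be derived through the EUR base. A small sketch with hypothetical rate values (not actual ECB data):

```python
# EUR-denominated reference rates: units of each currency per 1 EUR,
# as the ECB quotes them. Values here are hypothetical placeholders.
RATES_PER_EUR = {"EUR": 1.0, "USD": 1.08, "AUD": 1.57, "INR": 81.17}

def convert(amount, from_ccy, to_ccy):
    """Cross any two listed currencies through the EUR base."""
    eur = amount / RATES_PER_EUR[from_ccy]
    return eur * RATES_PER_EUR[to_ccy]

print(round(convert(100.0, "USD", "AUD"), 2))
```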

bryonbaker commented 2 years ago

Yes, we can adjust. I just want to load all the essentials from day 1.


bryonbaker commented 2 years ago

I have this running in dev. It is publishing to the ecb-fx topic in the fx-pricing-dev cluster. It is loading all the above currencies from 1999 right now.

Sample data format is:

{"key":"EXR.D.INR.EUR.SP00.A","freq":"D","currency":"INR","currency-denom":"EUR","sender":"ECB","exr-type":"SP00","exr-suffix":"A","starttime-period":"2022-08-08T00:00:00.000+02:00","end-time-period":"2022-08-08T23:59:59.999+02:00","obs-value":81.166,"obs-status":"A","time-format":"P1D"}
{"key":"EXR.D.INR.EUR.SP00.A","freq":"D","currency":"INR","currency-denom":"EUR","sender":"ECB","exr-type":"SP00","exr-suffix":"A","starttime-period":"2022-08-09T00:00:00.000+02:00","end-time-period":"2022-08-09T23:59:59.999+02:00","obs-value":81.406,"obs-status":"A","time-format":"P1D"}
{"key":"EXR.D.INR.EUR.SP00.A","freq":"D","currency":"INR","currency-denom":"EUR","sender":"ECB","exr-type":"SP00","exr-suffix":"A","starttime-period":"2022-08-10T00:00:00.000+02:00","end-time-period":"2022-08-10T23:59:59.999+02:00","obs-value":81.468,"obs-status":"A","time-format":"P1D"}
{"key":"EXR.D.INR.EUR.SP00.A","freq":"D","currency":"INR","currency-denom":"EUR","sender":"ECB","exr-type":"SP00","exr-suffix":"A","starttime-period":"2022-08-11T00:00:00.000+02:00","end-time-period":"2022-08-11T23:59:59.999+02:00","obs-value":82.2845,"obs-status":"A","time-format":"P1D"}

@HumairAK I raised issue #203 to get the Trino connector set up: https://github.com/os-climate/os_c_data_commons/issues/203
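
Consumers of the topic can parse these messages directly. For example, using the first sample line above:

```python
import json

# One message exactly as published to the ecb-fx topic
# (copied from the first sample line above).
msg = ('{"key":"EXR.D.INR.EUR.SP00.A","freq":"D","currency":"INR",'
       '"currency-denom":"EUR","sender":"ECB","exr-type":"SP00",'
       '"exr-suffix":"A","starttime-period":"2022-08-08T00:00:00.000+02:00",'
       '"end-time-period":"2022-08-08T23:59:59.999+02:00",'
       '"obs-value":81.166,"obs-status":"A","time-format":"P1D"}')

rec = json.loads(msg)
# One INR-per-EUR observation for 2022-08-08:
print(rec["currency"], rec["obs-value"], rec["starttime-period"][:10])
```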

bryonbaker commented 1 year ago

The coding is done. Waiting on the Trino configuration.

bryonbaker commented 1 year ago

Needs the Kafka topic issue #203 to be done.

caldeirav commented 1 year ago

Dependent on https://github.com/os-climate/os_c_data_commons/issues/203

bryonbaker commented 1 year ago

@MichaelTiemann I have loaded the topic with all of the currencies above since 1990 (it is loading now). There is a daily cron job that will pick up the FX rates for the previous day. Have fun!

MichaelTiemannOSC commented 1 year ago

Super!