Open EngieAntoine opened 1 year ago
Hello Antoine,
Thank you for the precise report. I think I now understand what is going on: our case analysis is missing a branch. We do not address the autotrophic operators and take the wrong turn as noted.
It should be possible to write a test in the refinery repo using an ad-hoc autotrophic operator, showing that it does not work. With such a test (which in theory anyone can write, esp. since tshistory_formula contains many such tests), adding a working solution should then be relatively easy.
"anyone can write" but not me ^^ I found the way to create autotrofix serie with the following syntax :
@func('auto_serie', auto=True)
def auto_serie(__interpreter__,
__from_value_date__,
__to_value_date__,
__revision_date__) -> pd.Series:
return pd.Series(
[10] * 24,
index=pd.date_range(utcdt(2020, 1, 1), periods=24, freq='H')
)
tsh.register_formula(
engine,
'autotrophic_serie',
'(auto_serie)'
)
but I don't how to simulate updates on specific insertion_date. Could you help me plz ?
Good question. I think the simplest way would be for the auto operator to use an underlying "normal" (stored) series named e.g. "upstream", which can be then easily updated. That should allow to simulate upstream changes.
@func('auto_serie', auto=True)
def auto_serie(__interpreter__,
__from_value_date__,
__to_value_date__,
__revision_date__) -> pd.Series:
i = __interpreter__
return i.tsh.get(
i.cn,
'upstream',
revision_date=__revision_date__,
from_value_date=__from_value_date__,
to_value_date=__to_value_date__
)
Here the unit test for the autotrophic serie :
def test_autotrophic_serie_in_cache(engine, tsa):
tsa.delete('autotrophic_series')
tsa.delete('upstream')
tsh = tsa.tsh
# 1. create autotrophic series with a series we can update for the test
@func('auto_series', auto=True)
def auto_series(__interpreter__,
__from_value_date__,
__to_value_date__,
__revision_date__) -> pd.Series:
i = __interpreter__
return i.tsh.get(
i.cn,
'upstream',
revision_date=__revision_date__,
from_value_date=__from_value_date__,
to_value_date=__to_value_date__
)
@finder('auto_series')
def custom(cn, tsh, tree):
return {
'I HAVE A NAME FOR DISPLAY PURPOSES': tree
}
@insertion_dates('auto_series')
def autotrophic_idates(cn, tsh, tree,
from_insertion_date=None, to_insertion_date=None,
from_value_date=None, to_value_date=None):
idates= tsh.insertion_dates(
cn,
'upstream',
from_insertion_date= from_insertion_date,
to_insertion_date=to_insertion_date,
from_value_date=from_value_date,
to_value_date=to_value_date,
nocache=True
)
return idates
# 2. assign first values with insertion date = 2022-1-1
plain_ts = pd.Series(
[1] * 7,
index=pd.date_range(
start=datetime(2014, 12, 31),
freq='D',
periods=7,
)
)
tsa.update('upstream',
plain_ts,
'Babar',
insertion_date=pd.Timestamp('2022-1-1', tz='UTC')
)
tsh.register_formula(
engine,
'autotrophic_series',
'(auto_series)'
)
# 3. get the data from the autotrophic series
result_without_cache = tsh.get(engine, 'autotrophic_series')
# 4. set a new cache policy
cache.new_policy(
engine,
'a-policy',
'(date "2022-1-1")',
'(shifted now #:days -10)',
'(shifted now #:days 10)',
'0 1 * * *',
'0 8-18 * * *',
namespace=tsh.namespace
)
cache.set_policy(
engine,
'a-policy',
'autotrophic_series',
namespace=tsh.namespace
)
r = cache.series_policy_ready(
engine,
'autotrophic_series',
namespace=tsh.namespace
)
assert r == False
p = cache.series_policy(
engine,
'autotrophic_series',
namespace=tsh.namespace
)
assert p == {
'name': 'a-policy',
'initial_revdate': '(date "2022-1-1")',
'look_after': '(shifted now #:days 10)',
'look_before': '(shifted now #:days -10)',
'revdate_rule': '0 1 * * *',
'schedule_rule': '0 8-18 * * *'
}
# 5. initial cache load with our autotrophic_series series
cache.refresh_series(
engine,
tsa,
'autotrophic_series',
final_revdate=pd.Timestamp('2022-1-5', tz='UTC')
)
cache.set_policy_ready(engine, 'a-policy', True, namespace=tsh.namespace)
r = cache.series_policy_ready(
engine,
'autotrophic_series',
namespace=tsh.namespace
)
assert r
# 6. check that cached series and non cached series are the same
result_from_cache = tsh.get(engine, 'autotrophic_series')
assert str(result_without_cache) == str(result_from_cache)
# 7. update the upstream series on insertion 1st feb with values 2 instead of 1
plain_ts = pd.Series(
[2] * 7,
index=pd.date_range(
start=datetime(2022, 1, 25),
freq='D',
periods=7,
)
)
tsa.update('upstream',
plain_ts,
'Babar',
insertion_date=pd.Timestamp('2022-2-1', tz='UTC')
)
# 8. check that series in not the same in and out the cache
result_without_cache = tsh.get(engine, 'autotrophic_series', nocache=True)
result_from_cache = tsh.get(engine, 'autotrophic_series')
assert str(result_without_cache) != str(result_from_cache)
# 9. refresh series in cache
cache.refresh_series(
engine,
tsa,
'autotrophic_series'
)
# 10 check that now, the series is updated
result_from_cache = tsh.get(engine, 'autotrophic_series')
assert str(result_without_cache) == str(result_from_cache)
# release series
tsa.delete('autotrophic_series')
tsa.delete('upstream')
tell me if it's ok for you
update: I add a definition for the finder (needed for find_series method)
mmm I missed the insertion_date of the autotrophic series. update on previous code done
I fixed the update (step 7) and it's work now.... :/ Did I miss something ? or perhaps the problem come from cronos operator ?
Question: if you put a print in cache.py ~454 do we pass there ?
Then, indeed the cronos operator insertion_dates
implementation might have issues. It is alas complex and should be investigated. One option is to move the test you have written to the data_hub repo and adapt the test to a cronos series very much like the one you have that fails. The closer from the prod the better.
Yes I pass on the
# delegate to other instance
idates += tsa.insertion_dates(
ho I see in my previous picture that I have the "nocache=True" fix. When I removed it, it's failed \o/
Ok, we have a good start :) Can you commit this and push it ? I'll hopefully be able to work on a solution from this.
Thanks, I'll have a look soon.
I've pushed a commit with a tentative fix. Can you examine and possibly try it where it applies ? Note that the original commit also has been slightly amended.
Tks. I push the fix on our preprod env and will see tomorrow if it's correct or not. We also have an other problem and trying to make an other TU (seem difficult to reproduce) but I will create an other issue
Unfortunately, it seems not working. We still have some series that don't update at all. After investigating a little, it seem the problem come from the dbcache used on singularity formula for the idate. I reset it and try again...
Indeed, that cache in singularity_formula might hide some nasties.
isn't normal that from_insertion_date and to_insertion_date are not precised on the bug fix you give ?
if idatefunc:
# import pdb;pdb.set_trace()
idates += idatefunc(
cn,
tsh,
# localtree given in full: all params will be needed
# to build a proper series id
localtree,
>> from_insertion_date = from_insertion_date,
>> to_insertion_date = to_insertion_date
)
continue
Oops, indeed. I've pushed a new version of that same commit.
Hello,
sur la V3.10, on observe des séries (qui sont dans un cache policy) qui ne se mettent pas à jour. En regardant dans le détails, le problème semble venir des insertions dates qui ne se mettent pas à jour.
voici un exemple de formule qui ne se met pas à jour :
gas.de.storage.injections.7fields.oge.haiming_2_7f.gwhd.obs ->
en ajoutant des logs dans tshistory_refinery/cache.py/line 454, on voit que l'on tombe dans le cas où la série feuille (sname) (ici cronos "5c16d725-b673-41ca-8a18-982c71b77497") n'existe pas (ce qui est normal dans notre cas). Du coup, on va essayer d'aller chercher les insertions dates du parent (name) mais avec le nocache=False par default. En mettant nocache=True, cela semble résoudre le problème mais comme on n'est pas sûr d'avoir compris ce que l'on essayait de faire dans cette méthode, peut-être que l'on aurait un effet de bord ?
Voici le code avec la modification que l'on teste (cache.py/454)
Qu'est-ce que tu penses de ça ? est-ce que l'on pourrait se faire un ptit call pour comprendre l'idée ?