Closed hayanobiton closed 4 years ago
ご報告ありがとうございます、
ご報告いただいたソースを拝見するかぎり、 factory.GetOrderBookSource(BfProductCode.FXBTCJPY).Select(data=> orderBook.GetSnapshot(100)).Subscribe(bookorder =>
とした上で、以降の処理で data に対して処理を行っていますが、 GetSnapshot(100) で取得したスナップショット(bookorder)ではなく、板全体(買気配:300、売気配:300)に対してSQLクエリを作成しているように見えます。
factory.GetOrderBookSource(BfProductCode.FXBTCJPY).Select(ob => ob.GetSnapshot(100)).Subscribe(snapshot => { foreach (var ask in snapshot.Asks) { // SQL 作成処理 } foreach (var bid in snapshot.Bids) { // SQL 作成処理 } });
のようにされてみてはいかがでしょうか。
※Orderbook自身は処理速度とメモリ利用効率の観点から、配信毎にコピーを作成せず、単一のインスタンスを更新し続ける設計になっています。アプリケーションは、別スレッドで処理中に中身が書き換わらないよう、Snapshotを作成することで、スレッドセーフに処理を行うことができます。
OrderBookSourceからの呼出処理は単一スレッドで行われているため、サーバーが板情報を配信する頻度よりもSubscribe()内部での処理が遅いと、目詰まりを起こしてWebSocketが切断したり、ご報告いただいたような、Snapshot作成中に、元の板情報が更新されてしまうといった現象が発生します。 データベース更新のようなある程度処理時間のかかる処理を行う場合には、板情報のキューイングを検討されてみてはいかがでしょうか。
例)
var queue = new ConcurrentQueue
Task.Run(() => { var result = queue.TryDequeue(out BfOrderBookSnapshot snapshot); // SQL作成、データベース書込等の処理 }, null);
お試しください。
補足: 処理開始数分に例外が発生するとのことですが、ガベージコレクションの発生により、Subscribe() 内の処理が間に合わなくなっているものと思われます。前述のキューイングを行ったり、SQL作成にStringBuilderを使用するなどの対策をご検討ください。
暫定的に対策を行ってみました。併せてご確認下さい。
お返事ありがとうございます。
1.もともとは、
var messageQueue = new ConcurrentQueue<BfOrderBookSnapshot>();
factory.GetOrderBookSource(BfProductCode.FXBTCJPY).Select(obj=> obj.GetSnapshot(100)).Subscribe(snapshot =>
{
messageQueue.Enqueue(snapshot);
Task.Run(() =>
{
UpdateInmemoryBoard(messageQueue);
});
}).AddTo(_disposables);
のようにQueueを使っていました。
こちらを「Microsoft Visual Studio Community 2019 Preview」でデバックすると、 TakeLast.csというファイルが開き、
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
namespace System.Linq
{
public static partial class EnumerableEx
{
#if !(REFERENCE_ASSEMBLY && (NETCOREAPP2_0 || NETSTANDARD2_1))
/// <summary>
/// Returns a specified number of contiguous elements from the end of the sequence.
/// </summary>
/// <typeparam name="TSource">Source sequence element type.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="count">The number of elements to take from the end of the sequence.</param>
/// <returns>Sequence with the specified number of elements counting from the end of the source sequence.</returns>
public static IEnumerable<TSource> TakeLast<TSource>(this IEnumerable<TSource> source, int count)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (count < 0)
throw new ArgumentOutOfRangeException(nameof(count));
return TakeLastCore(source, count);
}
#endif
private static IEnumerable<TSource> TakeLastCore<TSource>(IEnumerable<TSource> source, int count)
{
if (count == 0)
{
yield break;
}
var q = new Queue<TSource>(count);
foreach (var item in source)
{
if (q.Count >= count)
{
q.Dequeue();
}
q.Enqueue(item);
}
while (q.Count > 0)
{
yield return q.Dequeue();
}
}
}
}
の
foreach (var item in source)
で、Collection was modified after the enumerator was instantiated. の例外が発生しました この例外が、自分のQueue処理によるものなのか、スナップショットを取得によるものか、切り分けるためQueueを使わないようにしましたが、同様な例外が発生しましたの、ご質問を致しました。
2.ソースを最新にしましたが、同様なエラーは発生しました。 そのため、何かアドバイスを頂ければ幸いです。
例外メッセージから推察すれば、
public static IEnumerable
追加した修正は、lockを使用して GetSnapshot() の最中に OrderBook を書き換えられないようにしたもので、それでも同じ例外が出るということであれば、現状対策が思いつきません。
Visual Studio 2019 Preview をご利用のとのことですので、一度最新版に更新してみてから再度試されてみてはいかがでしょうか。
RealtimeApiTests を更新しました。最新ソースを取得の上、ご利用の開発環境で正常に動作するかをご確認いただけます。 起動後、メニューから、OrderBook もしは Board を選択して下さい。
当方の開発環境 Windows10 / Visual Studio 2019 Professional 16.7.1 で 10分程度連続稼働させましたが、特にエラーは発生しませんでした。
*OrderBook test を、GetSnapshot を使用するように変更しました。
修正をして頂きありがとうございます。
最新のソースで、RealtimeApiTestsのBoard、および自分のプログラムで、30分動作しましたが、例外エラーは発生しませんでした。
お時間を頂き、本当にありがとうございました。
いつもライブラリを使わせて頂きありがとうございます
Realtime API Public Channelsの GetOrderBookSourceを使用して板情報をDBへ格納しておりました
バージョンが上がってから、
factory.GetOrderBookSource(BfProductCode.FXBTCJPY).Select(data=> orderBook.GetSnapshot(100)).Subscribe(bookorder => { var sql = ""; for (int n = 0; n < data.Asks.Count(); n++) { sql += "(" + data.Asks[n].Price + "," + data.Asks[n].Size + ",'ask'),"; } for (int n = 0; n < data.Bids.Count(); n++) { sql += "(" + data.Bids[n].Price + "," + data.Bids[n].Size + ",'bid'),"; } if (sql.Length > 0) { System.Diagnostics.Debug.WriteLine("REPLACE INTO inmemory VALUES" + sql.TrimEnd(',')); } }).AddTo(_disposables);
を実行すると、
BfOrderBookSnapshot.cs
29行目、もしくは、30行目 asks.Take(size).ForEach(e => _asks.Add((e.Key, e.Value))); bids.TakeLast(size).ForEach(e => _bids.Add((e.Key, e.Value))); のところで、
例外がスローされました: 'System.InvalidOperationException' (System.Collections.dll の中) Collection was modified after the enumerator was instantiated. が発生し異常終了します。
ただ、例外が発生する時間は異なり、初め問題なく動いていても、5~10分後に例外が発生します。
以前のBfOrderBookSnapshot.csがないときのバージョンでは、このような例外は発生しませんでした