Closed: benyamin-codez closed this issue 5 months ago.
Yeah, in hindsight this was not the best solution to the previous problem.
I'm in the process of changing all requests to use a single connection pool, with a single session per thread reusing the same connection pool, and with the threads being managed in a fixed-length FIFO queue. That should fix this problem.
I looked at `queue` as an alternative too. It would certainly be more elegant.
I didn't go with it because at the time I wasn't sure it would have solved the problem. In my environment the root cause could have been any number of things.
I also found thread failure is sporadic, with a decreasing frequency with each script run. In fact, the original issue can also be overcome by hitting `Esc` and manually retrying somewhere between 3 and 10 times. The number of retries required for success decreases. Similarly, with the workaround in place, the log shows a reduction in errors with every run. Eventually the log will not show any threading errors. After some time, this resets and the failure frequency returns to what it was on first run. I considered maybe some garbage collection was occurring or a connection was timing out.
In any case, I think `queue` will probably fix this for me too.
> I also found thread failure is sporadic, with a decreasing frequency with each script run. In fact, the original issue can also be overcome by hitting `Esc` and manually retrying somewhere between 3 and 10 times. The number of retries required for success decreases. Similarly, with the workaround in place, the log shows a reduction in errors with every run. Eventually the log will not show any threading errors. After some time, this resets and the failure frequency returns to what it was on first run. I considered maybe some garbage collection was occurring or a connection was timing out.
Hmm, the `queue` by itself may not be enough. Will probably need to catch the `RuntimeError` as you have done, and implement some back-off functionality for the filling.
Can you add the following before the `time.sleep` call in your workaround? While the current code has no bounds on the number of `Thread`s that are spawned, which is obviously problematic for large subscription lists or resource constrained devices, it would be interesting to quantify the issue.
_context.log_debug(f'{threading.active_count() = }')
_context.log_debug(f'{threading.enumerate() = }')
Yeah, I have approx 215 subs in my feed - not sure if that would be considered "large" though... I guess "large" is a bit like "soon" and somewhat subjective. 8^d
Just to be clear, I did check system proc limits with `ulimit -u` and also `ps -fLu osmc | wc -l` during the event, and didn't really see any issues. Without debugging Python's threading.py it would be hard to discern why the thread actually wasn't starting. I did go down this path but there was really too much noise to discern what might be happening.
I dropped to the `_context.log_error` level and added the enum and count you requested.
Please see the attached log snip. I hope it is of some value to you.
20231102_youtube_threads.log
I still think it is a race condition, but couldn't say where or why...
Thanks for that.
Seems like there are two inter-related problems.
A large number of threads are spawned all at once, possibly hitting the per-process thread limit based on how much system memory you have and the max thread stack size. Unfortunately I am not sure on the specifics of how this is exactly determined, but Python now defaults to limiting the max threads it opens in a thread pool to your CPU core count + 4, up to a max of 32, i.e. a range of 5 to 32, so this plugin should probably do the same.
The second problem is that some threads are closing in a timely manner, but others are not. The flood of connections may be causing your network to choke or for Youtube to be throttling or rate limiting responses. Will likely not be an issue when using a more reasonable amount of threads.
> The second problem is that some threads are closing in a timely manner, but others are not. The flood of connections may be causing your network to choke or for Youtube to be throttling or rate limiting responses. Will likely not be an issue when using a more reasonable amount of threads.
It might be worth mentioning that the list does usually load within 30 seconds, despite the use of `thread.join(30)`, suggesting that none of the threads are timing out. Could it be a race between connection setup for each thread? I do very occasionally (very rarely) see 404s from YT on a couple of random channel_ids, which might be an indication I am hitting some rate limits, so perhaps YT is throttling responses also.
The `join` method, with a specified timeout, doesn't really do anything special if the thread has not completed within the timeout period. It simply blocks the main thread, giving the child thread the opportunity to complete, but if the child thread doesn't end, or ends early, the main thread will still continue as normal.
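A quick standalone illustration of that behaviour (bare threading API only, not plugin code):

import threading
import time

def slow_task():
    time.sleep(60)  # stands in for a request that never completes in time

worker = threading.Thread(target=slow_task, daemon=True)
worker.start()
worker.join(2)  # blocks the caller for at most 2 seconds, then gives up silently
print(worker.is_alive())  # True - the thread is still running, the caller just moved on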
The original problem was that the large number of simultaneous requests was causing a large number of connections to be created, which in turn caused the SSL handshake of the connections to fail, without being caught.
The original PR fixed the problem with multiple simultaneous connections by using a single session and connection pool, and also limiting the number of concurrent connections and reusing them when the requests were complete. However it didn't check if the requests were timing out or otherwise failing, and the original code would block indefinitely waiting for the threads to complete: https://github.com/anxdpanic/plugin.video.youtube/pull/478/files#diff-5ecde54fa534f31c07c435544e8497cb462674901a9d688be492bfbbde50b579R824-R852
My idea at the time was to add timeouts to the requests, and the 30s `thread.join` timeout value is based on the request timeout period. That's why you don't see the threads hanging even though `join` doesn't do much: the requests themselves have terminated, in a specific effort to make sure that Kodi wouldn't hang.
I just didn't think this through enough to realise the issue with creating the threads in the first place, as I don't really use this particular functionality.
Thanks for the explanation @MoojMidge.
I see there have been some further improvements in that section of code since the pull you referenced (#478), such as the timeout tuple on the `session.get`, the subsequent error handling, and the `join` timeout catch-all we discussed above.
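For anyone following along, the tuple separates the connect and read timeouts, so the call in question looks roughly like the sketch below (the URL placeholder is illustrative):

import requests

session = requests.Session()
# (connect timeout, read timeout) in seconds; the requests docs suggest a connect
# value slightly larger than a multiple of 3, the default TCP retransmission window
response = session.get(
    'https://www.youtube.com/feeds/videos.xml?channel_id=<channel_id>',
    timeout=(3.05, 27))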
Looking a little deeper, I see the HTTPAdapter (currently at line 828 in master), i.e. `adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)`, divides the number of requests from child threads into 5 per session (via `pool_maxsize=5`), and the adapter blocks further requests from the thread pool until a connection is available (via `pool_block=True`). I'm fairly certain the use of `pool_block=True` is the root cause of the problem.
Normally `pool_maxsize` would be equal to the number of child threads spawned so that new sessions are not created. The trouble here is we cannot foresee how many threads will be created. Despite much errata to the contrary, when `pool_block=False` and the `pool_maxsize` is reached, a new session is created, apparently using the same TCP connection. As these sessions close, they are consolidated into the one pool. When `pool_block=True` these new sessions cannot be started, leading to the exception being raised.
It's probably worth mentioning the HTTPAdapter default `pool_connections=10` will only ever use 1 pool when matching www.youtube.com. There might be some value in specifying `pool_connections=1` when instantiating the adapter.
All of the following don't produce any errors:
adapter = requests.adapters.HTTPAdapter(pool_maxsize=300, pool_block=True)
adapter = requests.adapters.HTTPAdapter(pool_maxsize=5)
adapter = requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=5)
adapter = requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=30)
I'd like to confirm what is happening in urllib3 but I'm having some trouble enabling debug logging for same. Any hints?
Not really something I know too much about, but I'll try to respond based on my current understanding of the issue. Note that I may interchangeably refer to parts of the `urllib3` and `requests` APIs (`requests` more predominantly, because it is what is directly used in this plugin), but hopefully the API and terminology should be sufficiently similar to be able to follow either way.
> I'm fairly certain the use of `pool_block=True` is the root cause of the problem.
Yes and no. Yes - it can prevent the remaining threads, with the pending requests, from ending, while waiting for the requests currently in the pool to complete. No - I don't think it is the root cause of the issue. See below.
> Normally `pool_maxsize` would be equal to the number of child threads spawned so that new sessions are not created. The trouble here is we cannot foresee how many threads will be created. Despite much errata to the contrary, when `pool_block=False` and the `pool_maxsize` is reached, a new session is created, apparently using the same TCP connection. As these sessions close, they are consolidated into the one pool. When `pool_block=True` these new sessions cannot be started, leading to the exception being raised.
I don't think this is correct. There is only one `Session` instance being created. The `ConnectionPool` instances are not responsible for creating the `Session` instances; it is the other way around, with the `Session` creating adapters as required, which in turn create a `ConnectionPool` instance as required, depending on the number of different hosts that are being connected to, up to a maximum default of 10 based on the `pool_connections=10` parameter.
By default, each individual `ConnectionPool` created for a `Session` will only reuse one `HTTPConnection` after the request is completed, for use by subsequent requests, based on the default `pool_maxsize=1` parameter.
If `pool_block=False` then there is no limit on the number of connections that can be made to a specific host; however, only the number specified by `pool_maxsize` will be returned to the `ConnectionPool` for reuse.
If `pool_block=True` then the creation of additional `HTTPConnection` instances will be blocked to limit the total number in the `ConnectionPool` to be no greater than `pool_maxsize`. If this is not used, then the original problem can re-occur, because the number of connections is not limited; however, only `pool_maxsize` connections will be reused, with additional `HTTPConnection` instances created for the additional threads that are created.
Because the additional connections are being prevented from being created, this causes the threads to stay running for longer. However, all four example adapter instances that didn't produce errors for you are essentially doing the same thing: creating new connections for all 215 subscriptions you have, all at once, which is what was causing the original problem for other people.
It is just a trade-off between managing network resources and CPU/memory resources.
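To make that trade-off concrete, here is a minimal sketch of the two adapter configurations being weighed up (the counts are illustrative):

import requests

session = requests.Session()

# Blocking pool: at most 5 sockets to any one host; a 6th concurrent request
# waits for a pooled connection to be freed instead of opening a new socket.
adapter = requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=5, pool_block=True)

# Non-blocking pool: extra concurrent requests open extra sockets on demand,
# but only 5 of them are kept in the pool for reuse afterwards.
# adapter = requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=5, pool_block=False)

session.mount('https://', adapter)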
> It's probably worth mentioning the HTTPAdapter default `pool_connections=10` will only ever use 1 pool when matching www.youtube.com. There might be some value in specifying `pool_connections=1` when instantiating the adapter.
That's right, but it shouldn't create new pools unless required. However, from what I can tell `requests.Session` is not thread safe, and while it may not make a difference when only connecting to a single host, I think what should be done more generally is to use a common `HTTPAdapter` across multiple `Session` instances, which will create a thread safe `ConnectionPool` instance per host, as required by the requests that are made using the `Session`.
This is what I am intending to do, because while the issue with fetching subscriptions results in connections to only a single host, there are multiple other requests being made to different Google/Youtube hosts at various times in this plugin, that will all benefit from using a unified request mechanism.
In this way connections can be reused throughout the plugin, which should be faster while also reducing memory and socket usage, and it should prevent Kodi from hanging when there are network issues. For this, the default `pool_connections=10` seems appropriate, along with `pool_maxsize=10` and `pool_block=True`.
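As a rough sketch of that idea (the helper name is mine, not the plugin's actual code):

import requests

# One adapter shared by every session: its per-host ConnectionPool instances
# are thread safe and get reused no matter which session makes the request.
SHARED_ADAPTER = requests.adapters.HTTPAdapter(
    pool_connections=10, pool_maxsize=10, pool_block=True)

def new_session():
    session = requests.Session()
    session.mount('https://', SHARED_ADAPTER)
    return session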
This is currently a WIP, the common module is done and works pretty well across the plugin. The next step is to finish the thread management.
> I'd like to confirm what is happening in urllib3 but I'm having some trouble enabling debug logging for same. Any hints?
Adding the following to `youtube.py` (or anywhere else, really) should do the trick.
import logging
logging.basicConfig()
logging.getLogger("urllib3").setLevel(logging.DEBUG)
My understanding is that `requests` now requires `urllib3`. If you look at script.module.requests/lib/requests/__init__.py you will see it imports `urllib3`, and script.module.requests/lib/requests/adapters.py utilises a urllib3 `PoolManager` when instantiating the `HTTPAdapter`.
Unfortunately, I found your suggestion above for `urllib3` logging clobbered the Kodi log, so I added `requests.urllib3.add_stderr_logger()` near the top of `youtube.py` instead. The function is defined in script.module.urllib3/lib/__init__.py with a default of `level=logging.DEBUG`. This made it pretty easy to work out what was happening.
You are correct that a new `Session` is not instantiated, but rather a new `HTTPConnection`, until `pool_maxsize` is reached. Whilst `requests.Session()` creates default adapters when initialized, in our case the session object is mounted with our custom `HTTPAdapter`.
When `pool_block=False`, each `HTTPConnection` created above `pool_maxsize` will return to the `ConnectionPool` for reuse when a slot is freed up following the closure of an `HTTPConnection` already in the pool. In this way any "extra" `HTTPConnection` can be returned for use in the pool. I erroneously conflated "session" with `HTTPConnection`, which is clearly a TCP connection.
Just to be clear, in my testing with `pool_block=False`, `pool_maxsize=5` resulted in 30 connections and `pool_maxsize=20` resulted in 36 connections. So the use of the `HTTPAdapter` is certainly an improvement in any case.
FYI, release 1.26.16 of `urllib3` made `PoolManager` thread safe. Also, the default for `pool_maxsize` is 10. I note the comments from @cas-- as to why he chose `pool_maxsize=5` in #280 / #478... However, in my case the default seems to be a bit of a sweet spot instead.
I also think that consolidating the request mechanics into a unified function is a great idea. Given the defaults, `requests.adapters.HTTPAdapter(pool_block=True)` should be sufficient for instantiating the `HTTPAdapter`, although you may want to consider using `urllib3.util.Retry` for `max_retries`, e.g. `max_retries=Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])`. Are you intending to modify `_request()` in `youtube.py`?
I note the `max_retries` parameter above cannot resolve the `thread.start()` issue, as the thread must start before the `max_retries` functionality can have any effect.
I think `queue` could work, but won't be as efficient. It is probably overkill, as the child threads don't need to share data. It would seem a `thread.start()` retry mechanism is still required in any case...
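For comparison, a minimal sketch of what the queue approach might look like, with a fixed set of worker threads draining a pre-filled job queue (fetch() and the worker count of 5 are placeholders):

import queue
import threading

def fetch(url):
    return url  # placeholder for the actual session.get() call

def worker(jobs, results):
    while True:
        try:
            url = jobs.get_nowait()
        except queue.Empty:
            return  # queue drained, worker exits
        results.append(fetch(url))  # list.append is atomic in CPython
        jobs.task_done()

jobs = queue.Queue()
for channel_id in ('UCxxxxxxxx', 'UCyyyyyyyy'):  # placeholder channel ids
    jobs.put('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id)

results = []
workers = [threading.Thread(target=worker, args=(jobs, results)) for _ in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join(30)

Only 5 threads ever exist in this shape, so thread.start() cannot hit the process limit regardless of how many subscriptions are queued.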
FYI, "final" diff:
--- ./plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ ./plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -17,6 +17,8 @@
import requests
+from time import sleep
+from urllib3.util import Retry
from .login_client import LoginClient
from ..youtube_exceptions import YouTubeException
from ..helper.video_info import VideoInfo
@@ -26,6 +28,7 @@
_context = Context(plugin_id='plugin.video.youtube')
+#requests.urllib3.add_stderr_logger()
class YouTube(LoginClient):
def __init__(self, config=None, language='en-US', region='US', items_per_page=50, access_token='', access_token_tv=''):
@@ -825,7 +828,15 @@
session = requests.Session()
session.headers = headers
session.verify = self._verify
- adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)
+ retries = Retry(
+ total=3,
+ backoff_factor=0.1,
+ status_forcelist=[500, 502, 503, 504]
+ )
+ adapter = requests.adapters.HTTPAdapter(
+ pool_block=True,
+ max_retries=retries
+ )
session.mount("https://", adapter)
responses = []
@@ -847,7 +858,21 @@
responses)
)
threads.append(thread)
- thread.start()
+ for _ in range(5):
+ try:
+ thread.start()
+ except Exception as e:
+ err = e
+ _context.log_error('Failed to start thread ' + str(len(threads)) + ' for channel_id ' + channel_id)
+ #_context.log_debug(f'{threading.active_count() = }')
+ #_context.log_debug(f'{threading.enumerate() = }')
+ sleep(0.15)
+ continue
+ else:
+ _context.log_debug('Success starting thread ' + str(len(threads)) + ' for channel_id ' + channel_id)
+ break
+ else:
+ raise err
for thread in threads:
thread.join(30)
EDIT: Removed `threadcount`... Note extra debugging for `urllib3` and `threading` is commented out.
EDIT: Cleaned up `time.sleep`. Sleeping for 0.15s is sufficient, i.e. no discernible difference with 0.3s, but it must be > 0.1s.
> A large number of threads are spawned all at once, possibly hitting the per-process thread limit based on how much system memory you have and the max thread stack size. Unfortunately I am not sure on the specifics of how this is exactly determined, but Python now defaults to limiting the max threads it opens in a thread pool to your CPU core count + 4, up to a max of 32, i.e. a range of 5 to 32, so this plugin should probably do the same.
@MoojMidge, I've been playing with `concurrent.futures.ThreadPoolExecutor()` and think it is a bit more reliable for a range of potential problems. For the OP problem, i.e. a new thread cannot be started, it appears the `max_workers` argument must be manually set very high to reliably reproduce the problem. It appears to never occur when `max_workers` is less than or equal to the `HTTPAdapter` argument `pool_maxsize`. From what I've seen it does a pretty good job of not stuffing the `HTTPAdapter` with too many threads at once, and as such avoids the problem.
I'm just doing some more testing and will trim my edits before posting here...
OK, so using `concurrent.futures.ThreadPoolExecutor()` I wasn't able to reproduce the problem using the default value of the `concurrent.futures.ThreadPoolExecutor._max_workers` property, which is 8 on a Raspberry Pi 3 B+, nor with the maximum `max_workers` default of 32 (which I manually specified in order to test). I could reproduce the problem with some frequency when using a `max_workers` value between 300 and 500. At 1500 the problem was reliably reproducible.
This is all predicated on the `HTTPAdapter` using the default `pool_maxsize` of 10.
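For reference, the default worker count can be inspected directly; since Python 3.8 it is min(32, os.cpu_count() + 4), which is where both the 8 on a 4-core Pi and the ceiling of 32 come from:

import concurrent.futures
import os

with concurrent.futures.ThreadPoolExecutor() as pool:
    print(pool._max_workers)               # 8 on a Raspberry Pi 3 B+ (4 cores)
print(min(32, (os.cpu_count() or 1) + 4))  # the formula behind the default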
Seeing as the `HTTPAdapter` I propose includes the `urllib3` retry mechanism, in the end I didn't see a need to include retry mechanics for the actual fetch. The same applies to thread creation, as `concurrent.futures.ThreadPoolExecutor()` appears to be very reliable. As such, I think the following is sufficient:
--- CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -12,11 +12,13 @@
import json
import re
import threading
+import concurrent.futures
import traceback
import xml.etree.ElementTree as ET
import requests
+from urllib3.util import Retry
from .login_client import LoginClient
from ..youtube_exceptions import YouTubeException
from ..helper.video_info import VideoInfo
@@ -825,11 +827,14 @@
session = requests.Session()
session.headers = headers
session.verify = self._verify
- adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)
+ retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
+ adapter = requests.adapters.HTTPAdapter(pool_block=True, max_retries=retries)
session.mount("https://", adapter)
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
try:
_response = session.get(_url, timeout=(3.05, 27))
_response.raise_for_status()
@@ -839,18 +844,35 @@
return
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ for tsk in done:
+ if tsk.exception(): raise tsk.exception()
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ _context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
+
session.close()
for response in responses:
Here it is again with some error and minor debug logging:
--- CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -12,11 +12,13 @@
import json
import re
import threading
+import concurrent.futures
import traceback
import xml.etree.ElementTree as ET
import requests
+from urllib3.util import Retry
from .login_client import LoginClient
from ..youtube_exceptions import YouTubeException
from ..helper.video_info import VideoInfo
@@ -825,11 +827,14 @@
session = requests.Session()
session.headers = headers
session.verify = self._verify
- adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)
+ retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
+ adapter = requests.adapters.HTTPAdapter(pool_block=True, max_retries=retries)
session.mount("https://", adapter)
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
try:
_response = session.get(_url, timeout=(3.05, 27))
_response.raise_for_status()
@@ -839,18 +844,45 @@
return
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ _context.log_debug(f'Channel XML Fetch Active Threads : {len(futures)} | Done : {len(done)} | Not Done : {len(not_done)}')
+ for tsk in done:
+ task = futures[tsk]
+ if tsk.exception():
+ _context.log_error(f'FATAL : Failed to fetch xml data for channel_id {task[0]}')
+ raise tsk.exception()
+ else:
+ _context.log_debug(f'SUCCESS : Fetched xml data for channel_id {task[0]}')
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ _context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
+ else:
+ _context.log_debug(f'SUCCESS : Exited Channel XML Fetch ThreadPool cleanly | Status : {tp_status}')
+
session.close()
for response in responses:
This one has full error and debug logging (but without timekeeping and statistical logging), and includes urllib3 debug logging and a retry capability for threadpool start and xml fetch operations (the latter being largely redundant when using the included `HTTPAdapter` with `urllib3` retry mechanics):
--- CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ CodeSafe/__dev/kodi/youtube/plugin.video.youtube-7.0.2.2.matrix.1_wip/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -12,11 +12,13 @@
import json
import re
import threading
+import concurrent.futures
import traceback
import xml.etree.ElementTree as ET
import requests
+from urllib3.util import Retry
from .login_client import LoginClient
from ..youtube_exceptions import YouTubeException
from ..helper.video_info import VideoInfo
@@ -24,8 +26,16 @@
from ...kodion.utils import datetime_parser
from ...kodion.utils import to_unicode
+# Maximum times to attempt to fetch channel data.
+# Session HTTPAdapter already also tries 3 times.
+# This value acts as a multiplier of session retries.
+MAX_FETCH_XML_RETRIES = 1
+MAX_THREADSTART_RETRIES = 5
+
+
_context = Context(plugin_id='plugin.video.youtube')
+requests.urllib3.add_stderr_logger()
class YouTube(LoginClient):
def __init__(self, config=None, language='en-US', region='US', items_per_page=50, access_token='', access_token_tv=''):
@@ -825,11 +835,13 @@
session = requests.Session()
session.headers = headers
session.verify = self._verify
- adapter = requests.adapters.HTTPAdapter(pool_maxsize=5, pool_block=True)
+ retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
+ adapter = requests.adapters.HTTPAdapter(pool_block=True, max_retries=retries)
session.mount("https://", adapter)
- responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[2]}'
+ _responses = _e_args[3]
try:
_response = session.get(_url, timeout=(3.05, 27))
_response.raise_for_status()
@@ -837,20 +849,76 @@
_context.log_debug('Response: {0}'.format(error.response and error.response.text))
_context.log_error('Failed |%s|' % traceback.print_exc())
return
+ except Exception as e:
+ _context.log_error(f'Task # {_e_args[0]} failed to perform a clean Channel XML Fetch for channel_id {_e_args[2]}.')
+ raise e
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler(_attempt):
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ task_idx = 0
+ retry_ctr = 0
+ e_args = []
+ fetch_args = []
+ _context.log_debug(f'Channel XML Fetch ThreadPool has {fetch_tpool._max_workers} workers...')
+
+ for channel_id in sub_channel_ids:
+ e_args = [task_idx, retry_ctr, channel_id, responses]
+ fetch_args.append(e_args)
+ task_idx += 1
+
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ _context.log_error(f'Failed to start Channel XML Fetch ThreadPool executor or component on attempt # {(_attempt + 1)} of {MAX_THREADSTART_RETRIES}. ')
+ try:
+ futures
+ except NameError:
+ _context.log_error(f'No Threads to cancel. Retrying...')
+ except:
+ _context.log_error(f'Threads to be cancelled before retrying : {len(futures)}')
+ finally:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ _context.log_debug('SUCCESS : All jobs have been submitted to the Channel XML Fetch ThreadPool.')
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ _context.log_debug(f'Channel XML Fetch Active Threads : {len(futures)} | Done : {len(done)} | Not Done : {len(not_done)}')
+ for tsk in done:
+ if tsk.exception():
+ task = futures[tsk]
+ if task[1] < MAX_FETCH_XML_RETRIES:
+ task[1] += 1
+ _context.log_error(f'Failed to fetch xml for channel_id {task[2]}. Attempting retry # {task[1]} of {MAX_FETCH_XML_RETRIES}...')
+ pend_futures[fetch_tpool.submit(fetch_xml, task)] = task
+ else:
+ _context.log_error(f'FATAL : Failed to fetch xml data for channel_id {task[2]}')
+ raise tsk.exception()
+ else:
+ task = futures[tsk]
+ _context.log_debug(f'SUCCESS : Fetched xml data for channel_id {task[2]}')
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ for attempt in range(MAX_THREADSTART_RETRIES):
+ responses = []
+ tp_status = tpool_handler(attempt)
+ if not tp_status == True:
+ _context.log_debug(f'Channel XML Fetch ThreadPool failed. Retrying now... | Status : {tp_status}')
+ continue
+ else:
+ _context.log_debug(f'SUCCESS : Exited Channel XML Fetch ThreadPool cleanly | Status : {tp_status}')
+ break
+ else:
+ if not tp_status == True:
+ _context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
+
session.close()
for response in responses:
I hope some of it is of use... 8^d
> I hope some of it is of use... 8^d
It definitely is. Thanks for the investigation, will take me a while to look through this properly, just a bit busy with other things at the moment.
I should probably point out the limitations.
Importantly, I used the `submit()` method rather than `map()`, so results are not returned in order, but as they finish. The reason for using `submit()` was originally to avoid spinning the main thread, but also to make use of `Future` objects, and more specifically `add_done_callback(future)` in conjunction with a locked `condition.wait()` and corresponding `notify()`.
This didn't really work - I suspect because these particular threads are so short lived. Often the callback would execute and the future object would be torn down which would raise an undefined exception. This could actually be an upstream bug, as really the future shouldn't be torn down until after the callback returns.
Anyway, in the end, I used the `concurrent.futures.wait()` module function and let the main thread spin in a while loop. IMHO, the use of futures is superior to iterating over results using `map()`, so for this reason I continued to use `submit()`.
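Stripped of the plugin specifics, the spin-on-wait() pattern described above reduces to something like this sketch (fetch_xml and fetch_args stand in for the real ones in the diffs):

import concurrent.futures

def fetch_xml(task):
    return task  # placeholder for the actual fetch

fetch_args = [['UCxxxxxxxx', []], ['UCyyyyyyyy', []]]  # placeholder task arguments

with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {pool.submit(fetch_xml, task): task for task in fetch_args}
    while futures:
        # returns as soon as at least one future finishes
        done, not_done = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for future in done:
            if future.exception():
                raise future.exception()
        futures = {future: futures[future] for future in not_done}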
The `submit()` method was given all the function arguments packed into a single array (strictly, `submit()` accepts `*args` and `**kwargs` after the callable, but packing keeps each task's arguments together as one object). One could use kwargs here, but you will see I used a dictionary comprehension to populate the `futures` variable with a collection of future objects. These objects only exist until a short time after they reach the "done" status, so we are dependent on the results being in the `responses` array (which outlives `futures`).
When an ordered response is required, a sort operation can be performed using an index (e.g. `task_idx` in the last diff), although a new parent `responses` array would need to first incorporate said index to allow for any such re-ordering once all futures are completed, e.g. something like `responses = sorted(indexed_responses, key=lambda x: x[0])`.
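As a tiny sketch of that re-ordering, assuming each worker appended an (index, response) pair instead of the bare response:

# hypothetical: fetch_xml appends (task_idx, _response) to indexed_responses
indexed_responses = [(2, 'feed-c'), (0, 'feed-a'), (1, 'feed-b')]  # arrival order
responses = [item[1] for item in sorted(indexed_responses, key=lambda x: x[0])]
# responses == ['feed-a', 'feed-b', 'feed-c'] - submission order restored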
Creating the thread pool using `with` takes care of the pool shutdown (which otherwise would have to be done manually) and should free up resources created within the context once it does shut down.
Hope that helps explain some of the choices made... 8^d
Update for v7.0.3.2:
--- plugin.video.youtube-7.0.3.2_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ ThreadPool_XML_Fetch_7.0.3.2_release/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
from __future__ import absolute_import, division, unicode_literals
import threading
+import concurrent.futures
import xml.etree.ElementTree as ET
from copy import deepcopy
from itertools import chain, islice
@@ -1507,23 +1508,41 @@
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
_response = self.request(_url, headers=headers)
if _response:
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ for tsk in done:
+ if tsk.exception(): raise tsk.exception()
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ _context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
for response in responses:
if response:
Update for 7.0.4:
--- plugin.video.youtube-7.0.4.nexus.1/resources/lib/youtube_plugin/youtube/client/youtube.py 2024-03-22 09:53:34.000000000 +1100
+++ plugin.video.youtube-7.0.4.nexus.1_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py 2024-03-25 18:20:28.983669273 +1100
@@ -11,6 +11,7 @@
from __future__ import absolute_import, division, unicode_literals
import threading
+import concurrent.futures
import xml.etree.ElementTree as ET
from copy import deepcopy
from itertools import chain, islice
@@ -1515,23 +1516,41 @@
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
_response = self.request(_url, headers=headers)
if _response:
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ for tsk in done:
+ if tsk.exception(): raise tsk.exception()
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
for response in responses:
if response:
Update for v7.0.5+beta.1:
--- plugin.video.youtube-7.0.5+beta.1.matrix.1/resources/lib/youtube_plugin/youtube/client/youtube.py 2024-03-28 16:41:56.000000000 +1100
+++ plugin.video.youtube-7.0.5+beta.1.matrix.1_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py 2024-03-29 19:01:06.845441805 +1100
@@ -11,6 +11,7 @@
from __future__ import absolute_import, division, unicode_literals
import threading
+import concurrent.futures
import xml.etree.ElementTree as ET
from copy import deepcopy
from itertools import chain, islice
@@ -1524,23 +1525,41 @@
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
_response = self.request(_url, headers=headers)
if _response:
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ for tsk in done:
+ if tsk.exception(): raise tsk.exception()
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
for response in responses:
if response:
Update for 7.0.5+beta.4 (includes edit for #679):
--- plugin.video.youtube-7.0.5+beta.4/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ plugin.video.youtube-7.0.5+beta.4_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
from __future__ import absolute_import, division, unicode_literals
import threading
+import concurrent.futures
import xml.etree.ElementTree as ET
from copy import deepcopy
from itertools import chain, islice
@@ -1524,23 +1525,41 @@
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
_response = self.request(_url, headers=headers)
if _response:
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ for tsk in done:
+ if tsk.exception(): raise tsk.exception()
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
do_encode = not current_system_version.compatible(19, 0)
--- plugin.video.youtube-7.0.5+beta.4/resources/lib/youtube_plugin/youtube/helper/video_info.py
+++ plugin.video.youtube-7.0.5+beta.4_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/helper/video_info.py
@@ -596,7 +596,7 @@
48: '48000/1000', # 48.00 fps
50: '50000/1000', # 50.00 fps
60: '60000/1000', # 60.00 fps
- },
+ }
FRACTIONAL_FPS_SCALE = {
0: '{0}000/1000', # --.00 fps
24: '24000/1001', # 23.976 fps
Update for v7.0.5:
--- plugin.video.youtube-7.0.5_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ plugin.video.youtube-7.0.5_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
from __future__ import absolute_import, division, unicode_literals
import threading
+import concurrent.futures
import xml.etree.ElementTree as ET
from copy import deepcopy
from itertools import chain, islice
@@ -1524,23 +1525,41 @@
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
_response = self.request(_url, headers=headers)
if _response:
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ for tsk in done:
+ if tsk.exception(): raise tsk.exception()
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
do_encode = not current_system_version.compatible(19, 0)
Update for 7.0.6+beta.1:
--- plugin.video.youtube-7.0.6+beta.1_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ plugin.video.youtube-7.0.6+beta.1_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
from __future__ import absolute_import, division, unicode_literals
import threading
+import concurrent.futures
import xml.etree.ElementTree as ET
from copy import deepcopy
from itertools import chain, islice
@@ -1525,23 +1526,41 @@
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
_response = self.request(_url, headers=headers)
if _response:
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ for tsk in done:
+ if tsk.exception(): raise tsk.exception()
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
do_encode = not current_system_version.compatible(19, 0)
Update for v7.0.6.3:
--- plugin.video.youtube-7.0.6.3_release/resources/lib/youtube_plugin/youtube/client/youtube.py
+++ plugin.video.youtube-7.0.6.3_threadpool_xml_fetch/resources/lib/youtube_plugin/youtube/client/youtube.py
@@ -11,6 +11,7 @@
from __future__ import absolute_import, division, unicode_literals
import threading
+import concurrent.futures
import xml.etree.ElementTree as ET
from copy import deepcopy
from itertools import chain, islice
@@ -1525,23 +1526,41 @@
responses = []
- def fetch_xml(_url, _responses):
+ def fetch_xml(_e_args):
+ _url = f'https://www.youtube.com/feeds/videos.xml?channel_id={_e_args[0]}'
+ _responses = _e_args[1]
_response = self.request(_url, headers=headers)
if _response:
_responses.append(_response)
- threads = []
- for channel_id in sub_channel_ids:
- thread = threading.Thread(
- target=fetch_xml,
- args=('https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id,
- responses)
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join(30)
+ def tpool_handler():
+ with concurrent.futures.ThreadPoolExecutor() as fetch_tpool:
+ e_args = []
+ fetch_args = []
+ for channel_id in sub_channel_ids:
+ e_args = [channel_id, responses]
+ fetch_args.append(e_args)
+ try:
+ futures = {fetch_tpool.submit(fetch_xml, task): task for task in fetch_args}
+ except Exception as e:
+ fetch_tpool.shutdown(cancel_futures=True)
+ return e
+ else:
+ while len(futures) > 0:
+ pend_futures = {}
+ done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
+ for tsk in done:
+ if tsk.exception(): raise tsk.exception()
+ for tsk in not_done:
+ task = futures[tsk]
+ pend_futures[tsk] = task
+ futures = pend_futures
+ return True
+
+ tp_status = tpool_handler()
+ if not tp_status == True:
+ self._context.log_error('FATAL : Failed to start Channel XML Fetch ThreadPool executor or component.')
+ raise tp_status
do_encode = not current_system_version.compatible(19, 0)
@benyamin-codez - I have had to make some more changes to My Subscriptions that I hadn't planned on doing until more substantial work was done on moving other unrelated functionality to use the v1 API, so that parallel requests can be transparently handled by the plugin requests module.
Long story short - I have also implemented a more naive implementation of the thread pool fix you are using, as a stepping stone to the end goal. The main reason I didn't use your patch is because I am ostensibly trying to maintain basic functionality for Kodi 18 (using Python 2.7), in which `concurrent.futures` doesn't exist.
The git history is here: https://github.com/MoojMidge/plugin.video.youtube/commits/master/resources/lib/youtube_plugin/youtube/client/youtube.py
Main threading related changes are: https://github.com/MoojMidge/plugin.video.youtube/commit/76b29546f91fe778a8d7cc0ed1929a54ea212d6c#diff-5ecde54fa534f31c07c435544e8497cb462674901a9d688be492bfbbde50b579R1560-R1615
Before I merge this, can you test? https://github.com/MoojMidge/plugin.video.youtube/releases/tag/v7.0.7%2Bbeta.2
If you want, you can also PR your patch and I can edit it so the forthcoming changes will merge cleanly.
@MoojMidge, many thanks for that. I've checked on two RPi3B+ and your changes do resolve the problem in the OP.
Given your backwards compatibility constraints, I thought your solution was quite elegant. I should mention that it is demonstrably slower on initial cache population, but only marginally slower otherwise - at least it seems slightly slower. I'm not sure if this is due to your threading implementation or perhaps another change...?
Using `concurrent.futures` might result in faster execution - I couldn't say. If I did a PR for my patch, would you then use `concurrent.futures` in the non-Leia builds? If you want me to raise the PR, which repository do you want it in?
> I should mention that it is demonstrably slower on initial cache population, but only marginally slower otherwise - at least it seems slightly slower. I'm not sure if this is due to your threading implementation or perhaps another change...?
How are you measuring this? I think the issue is that I changed what was being done in each thread, so rather than just making the request, each thread was also processing the feed xml, i.e. doing a bunch of CPU-bound rather than I/O-bound operations. Also there was an unnecessary lock for writing to the output, and an excessive number of attempts to acquire the input read/modify lock, and apparently those are fairly expensive operations.
Can you see if https://github.com/MoojMidge/plugin.video.youtube/commit/513a9f400748708b6480df18790b1c45f81fa6b2 restores the speed?
> If I did a PR for my patch, would you then use `concurrent.futures` in the non-Leia builds? If you want me to raise the PR, which repository do you want it in?
On `master`. Won't be maintaining separate branches for older versions of Kodi, but there are lightweight compatibility shims for Leia. Will have to see what the easiest way to implement it will be, along with making some changes to the data structure used for argument passing.
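One common shape for such a shim, sketched here as an illustration only (run_parallel is a made-up name, not the plugin's actual helper):

try:
    from concurrent.futures import ThreadPoolExecutor  # Python 3 / Kodi 19+
except ImportError:
    ThreadPoolExecutor = None  # Python 2.7 / Kodi 18

import threading

def run_parallel(func, args_list, max_workers=8):
    if ThreadPoolExecutor is not None:
        # Preferred path: a real thread pool; map() preserves submission order
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(func, args_list))
    # Fallback: plain threads started in bounded batches; results arrive
    # in completion order rather than submission order
    results = []
    lock = threading.Lock()
    def call(arg):
        result = func(arg)
        with lock:
            results.append(result)
    for start in range(0, len(args_list), max_workers):
        batch = [threading.Thread(target=call, args=(arg,))
                 for arg in args_list[start:start + max_workers]]
        for thread in batch:
            thread.start()
        for thread in batch:
            thread.join(30)
    return results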
> How are you measuring this? I think the issue is that I changed what was being done in each thread, so rather than just making the request, each thread was also processing the feed xml. Also there was an unnecessary lock for writing to the output, and also an excessive amount of attempts to acquire the input read/modify lock, and apparently those are fairly expensive operations.
> Can you see if MoojMidge@513a9f4 restores the speed?
Yes, that was a significant improvement.
This round I obtained some rough run times using debug log time stamps. I forced `cached = []` also. Your solution had a runtime of approx. 20s on initial run and 9s for subsequent runs, whereas 7.0.7+beta.1 with my patch runs for approx. 18s on initial run and 9s for subsequent runs. This is for approx. 215 channels plus 3 bookmarked channels.
I think the main difference is that your solution has each of the max_threads workers executing the whole job loop, whereas mine only executes one task per worker. So your solution is quite efficient given you still need to do the locking for your `args.pop()`.
On run-times, I should point out that the fetch has run a bit slower since I reverted to using your `HTTPAdapter` with `pool_maxsize=5` rather than the default value of 10.
Given the above, do you still want me to submit the PR...? 2 seconds isn't much, but I guess it is a little noticeable. I don't mind submitting the PR if you think you'll make use of it.
Can you see how the latest beta works? https://github.com/anxdpanic/plugin.video.youtube/releases/tag/v7.0.7%2Bbeta.2 If the speed is comparable, then can just run with this from now on, otherwise can look into it further based on the patch you have been using.
@MoojMidge, I wasn't able to get v7.0.7+beta.2 working, per #769. 8^d
It looks like your solution has matured quite a bit since I last had a peek. I look forward to giving it a go.
The intent is to make this a more generic helper method that can be used in other parts of the plugin, so still needs some more work, but let's see how it works for this initial application
> The intent is to make this a more generic helper method that can be used in other parts of the plugin, so still needs some more work, but let's see how it works for this initial application
Well, it is certainly very quick..! At 10 to 11 seconds a run using `cached = []`, it is noticeably faster. Very impressive. When you migrate this into a helper method, it will be interesting to see what other gains can be had. Thank you for resolving this @MoojMidge. I am well pleased with the outcome and happy for you to close the issue.
It was a bit faster for me on a fast multi-core Windows laptop, but good to hear that the same or better speed gains can be observed on more limited devices where it matters more.
Context
Expected Behavior
When selecting "My Subscriptions" the subscription feed loads and is displayed.
Current Behavior
When selecting "My Subscriptions" the subscription feed continually loads (never stops) and an error message is displayed stating that a new thread cannot be started.
Steps to Reproduce
Log
If you really want it, let me know and I will consider dropping one, but know that I don't really have the time to perform the necessary sanitization of GPS and other identifying data from it...
Additional Information
Platform is a Raspberry Pi Model 3B+.
I suspected a possible race condition on resource-constrained hardware. I modified the code per the following to resolve the problem. Following this, the log shows many threads restarting on the second try, and none needing more than three tries.
Hope that helps...!
Ben