scrapy / scrapyd

A service daemon to run Scrapy spiders
https://scrapyd.readthedocs.io/en/stable/
BSD 3-Clause "New" or "Revised" License

Poll jobs by comparing both priority and insert_time #344

Closed my8100 closed 2 months ago

my8100 commented 5 years ago

This PR uses project_priority_map to store (priority, -timestamp) as the value, in order to find the queue to pop from.

Fixes #187 (the updated test_poll_next() demonstrates the effect). It also provides backward compatibility for custom SqliteSpiderQueue and JsonSqlitePriorityQueue.
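
A minimal, self-contained sketch (hypothetical names and data, not the PR's actual code) of how comparing (priority, -timestamp) tuples selects the queue to pop from:

    # Illustration only: a project_priority_map as described in this PR,
    # with made-up timestamps matching the example values quoted later.
    project_priority_map = {
        'project1': (0.0, -1561646348),  # lower priority, scheduled earlier
        'project2': (1.0, -1561646349),  # highest priority wins regardless of time
        'project3': (0.0, -1561646349),  # same priority as project1, scheduled later
    }

    def next_project(priority_map):
        # Python compares tuples element by element: the highest priority is
        # taken first; among equal priorities, the largest -timestamp (i.e.
        # the earliest insert time) wins, preserving FIFO across projects.
        return max(priority_map, key=priority_map.get)

    print(next_project(project_priority_map))  # -> 'project2'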

codecov[bot] commented 5 years ago

Codecov Report

Merging #344 into master will decrease coverage by 0.02%. The diff coverage is 67.64%.


@@            Coverage Diff             @@
##           master     #344      +/-   ##
==========================================
- Coverage   68.37%   68.35%   -0.03%     
==========================================
  Files          17       17              
  Lines         860      891      +31     
  Branches      104      112       +8     
==========================================
+ Hits          588      609      +21     
- Misses        242      250       +8     
- Partials       30       32       +2
Impacted Files Coverage Δ
scrapyd/sqlite.py 89.28% <100%> (+0.39%) ↑
scrapyd/spiderqueue.py 95.65% <100%> (+0.41%) ↑
scrapyd/poller.py 74.07% <60.71%> (-12.14%) ↓


codecov[bot] commented 5 years ago

Codecov Report

Merging #344 into master will increase coverage by 0.26%. The diff coverage is 73.33%.


@@            Coverage Diff             @@
##           master     #344      +/-   ##
==========================================
+ Coverage   68.37%   68.63%   +0.26%     
==========================================
  Files          17       17              
  Lines         860      915      +55     
  Branches      104      117      +13     
==========================================
+ Hits          588      628      +40     
- Misses        242      252      +10     
- Partials       30       35       +5
Impacted Files Coverage Δ
scrapyd/spiderqueue.py 96% <100%> (+0.76%) ↑
scrapyd/poller.py 75% <62.96%> (-11.21%) ↓
scrapyd/sqlite.py 86.76% <79.31%> (-2.13%) ↓


my8100 commented 5 years ago

@Digenis What do you think about this?

Digenis commented 5 years ago

Your solution implements the second idea in #187.

Your code reveals that the second idea has another drawback: a for-loop in the poller that queries all the queues will make race conditions occur more often when more processes write/read the queues.

Digenis commented 5 years ago

The compatibility attribute trick was a nice idea, by the way.

my8100 commented 5 years ago

But the original code also iterates the queues, and if msg is not None: can handle the concurrency problem, so the new code can too. https://github.com/scrapy/scrapyd/blob/4520c45acc115d0fc766eae30788468a77690433/scrapyd/poller.py#L20-L26

my8100 commented 5 years ago

FYI: #198

Digenis commented 5 years ago

But the original code also iterates the queues,

It was stopping on the first non-empty queue.

Unless I misunderstand returnValue() (maybe I do). Here's what I think it does: msg = yield maybeDeferred(q.pop) gives control to twisted to call q.pop. Then twisted sends the result back into the function, which stores it in msg and resumes execution. yield/send, although used here only for giving/receiving control of the execution, actually make the function a generator, so we can't use a return statement for our result. As a workaround, twisted has returnValue(), which raises an exception to stop there. Does execution continue after returnValue() is called?
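
A self-contained sketch of the inlineCallbacks/returnValue mechanics discussed here; it is not scrapyd code, and pop()/poll_once() are made-up names:

    from twisted.internet.defer import inlineCallbacks, maybeDeferred, returnValue

    def pop():
        return 'a job'  # stand-in for q.pop()

    @inlineCallbacks
    def poll_once():
        # yield hands the Deferred to Twisted; when it fires, the result is
        # sent back into the generator and bound to msg.
        msg = yield maybeDeferred(pop)
        if msg is not None:
            # returnValue() raises a control-flow exception, so nothing after
            # this line in the generator runs -- the poll ends here.
            returnValue(msg)
        print('only reached when msg is None')

    poll_once().addCallback(print)  # -> 'a job'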

my8100 commented 5 years ago

poll() ends when returnValue() is called.

Can you explain 'race conditions' in detail, maybe with an example?

my8100 commented 5 years ago

How about the third commit? I save the highest priority of each queue in queue_priority_map and update it whenever needed, so that poll() can figure out the highest priority across all queues instantly.

Digenis commented 5 years ago

A multiple-queue-consumer scenario: suppose a slow consumer iterates all the queues and collects the top job of each queue. Meanwhile, a fast consumer does the same and also pops the job with the highest priority among all queues. The slow consumer will then get an empty result when trying to pop that same job and will have to start over. These things are more likely to happen the longer it takes (in time / CPU cycles) to find the top job of all the queues.
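
A schematic illustration of that check-then-pop race, compressed into a single process for readability (in scrapyd the consumers would be separate processes sharing the same sqlite file); all names and the schema here are made up:

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE queue (id INTEGER PRIMARY KEY, priority REAL, message TEXT)')
    conn.execute("INSERT INTO queue (priority, message) VALUES (1.0, 'the only job')")

    def top(conn):
        # "Collect the top job" step: look, but don't remove.
        return conn.execute('SELECT id FROM queue ORDER BY priority DESC LIMIT 1').fetchone()

    def pop(conn, rowid):
        # Returns 0 if another consumer already popped the row.
        return conn.execute('DELETE FROM queue WHERE id = ?', (rowid,)).rowcount

    slow_choice = top(conn)           # slow consumer picks the top job ...
    fast_choice = top(conn)           # ... the fast consumer picks the same one
    pop(conn, fast_choice[0])         # ... and pops it first
    print(pop(conn, slow_choice[0]))  # -> 0: the slow consumer gets nothing and starts over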

Digenis commented 5 years ago

Also, I just realized that the FIFO principle is still violated.

A limit of 2 parallel processes. 2 empty fresh queues.

On hour 13:00 projectA gets 5 jobs scheduled, with queueA ids (in sqlite) 1,2,3,4,5

By hour 13:05, projectA's job with queueA id 1 has finished and 2, 3 are running while projectB gets 5 jobs scheduled, with queueB id 1,2,3,4,5

By hour 13:10, projectA's job with queueA id 2 finishes and projectB job with queueB id 1 is starting.

projectA job with queue id 3, although scheduled at 13:00, will have to wait for projectB queue id 1 and 2, which were scheduled 10 minutes later

Of course this is not exactly the current behaviour of your branch, but it may have been your next commit — selecting & sorting ids too. Currently, the priority scope is global, but equal priorities revert back to the old behaviour.

The in-queue id defines a priority too — the insertion order. However, we can't compare insertion orders of different queues. If we did, we would also end up with an "arbitrary" order. (Not exactly arbitrary, but I won't open that parenthesis because it's not a useful scheduling policy in this context.)

Digenis commented 5 years ago

So, how do we solve this while keeping the multiple queue approach? To make things easy, suppose all queues were different tables in the same db.

SELECT 'projectA', id FROM queueA
UNION ALL
SELECT 'projectB', id FROM queueB
ORDER BY priority DESC -- then what column?
-- the information about the global insertion order was never stored
LIMIT 1;

At this point, unless you have a business-critical, non-upgradeable legacy system, you probably give up any attempt at a backwards-compatible fix that'd go in 1.3. It's dirty fixes all the way from here:

In the poller, an equivalent of ORDER BY RANDOM(), priority DESC (which I just decided to name random-robin priority queues).

In all queues, a new datetime column saving the insertion time, then ORDER BY priority DESC, insert_time ASC

A master queue saving a global insertion order

CREATE TABLE queue (id INTEGER PRIMARY KEY,
                    project TEXT NOT NULL,
                    foreign_id INTEGER NOT NULL);

which is close to the unified queue idea, only more complicated since all the rest has to be implemented simultaneously.

Of all the desperate attempts to keep backwards compatibility, the one with the insertion time is the most acceptable, although of debatable compatibility. It even qualifies as something to test a unified queue against. Multiple tables + datetime definitely add a probability of error, but FIFO will be violated only at sub-second resolutions, and bothering about this is like bothering about the time it takes for a scheduling request to cross the network. I'd still like to see the queue unified, even in a single-db, multiple-tables schema. Let's discuss and test the datetime trick against the single-table approach.
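
A rough sketch of the insert_time variant described above, assuming for simplicity that each project's queue is a table in the same database (scrapyd actually keeps one db file per project, so this is only illustrative and the table/column names are assumptions):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    for table in ('queueA', 'queueB'):
        conn.execute('CREATE TABLE %s (id INTEGER PRIMARY KEY, priority REAL,'
                     ' insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, message TEXT)' % table)

    # projectA's job was scheduled five minutes before projectB's, same priority.
    conn.execute("INSERT INTO queueA (priority, insert_time, message)"
                 " VALUES (0.0, '2019-06-27 13:00:00', 'projectA job')")
    conn.execute("INSERT INTO queueB (priority, insert_time, message)"
                 " VALUES (0.0, '2019-06-27 13:05:00', 'projectB job')")

    # Highest priority first; ties broken by the stored insertion time,
    # which restores FIFO ordering across projects.
    row = conn.execute(
        "SELECT 'projectA' AS project, priority, insert_time, message FROM queueA"
        " UNION ALL"
        " SELECT 'projectB', priority, insert_time, message FROM queueB"
        " ORDER BY priority DESC, insert_time ASC LIMIT 1"
    ).fetchone()
    print(row)  # -> ('projectA', 0.0, '2019-06-27 13:00:00', 'projectA job')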

my8100 commented 5 years ago

A multiple queue consumer scenario. Suppose a slow consumer iterates all the queues

So, if cancel.json is called while poll() is iterating all the queues with yield maybeDeferred, chances are that msg = yield maybeDeferred(q.pop) may return None?

            for p, q in iteritems(self.queues):
                c = yield maybeDeferred(q.count)
                if c:
                    msg = yield maybeDeferred(q.pop)
                    if msg is not None:  # In case of a concurrently accessed queue
                        returnValue(self.dq.put(self._message(msg, p)))

my8100 commented 5 years ago

Also, I just realized that the FIFO principal is still violated.

A limit of 2 parallel processes. 2 empty fresh queues.

On hour 13:00 projectA gets 5 jobs scheduled, with queueA ids (in sqlite) 1,2,3,4,5

By hour 13:05, projectA's job with queueA id 1 has finished and 2, 3 are running while projectB gets 5 jobs scheduled, with queueB id 1,2,3,4,5

By hour 13:10, projectA's job with queueA id 2 finishes and projectB job with queueB id 1 is starting.

projectA job with queue id 3, although scheduled at 13:00, will have to wait for projectB queue id 1 and 2, which were scheduled 10 minutes later

What are the names of these projects and the priorities of each job?

my8100 commented 5 years ago

Even though for p, q in iteritems(self.queues): would violate FIFO when polling multiple projects, we can still use the priority parameter to adjust the polling order, as long as we figure out the project with the highest priority across all queues.

Maybe we can/should ship it in v1.3.0, since it provides backward compatibility.

Check out my fourth commit, which saves the highest priority of each project in project_priority_map, so that there's no need to iterate all the queues.

my8100 commented 5 years ago

Changes in the fifth commit:

  1. A new column named 'insert_time' is added when creating tables in JsonSqlitePriorityQueue: insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  2. The table name in SqliteSpiderQueue now defaults to 'spider_queue_insert_time', to avoid the following error when selecting the non-existent 'insert_time' column in an existing db file: sqlite3.OperationalError: no such column: insert_time (a minimal reproduction follows this list)
  3. project_priority_map now stores (priority, -timestamp) as the value, so that priority is taken first and the FIFO principle is also respected (see test_poll_next()): {'project1': (0.0, -1561646348), 'project2': (1.0, -1561646349)}
  4. Backward compatibility is still provided.
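
For item 2, a minimal reproduction of the error that the new default table name avoids when Scrapyd opens an old db file whose table lacks the new column:

    import sqlite3

    conn = sqlite3.connect(':memory:')
    # A pre-upgrade table, as found in an existing db file.
    conn.execute('CREATE TABLE spider_queue (id INTEGER PRIMARY KEY, priority REAL, message TEXT)')
    try:
        conn.execute('SELECT priority, insert_time FROM spider_queue')
    except sqlite3.OperationalError as e:
        print(e)  # -> no such column: insert_time

Defaulting to a fresh table name sidesteps the error, at the cost of ignoring any pending jobs left in the old table, which is discussed below.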

Digenis commented 5 years ago

Even if it's backwards compatible with user code, it's not with user data in the queue.

If we break compatibility, it better be once and better be worthy.

But I also don't want to let users fix it on their own. It's annoying because most of their work would be boilerplate stuff. Let's do something similar to what scrapy did with scrapy.contrib. We can add a sub-package with these sample fixes and users would only need to change configuration. Any idea for a name?

my8100 commented 5 years ago

Even if it's backwards compatible with user code, it's not with user data in the queue.

But it only ignores the pending jobs on the first startup, which may not exist at all in most cases.

Any idea for a name?

Simply use the same name ‘contrib’? (I still don’t know how it works yet) Or are you asking for a name of the new configuration option?

my8100 commented 5 years ago

What about inserting a new column into the existing table via ALTER TABLE?

my8100 commented 5 years ago

The 6th commit introduces the ensure_insert_time_column method, which adds a new column named 'insert_time' to the table in existing db files if needed. It also fills in the 'insert_time' column for any pending jobs already in the table.
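
A sketch of what such a migration can look like; only the method name ensure_insert_time_column comes from this discussion, and the body below is an assumption rather than the PR's actual code:

    import sqlite3

    def ensure_insert_time_column(conn, table='spider_queue'):
        # Hypothetical sketch, not the PR's implementation.
        columns = [row[1] for row in conn.execute('PRAGMA table_info(%s)' % table)]
        if 'insert_time' not in columns:
            # ALTER TABLE ... ADD COLUMN in SQLite only accepts a constant
            # default, so the column is added bare and then backfilled for
            # any pending jobs already in the table.
            conn.execute('ALTER TABLE %s ADD COLUMN insert_time TIMESTAMP' % table)
            conn.execute('UPDATE %s SET insert_time = CURRENT_TIMESTAMP'
                         ' WHERE insert_time IS NULL' % table)
            conn.commit()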

my8100 commented 5 years ago

@Digenis How about the 6th commit?

Digenis commented 5 years ago

I'm a bit busy right now

I think I'll be able to review it in the next 2 days

Digenis commented 5 years ago

Comments are to be read in chronological order (not commit order).

I feel reluctant to even make comments. You do understand that this will most probably only be a workaround, right? In a contrib/ subpackage I wouldn't mind merging it even in 1.2, but in 1.3 I'd rather see FIFO queues taking turns in round robin, and fix the strategy in 1.4, instead of making code with so many workarounds the default in 1.3.

codecov-io commented 5 years ago

Codecov Report

Merging #344 into master will increase coverage by 0.61%. The diff coverage is 80.76%.


@@            Coverage Diff             @@
##           master     #344      +/-   ##
==========================================
+ Coverage   68.37%   68.98%   +0.61%     
==========================================
  Files          17       17              
  Lines         860      906      +46     
  Branches      104      116      +12     
==========================================
+ Hits          588      625      +37     
- Misses        242      247       +5     
- Partials       30       34       +4
Impacted Files Coverage Δ
scrapyd/sqlite.py 90.55% <100%> (+1.66%) ↑
scrapyd/spiderqueue.py 96% <100%> (+0.76%) ↑
scrapyd/poller.py 75% <62.96%> (-11.21%) ↓


my8100 commented 5 years ago

I feel reluctant to even make comments.

If so, how can we go on discussing?

You do understand that this will most probably only be a workaround, right?

No, this PR aims to fix #187 in a creative and effective way:

  1. The priority of a job is taken first and the FIFO principle is also respected for multiple projects, and the effect is verified in the new test_poll_next().
  2. There's no delay in poll() as project_priority_map is updated by SQLite triggers.
  3. No change to the current 'multiple queues' implementation.
  4. Backward compatibility for custom modules is provided.
  5. Backward compatibility for user data is provided.
  6. It's easy to remove the workarounds in v1.4.

In a contrib/ subpackage, I wouldn't mind merging it even in 1.2 but in 1.3 I'd rather see FIFO queues taking turns in round robin and fixing the strategy in 1.4 instead of making code with so many workarounds the default in 1.3.

I think we should fix #187 in v1.3 as the 'priority' parameter is exposed in PR #161, and merge PR #343 in v1.4 while removing backward compatibility.

my8100 commented 5 years ago

The queue table name is changed from 'spider_queue' to 'spider_queue_with_triggers'.

This PR introduces SQLite triggers, which are stored in the database. Users would encounter a problem if they downgraded Scrapyd from v1.3 to v1.2: sqlite3.OperationalError: no such function: update_project_priority_map

Renaming the table ensures that upgrading and downgrading work at any time, and the pending jobs can still be retrieved if needed.
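
A rough sketch of the mechanism behind that error: a trigger stored in the database calls a function that only exists if the running Scrapyd registered it on the connection. Apart from update_project_priority_map, the names and the trigger body below are made up, not the PR's actual code:

    import sqlite3

    priority_map = {}

    def update_project_priority_map():
        # Stand-in for the real callback, which would recompute the highest
        # (priority, -timestamp) for the project.
        priority_map['project1'] = 'updated'

    conn = sqlite3.connect(':memory:')
    conn.create_function('update_project_priority_map', 0, update_project_priority_map)
    conn.execute('CREATE TABLE spider_queue_with_triggers'
                 ' (id INTEGER PRIMARY KEY, priority REAL, message TEXT)')
    conn.execute('CREATE TRIGGER queue_changed AFTER INSERT ON spider_queue_with_triggers'
                 ' BEGIN SELECT update_project_priority_map(); END')
    conn.execute("INSERT INTO spider_queue_with_triggers (priority, message) VALUES (1.0, 'job')")
    print(priority_map)  # -> {'project1': 'updated'}

    # A build that opens the same (file-based) database without registering the
    # function, e.g. Scrapyd 1.2 after a downgrade, raises on insert:
    #   sqlite3.OperationalError: no such function: update_project_priority_map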

jpmckinney commented 2 months ago

As Digenis wrote, if we try for backwards compatibility, then "It's dirty fixes all the way from here"

This solution is very clever, but probably too clever for a project that only gets maintainer attention every few years. It takes too long to understand how it works and how to modify it – compared to a simpler solution (that breaks compatibility).

So, I'll close this PR, though the test updates might be useful in future work.

Edit: Unrelated to closure, but I think the last commit breaks backwards compatibility for user data (code starts using a new table without migrating any of the data from the old table).