For me it makes sense to allow returning multiple items; I'm fine with merging a PR which implements this feature.
@dxue2012 what is your use case?
Cool! I'm considering writing a split/merge pipeline for my scraper that may either increase or decrease the number of returned items. Here's an example of splitting an item by one field:
```python
class SplitPipeline(object):
    def process_item(self, item, spider):
        """
        Example:
            item = {'a': 1, 'b': 'foo bar'}
        returns an iterable with
            {'a': 1, 'b': 'foo'}, {'a': 1, 'b': 'bar'}
        """
        for v in item['b'].split():
            # yield a copy so the emitted items don't share state
            yield dict(item, b=v)
```
It would be nice to be able to yield multiple items in pipelines.
I think that with the current state of things, the only way to "split" or multiply items is either doing it in a spider middleware's process_spider_output:

```python
def process_spider_output(self, response, result, spider):
    for x in result:
        if isinstance(x, Item):
            for y in split_item(x):  # split_item() holds your own splitting logic
                yield y
            continue
        yield x
```

or creating nested items in a pipeline.
E.g. you can have an item:

```python
{
    "name": "foo",
    "nested_items": {"id": 123, "value": "alfa"},  # field containing other items
}
```
but nested items are not very convenient to work with, and you have to think hard to get proper logical handling of them (at least that's my experience); plus, you end up with more complex output. Their benefit is a direct association between the parent item and its child items, which can be required or useful in some cases.
Just want to +1 this. My use case: I'm parsing Magic: The Gathering cards from their website, but some cards are actually double-sided, so it would be really useful to detect this in a pipeline, create a new item for the other side of the card, and then return both to the rest of the pipeline.
If I can get my project stable first, that'll free up some time, and I'll take a look at it if no one else has tackled it before then.
Something to think about: should these new items go through the whole pipeline process again, i.e. pass through all the pipelines, given that the original item has already passed through some of them?
btw, a hacky way to return items from a pipeline would be:
```python
def process_item(self, item, spider):
    ...
    # Inject an extra item back into the scraper. self.crawler has to be
    # saved in from_crawler(), and _process_spidermw_output is private API,
    # so this may break between Scrapy versions.
    self.crawler.engine.scraper._process_spidermw_output(
        new_item, None, 'from_pipeline', spider)
    return item
```
Were there any design decisions behind the current scenario of one item at a time?
Hi all,
I ended up defining a custom post-processing step after ItemPipelines in the following manner:
Each processor (analogous to a pipeline) defines a function called `process_iter_items`, which takes in an iterable of dicts and must return an iterable of dicts. The set of processors is managed by `BatchProcessorManager`, a MiddlewareManager class similar to `ItemPipelineManager`, which supports the chaining of `process_iter_items` functions instead of `process_item` functions.
The chain of `process_iter_items` is connected to the signal emitted by the last ItemPipeline, which stores the items in MongoDB.
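For illustration, a minimal sketch of what such a chain could look like (the two processor classes and the `chain_processors` helper are hypothetical; only the `process_iter_items` name comes from my actual setup):

```python
from functools import reduce

class SplitProcessor(object):
    def process_iter_items(self, items):
        # Many-in, many-out: one input dict may yield several outputs.
        for item in items:
            for value in item['b'].split():
                yield dict(item, b=value)

class DedupeProcessor(object):
    def process_iter_items(self, items):
        # Needs to see the whole stream - something per-item pipelines can't do.
        seen = set()
        for item in items:
            key = (item.get('a'), item.get('b'))
            if key not in seen:
                seen.add(key)
                yield item

def chain_processors(processors, items):
    # Feed the output iterable of each processor into the next one.
    return reduce(lambda it, p: p.process_iter_items(it), processors, items)

# e.g. list(chain_processors([SplitProcessor(), DedupeProcessor()],
#                            [{'a': 1, 'b': 'foo bar foo'}]))
```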
@mgachhui
This is just my personal opinion, but I think the intent behind per-item processing is that it happens on the fly and "in parallel", in the sense that we don't wait for other items to process the current one.
It makes sense to do this if the pipeline does not require information from other items (e.g. dropping an item based on one field's value), but sometimes it's just a bit restricting.
Edit: Added some more questions. In addition to what @eLRuLL said, I have a couple of questions.
Good questions @eLRuLL @mgachhui! My take on this:
Ah, we can still fire a signal if an empty list is returned: there is a single input item, and this item is dropped.
So, to sum up, my doubts:
- A pipeline has to be able to return either a single object or an iterable collection of items. It would also have to accept both; otherwise it wouldn't make sense.
For me it makes more sense for a pipeline to accept a single item, but return either a single item or a list/iterable. Before:
```
item1 --> [pipeline A] --> item1 --> [pipeline B] --> ...
item2 --> [pipeline A] --x raise DropItem()
```

New feature:

```
item3 --> [pipeline A] ,--> item4 --> [pipeline B] --> ...
                       '--> item5 --> [pipeline B] --> ...
```
So whenever a pipeline returns a list of items, each item is individually processed again in parallel, like the original call from the scraper. But that would mean a group of related items can't be processed together, so merging items (or processing a group of items together) would not be possible in a pipeline.
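A rough synchronous sketch of those semantics (this is not Scrapy's actual Deferred-based `ItemPipelineManager`; all names here are illustrative):

```python
def send_through_pipelines(item, pipelines, spider):
    # Each stage may return one item or a list; forked items continue
    # through the subsequent stages only.
    items = [item]
    for pipeline in pipelines:
        next_items = []
        for current in items:
            result = pipeline.process_item(current, spider)
            if isinstance(result, (list, tuple)):
                next_items.extend(result)  # the pipeline split the item
            else:
                next_items.append(result)  # the usual single-item path
        items = next_items
    return items
```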
Right; I'm not sure a pipeline is the right place to merge items. If we made pipelines accept multiple inputs, the merging options would still be very limited - we'd only be able to merge items that were split by a previous pipeline. The downside is that all existing pipelines (both built-in and custom) would have to be changed to support this new input format. I don't think it's worth it.
Another option is to add a new `process_items` method which receives all items returned by a single callback. I'm also not wild about it - merging is still limited (now to items from a single response), and you can achieve the same using a spider middleware.
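For reference, a sketch of how per-response merging can already be done in a spider middleware today (the merge rule - a plain dict update - is just a placeholder):

```python
from scrapy import Item

class MergePerResponseMiddleware(object):
    def process_spider_output(self, response, result, spider):
        items = []
        for x in result:
            if isinstance(x, (Item, dict)):
                items.append(x)   # hold items back for merging
            else:
                yield x           # requests pass through untouched
        if items:
            merged = {}
            for item in items:
                merged.update(item)  # placeholder merge rule
            yield merged
```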
In general, to merge items you may need to have them all at once - not only items from a single response. That's why I think merging task is better suited for post-processing.
@dxue2012 you said:
It makes sense to do this if the pipeline does not require information from other items (e.g. dropping an item based on one field's value), but sometimes it's just a bit restricting.
What's your use case? When do you need to drop an item based on other item's field value?
@kmike
What's your use case? When do you need to drop an item based on other item's field value?
Sorry, my previous comment was worded poorly. "Dropping an item based on one field's value" was meant as an example of a pipeline that doesn't require information from other items.
In general, to merge items you may need to have them all at once - not only items from a single response. That's why I think merging task is better suited for post-processing.
I agree that the merging task - and in general, tasks that require all items at once - should not happen in pipelines. What are your thoughts on Scrapy providing built-in post-processing capabilities that handle such tasks? This could be an extension to pipelines that begins after all pipelines have finished, and we could use a chain of `process_items` functions (many-in, many-out).
edit: wording.
Created a new PR for the issue: #2029. The solution still has a few kinks to iron out though, so comments welcome!
Guys and girls, this is a rather large architectural change that might have various implications. You have to think of all the ways people use and abuse pipelines, and make sure that any change is well thought out - enough not to break tons of stuff.
To the extent that this feature is worth the effort, I would personally prefer to see two explicit API calls, `pipeline.push_item_to_front()` and `pipeline.push_item_to_next_stage()`, and let them prove their usefulness for a while before either of them becomes the default behaviour.
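Purely as a thought experiment, usage of those hypothetical calls might look like this (neither method exists in Scrapy today, and `self.pipeline` is an assumed handle to the pipeline manager):

```python
class SplitCardPipeline(object):
    def process_item(self, item, spider):
        if item.get('double_sided'):
            back = dict(item, side='back')
            # Explicit, opt-in forking: the new item enters only the
            # following stages, so the amplification stays visible here.
            self.pipeline.push_item_to_next_stage(back, spider)
        return item
```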
Just a random example of the things that could go bad: if a pipeline stage forks an `Item` into 200 `Item`s, and the next pipeline stage is a MySQL write stage which relies on `CONCURRENT_ITEMS` in order to prevent MySQL from crashing, then, suddenly, as far as I can see in #2020, MySQL will receive 20000 write requests. (I might be getting the `parallel` API wrong - but it might be a good point to have some tests in #2020 that prove that `CONCURRENT_ITEMS` is still respected.)
If new `Item`s go to the beginning of the pipeline (instead of just subsequent stages), then you can end up with the recursive situation of always forking as soon as you hit a pipeline stage, i.e. from a single `Item` you might end up with `Inf`. Note that this seems unlikely and "definitely a bug", but as soon as you write somewhat generic pipeline code, I guess the number of forked `Item`s actually depends on the content of the original `Item` you crawled... which comes from the Web! Not good. It's ok if the cards have two sides, as @jeremysmitherman says - but what happens if suddenly they have 10k sides, or 0? As soon as you allow your pipeline to loop... a web document becomes your program.
Let's see an example in the case where you forward your `Item`s just to subsequent stages. If you have stages `A->B->C->D` and stage `C` forks a few `Item`s, then the new `Item`s might be missing fields that `A->B` populated. Now `D` will have to handle both "plain" and "forked" `Item`s, which makes it less reusable, less maintainable and of course more complex. For example, if in stage `A` I was calculating the price from the zip code, then `D` would need both that logic and the "passthrough" logic. It doesn't sound that good.
Not supporting forking of `Item`s seems like a restriction, but it's a reasonable restriction that keeps the complexity at an acceptable level and guides people towards writing simple and reasonably reusable pipelines.
As @pawelmhm and @kmike mention, a spider middleware is the right "instrument" for this type of functionality if you have a use case where it really makes sense. I would guess you have multiple spiders creating the same type of `Item`s (otherwise the code obviously belongs in the spider), and you have a business case where e.g. you crawl orders and orders might have multiple products. For some reason you want to denormalize early and issue multiple `Product` `Item`s where potentially many of them have the same `order_id`. This is a case where one might think it's good to have a pipeline that returns an iterable. But that's exactly what spider middlewares are there for - application-specific logic that applies across many spiders. By default `process_spider_output` expects an iterable and it's easy to use (example 1, 2). If you need to make the split logic even more spider-specific, you can write a middleware that delegates splitting to a spider method (if it exists), as sketched below. This should work like a charm.
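A sketch of that delegation pattern, assuming a hypothetical per-spider `split_item()` hook (only `process_spider_output` is a real middleware method; everything else here is illustrative):

```python
from scrapy import Item

class DelegatedSplitMiddleware(object):
    def process_spider_output(self, response, result, spider):
        # Look up an optional, spider-specific splitting hook.
        split_item = getattr(spider, 'split_item', None)
        for x in result:
            if split_item is not None and isinstance(x, (Item, dict)):
                for part in split_item(x):
                    yield part    # spider-specific split logic
            else:
                yield x           # requests, or items when no hook exists
```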
Finally, Scrapy is supposed to be a fast and relatively simple crawling engine. The fact that pipelines are called "pipelines" doesn't mean they are the right place to put computationally intensive DAGs. Storm, Flink and Spark are awesome for this job and will make you think about fault tolerance, redundancy and all the other distributed aspects that one needs and that are outside Scrapy's scope.
Just summarizing: if #2029 were 5 lines long and had 500 lines of tests, it would be fantastic to merge. It would also show that Scrapy - as an architecture - is open to this extension and that it just hasn't been implemented yet. #2029 shows unbelievable skill and persistence - it's really admirable. I think that if it's to be merged, someone should spend time writing sufficient tests and making sure the community can adopt it without it causing any serious problems.
Thanks for the comment, @lookfwd. I'll try to address some of your points.
But I get your point about using spider middlewares instead. My solution is definitely convoluted, and if the same thing can be done in an easier way, that should obviously take precedence.
I'm sorry to jump into a slightly different topic. Would it be possible to reuse a connection from a previous pipeline for the item? For example, my first pipeline has a MongoDB connection, and a second pipeline that checks for duplicates would need to reuse the same MongoDB connection from the first one. I could create a new MongoDB connection in the second pipeline, but I thought there might be a better way to minimize open connections. Thanks a lot.
@nengine, this is a question for the mailing list. The short answer is that you don't have to use pipelines/middlewares to separate different tasks; you can separate them with functions, classes and methods within one pipeline.
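For example, a minimal sketch of one pipeline doing both jobs over a single connection (assuming pymongo; the settings name and the dedup key are illustrative):

```python
import pymongo
from scrapy.exceptions import DropItem

class MongoStorePipeline(object):
    def __init__(self, mongo_uri):
        self.mongo_uri = mongo_uri

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'))

    def open_spider(self, spider):
        # One client shared by both the duplicate check and the write.
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.collection = self.client['scrapy']['items']

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        if self.collection.find_one({'id': item.get('id')}):
            raise DropItem('duplicate item: %r' % item.get('id'))
        self.collection.insert_one(dict(item))
        return item
```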
Thank you @Digenis
I believe we can close this issue, as the consensus is to recommend using spider middlewares for this. I've opened https://github.com/scrapy/scrapy/issues/2240 to add a blurb about this in the docs or FAQ.
The documentation for item pipelines specifies that `process_item` must either return a dict with data or an `Item` object, or raise a `DropItem` exception. Is there a reason why we aren't allowed to return an iterable of dicts with data (or `Item` objects)? It seems impossible to write a pipeline that modifies the input item and returns multiple items under the current framework. Thank you!