add support for custom exporter class #6273

Status: Open. Wants to merge 7 commits into master.
63 changes: 63 additions & 0 deletions docs/topics/exporters.rst
@@ -443,3 +443,66 @@ MarshalItemExporter
-------------------

.. autoclass:: MarshalItemExporter

Custom Item Exporters
=====================
Review comment (Contributor):

This section covers a generic concept, implementing custom item exporters, but focuses exclusively on one type of exporter. There could be other custom item exporters that are not meant to be used as streams, for example one that exports into ASN.1 encoding.

I recommend narrowing this section and explaining why you may want to do something like this: to export items into streamed services such as APIs or databases rather than dumping them into files.


You can also inherit from :class:`BaseItemExporter` and implement your own exporter.

Usage:

.. code-block:: python

    custom_settings = {
        "FEEDS": {"stdout://": {"format": "CustomAPI"}},
Review comment (Contributor):

Using stdout:// without any explanation seems like magic here. I would add some details about how this is required in order to avoid writing the data into a file.

Furthermore, there could be cases where you may not want to use stdout:// and would rather use a more descriptive scheme such as postgres:// or mongodb://.
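
For illustration, a minimal sketch of what a more descriptive scheme could look like. Note that postgres:// and project.storages.PostgresFeedStorage are hypothetical names here; only FEEDS, FEED_STORAGES and FEED_EXPORTERS are actual Scrapy settings:

custom_settings = {
    # Hypothetical scheme; FEED_STORAGES maps URI schemes to feed storage
    # classes, so a custom scheme must be registered there.
    "FEEDS": {"postgres://localhost/items": {"format": "CustomAPI"}},
    # project.storages.PostgresFeedStorage is an assumed, user-written class.
    "FEED_STORAGES": {"postgres": "project.storages.PostgresFeedStorage"},
    "FEED_EXPORTERS": {"CustomAPI": "project.exporters.CustomExporter"},
}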

"FEED_EXPORTERS": {"CustomAPI": "project.exporters.CustomExporter"},
}

Here you can override the :meth:`~BaseItemExporter.export_item`, :meth:`~BaseItemExporter.start_exporting`,
:meth:`~BaseItemExporter.finish_exporting`, :meth:`~BaseItemExporter.serialize_field` and :meth:`~BaseItemExporter.__init__` methods to customize the
behavior of your exporter.

.. tip:: To send non-blocking requests to external services, it is recommended
   to use ``twisted.internet.threads.deferToThread``.

.. warning:: The storage **file object** is passed to the custom exporter's ``__init__`` method and
   behaves as usual, according to the scheme in your ``FEEDS`` setting. It is also closed
   when a batch is completed or when the spider is closed.

Example:

.. tip:: :meth:`~BaseItemExporter.finish_exporting` can be an async method.

.. code-block:: python

    import logging
    from typing import List

    import requests
    from twisted.internet import defer, threads
    from twisted.internet.defer import DeferredList

    from scrapy.exporters import BaseItemExporter

    logger = logging.getLogger(__name__)


    class CustomExporter(BaseItemExporter):
Review comment (Contributor):

I would rename it to HttpbinExporter, APIExporter, or RequestPostExporter.

        def __init__(self, file, *args, dont_fail=False, **kwargs):
            self._kwargs = kwargs
            self.file = file
            self._configure(kwargs, dont_fail=dont_fail)
            self._pending_deferreds: List[defer.Deferred] = []

        def start_exporting(self):
            pass

        def send_request(self, item):
            # Return the response so that the deferToThread callback receives it.
            return requests.post("https://httpbin.org/anything", json={"item": item})

        def export_item(self, item):
Review comment (Contributor):

Would this work too?

@deferToThread
def export_item(self, item):
    response = requests.post("https://httpbin.org/anything", json={"item": item})

Review comment (Contributor):

Alternatively, use treq to directly return a Deferred without having to use deferToThread:

def export_item(self, item):
    url = "https://httpbin.org/post"
    data = {"item": item}
    headers = {"Content-Type": "application/json"}
    return treq.post(url, json=data, headers=headers)

            dfd = threads.deferToThread(self.send_request, item)
            dfd.addCallbacks(
                callback=lambda result: logger.info(f"Successful response: {result}"),
                errback=lambda failure: logger.error(
                    f"Error exporting item {item}: {failure.getTraceback()}"
                ),
            )
            # optionally collect the deferreds to ensure that they are awaited
            # in finish_exporting()
            dfd.addBoth(lambda _: self._pending_deferreds.remove(dfd))
            self._pending_deferreds.append(dfd)
            return dfd

        async def finish_exporting(self):
            await DeferredList(self._pending_deferreds)
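
For context, here is a hypothetical spider wiring the pieces above together; the spider name, the quotes.toscrape.com URL and the CSS selectors are illustrative only, and ``project.exporters.CustomExporter`` is assumed to point at the class defined in this example:

.. code-block:: python

    import scrapy


    class QuotesSpider(scrapy.Spider):
        name = "quotes"
        start_urls = ["https://quotes.toscrape.com"]
        custom_settings = {
            "FEEDS": {"stdout://": {"format": "CustomAPI"}},
            "FEED_EXPORTERS": {"CustomAPI": "project.exporters.CustomExporter"},
        }

        def parse(self, response):
            # Each yielded item is passed to CustomExporter.export_item().
            for quote in response.css("div.quote"):
                yield {"text": quote.css("span.text::text").get()}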
12 changes: 9 additions & 3 deletions scrapy/extensions/feedexport.py
@@ -466,8 +466,9 @@ def _get_exporter(
     def finish_exporting(self) -> None:
         if self._exporting:
             assert self.exporter
-            self.exporter.finish_exporting()
+            result_or_coro = self.exporter.finish_exporting()  # type: ignore
             self._exporting = False
+            return result_or_coro


_FeedSlot = create_deprecated_class(
@@ -574,11 +575,11 @@ def get_file(slot_: FeedSlot) -> IO[bytes]:

         if slot.itemcount:
             # Normal case
-            slot.finish_exporting()
+            finish_exporting_deferred = maybeDeferred(slot.finish_exporting)
         elif slot.store_empty and slot.batch_id == 1:
             # Need to store the empty file
             slot.start_exporting()
-            slot.finish_exporting()
+            finish_exporting_deferred = maybeDeferred(slot.finish_exporting)
         else:
             # In this case, the file is not stored, so no processing is required.
             return None
@@ -600,6 +601,11 @@ def get_file(slot_: FeedSlot) -> IO[bytes]:
         )
         d.addBoth(lambda _: self._pending_deferreds.remove(d))

+        self._pending_deferreds.append(finish_exporting_deferred)
+        finish_exporting_deferred.addBoth(
+            lambda _: self._pending_deferreds.remove(finish_exporting_deferred)
+        )
+
         return d

def _handle_store_error(
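
For readers unfamiliar with ``maybeDeferred``, a minimal sketch of why it is used above, assuming Twisted >= 21.2 (where a coroutine returned by the wrapped callable is adapted to a Deferred):

from twisted.internet.defer import maybeDeferred


def sync_finish():
    # A classic exporter: finish_exporting() returns a plain value (or None).
    return "finished synchronously"


async def async_finish():
    # An async exporter: calling finish_exporting() returns a coroutine.
    return "finished asynchronously"


# Both calls yield a Deferred, so the caller can attach callbacks uniformly
# without caring whether the exporter is sync or async.
maybeDeferred(sync_finish).addCallback(print)
maybeDeferred(async_finish).addCallback(print)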
22 changes: 22 additions & 0 deletions tests/test_feedexport.py
@@ -595,6 +595,11 @@ class FromCrawlerCsvItemExporter(CsvItemExporter, FromCrawlerMixin):
     pass


+class AsyncFinishExportingExporter(CsvItemExporter, FromCrawlerMixin):
+    async def finish_exporting(self):
+        self.stream.detach()
+
+
 class FromCrawlerFileFeedStorage(FileFeedStorage, FromCrawlerMixin):
     @classmethod
     def from_crawler(cls, crawler, *args, feed_options=None, **kwargs):
@@ -1017,6 +1022,23 @@ def test_start_finish_exporting_items(self):
         self.assertFalse(listener.start_without_finish)
         self.assertFalse(listener.finish_without_start)

+    @defer.inlineCallbacks
+    def test_async_finish_exporting(self):
+        items = [
+            self.MyItem({"foo": "bar1", "egg": "spam1"}),
+        ]
+        settings = {
+            "FEEDS": {self._random_temp_filename(): {"format": "csv"}},
+            "FEED_EXPORTERS": {"csv": AsyncFinishExportingExporter},
+        }
+        listener = IsExportingListener()
+        InstrumentedFeedSlot.subscribe__listener(listener)
+
+        with mock.patch("scrapy.extensions.feedexport.FeedSlot", InstrumentedFeedSlot):
+            _ = yield self.exported_data(items, settings)
+        self.assertFalse(listener.start_without_finish)
+        self.assertFalse(listener.finish_without_start)
+
     @defer.inlineCallbacks
     def test_start_finish_exporting_no_items(self):
         items = []