Using requests with Threading¶
New in version 0.4.0.
The toolbelt provides a simple API for using requests with threading.
A requests Session is documented as threadsafe but there are still a couple corner cases where it isn’t perfectly threadsafe. The best way to use a Session is to use one per thread.
The implementation provided by the toolbelt is naïve. This means that we use one session per thread and we make no effort to synchronize attributes (e.g., authentication, cookies, etc.). It also means that we make no attempt to direct a request to a session that has already handled a request to the same domain. In other words, if you’re making requests to multiple domains, the toolbelt’s Pool will not try to send requests to the same domain to the same thread.
This module provides three classes:
In 98% of the situations you’ll want to just use a
Pool
and you’ll treat a
ThreadResponse
as if it were a
regular requests.Response
.
Here’s an example:
# This example assumes Python 3
import queue
from requests_toolbelt.threaded import pool
jobs = queue.Queue()
urls = [
# My list of URLs to get
]
for url in urls:
queue.put({'method': 'GET', 'url': url})
p = pool.Pool(job_queue=q)
p.join_all()
for response in p.responses():
print('GET {0}. Returned {1}.'.format(response.request_kwargs['url'],
response.status_code))
This is clearly a bit underwhelming. This is why there’s a short-cut class
method to create a Pool
from a list
of URLs.
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
for response in p.responses():
print('GET {0}. Returned {1}.'.format(response.request_kwargs['url'],
response.status_code))
If one of the URLs in your list throws an exception, it will be accessible
from the exceptions()
generator.
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
for exc in p.exceptions():
print('GET {0}. Raised {1}.'.format(exc.request_kwargs['url'],
exc.message))
If instead, you want to retry the exceptions that have been raised you can do the following:
from requests_toolbelt.threaded import pool
urls = [
# My list of URLs to get
]
p = pool.Pool.from_urls(urls)
p.join_all()
new_pool = pool.Pool.from_exceptions(p.exceptions())
new_pool.join_all()
Not all requests are advisable to retry without checking if they should be retried. You would normally check if you want to retry it.
The Pool
object takes 4 other keyword arguments:
initializer
This is a callback that will initialize things on every session created. The callback must return the session.
auth_generator
This is a callback that is called after the initializer callback has modified the session. This callback must also return the session.
num_processes
By passing a positive integer that indicates how many threads to use. It is
None
by default, and will use the result ofmultiproccessing.cpu_count()
.session
You can pass an alternative constructor or any callable that returns a
requests.Sesssion
like object. It will not be passed any arguments because arequests.Session
does not accept any arguments.
Finally, if you don’t want to worry about Queue or Pool management, you can try the following:
from requests_toolbelt import threaded
requests = [{
'method': 'GET',
'url': 'https://httpbin.org/get',
# ...
}, {
# ...
}, {
# ...
}]
responses_generator, exceptions_generator = threaded.map(requests)
for response in responses_generator:
# Do something
API and Module Auto-Generated Documentation¶
This module provides the API for requests_toolbelt.threaded
.
The module provides a clean and simple API for making requests via a thread pool. The thread pool will use sessions for increased performance.
A simple use-case is:
from requests_toolbelt import threaded
urls_to_get = [{
'url': 'https://api.github.com/users/sigmavirus24',
'method': 'GET',
}, {
'url': 'https://api.github.com/repos/requests/toolbelt',
'method': 'GET',
}, {
'url': 'https://google.com',
'method': 'GET',
}]
responses, errors = threaded.map(urls_to_get)
By default, the threaded submodule will detect the number of CPUs your
computer has and use that if no other number of processes is selected. To
change this, always use the keyword argument num_processes
. Using the
above example, we would expand it like so:
responses, errors = threaded.map(urls_to_get, num_processes=10)
You can also customize how a requests.Session
is initialized by
creating a callback function:
from requests_toolbelt import user_agent
def initialize_session(session):
session.headers['User-Agent'] = user_agent('my-scraper', '0.1')
session.headers['Accept'] = 'application/json'
responses, errors = threaded.map(urls_to_get,
initializer=initialize_session)
-
requests_toolbelt.threaded.
map
(requests, **kwargs)¶ Simple interface to the threaded Pool object.
This function takes a list of dictionaries representing requests to make using Sessions in threads and returns a tuple where the first item is a generator of successful responses and the second is a generator of exceptions.
- Parameters
- Returns
Tuple of responses and exceptions from the pool
- Return type
Inspiration is blatantly drawn from the standard library’s multiprocessing library. See the following references:
multiprocessing’s pool source
map and map_async inspiration
-
class
requests_toolbelt.threaded.pool.
Pool
(job_queue, initializer=None, auth_generator=None, num_processes=None, session=<class 'requests.sessions.Session'>)¶ Pool that manages the threads containing sessions.
- Parameters
queue (queue.Queue) – The queue you’re expected to use to which you should add items.
initializer (collections.Callable) – Function used to initialize an instance of
session
.auth_generator (collections.Callable) – Function used to generate new auth credentials for the session.
num_process (int) – Number of threads to create.
session (requests.Session) –
-
exceptions
()¶ Iterate over all the exceptions in the pool.
- Returns
Generator of
ThreadException
-
classmethod
from_exceptions
(exceptions, **kwargs)¶ Create a
Pool
from anThreadException
s.Provided an iterable that provides
ThreadException
objects, this classmethod will generate a new pool to retry the requests that caused the exceptions.- Parameters
exceptions (iterable) – Iterable that returns
ThreadException
kwargs – Keyword arguments passed to the
Pool
initializer.
- Returns
An initialized
Pool
object.- Return type
-
get_exception
()¶ Get an exception from the pool.
- Return type
-
get_response
()¶ Get a response from the pool.
- Return type
-
join_all
()¶ Join all the threads to the master thread.
-
responses
()¶ Iterate over all the responses in the pool.
- Returns
Generator of
ThreadResponse
-
class
requests_toolbelt.threaded.pool.
ThreadResponse
(request_kwargs, response)¶ A wrapper around a requests Response object.
This will proxy most attribute access actions to the Response object. For example, if you wanted the parsed JSON from the response, you might do:
thread_response = pool.get_response() json = thread_response.json()
-
request_kwargs
¶ The original keyword arguments provided to the queue
-
response
¶ The wrapped response
-
-
class
requests_toolbelt.threaded.pool.
ThreadException
(request_kwargs, exception)¶ A wrapper around an exception raised during a request.
This will proxy most attribute access actions to the exception object. For example, if you wanted the message from the exception, you might do:
thread_exc = pool.get_exception() msg = thread_exc.message
-
exception
¶ The captured and wrapped exception
-
request_kwargs
¶ The original keyword arguments provided to the queue
-