How to use threading.Condition to wait for several Flask-APScheduler one-off jobs to complete execution in your Python 3 application
Previously, I discussed how to use Flask-APScheduler in your Python 3 Flask application to run multiple tasks in parallel, from a single HTTP request.
When we run jobs as discussed in that post, each job is run once by the underlying APScheduler instance. In addition, our Flask endpoint returns the HTTP response to the HTTP client as soon as the jobs are scheduled.
If we do not want the HTTP client to know the outcome of the jobs within that HTTP call, then we are good. But what if we want to include any errors that the jobs encounter in the same HTTP response?
In such a situation, we will need a mechanism to wait for the one-off jobs to complete execution before returning that response.
With that in mind, this post shows how we can use threading.Condition to wait for several Flask-APScheduler one-off jobs to complete execution.
What is threading.Condition used for?
In case you are wondering, an instance of threading.Condition helps us synchronize activities between different threads. It wraps a lock and exposes the wait, notify and notify_all methods, all of which must be called while that lock is held.
If we want the thread holding the lock to release it and pause execution until it is notified, we call the wait method. When we want one waiting thread to resume operation, we call the notify method. If we want all waiting threads to resume, we call the notify_all method. A resumed thread re-acquires the lock before wait returns.
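To make the three methods concrete, here is a minimal sketch that uses only the standard library. The names ready, events, waiter and notifier are illustrative and do not appear elsewhere in this post.

```python
import threading

condition = threading.Condition()
ready = False   # shared state protected by the condition's lock
events = []     # records what happened, in order

def waiter():
    with condition:              # acquire the underlying lock
        while not ready:         # re-check the state after every wake-up
            condition.wait()     # release the lock and block until notified
        events.append('waiter resumed')

def notifier():
    global ready
    with condition:              # notify() requires the lock to be held
        ready = True
        events.append('notifier signalled')
        condition.notify()       # wake up one waiting thread

waiter_thread = threading.Thread(target=waiter)
notifier_thread = threading.Thread(target=notifier)
waiter_thread.start()
notifier_thread.start()
waiter_thread.join()
notifier_thread.join()
print(events)
```

Checking the ready flag in a loop, rather than calling wait once, keeps the sketch correct even when the notifier happens to run before the waiter.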
Creating the JobsSynchronizer class to help the Flask endpoint wait for jobs to complete execution before returning an HTTP response
Since threading.Condition provides us with the ability to synchronize activities between different threads, we can build a helper class to serve our needs:
```python
import threading


class JobsSynchronizer:

    def __init__(self, num_tasks_to_complete):
        self.condition = threading.Condition()
        self.current_completed = 0
        self.status_list = []
        self.num_tasks_to_complete = num_tasks_to_complete

    def notify_task_completion(self, status_to_report=None):
        with self.condition:
            self.current_completed = self.current_completed + 1
            if status_to_report is not None:
                self.status_list.append(status_to_report)
            # Notify waiting thread
            if self.current_completed == self.num_tasks_to_complete:
                self.condition.notify()

    def wait_for_tasks_to_be_completed(self):
        with self.condition:
            # Re-check the counter so that we neither block forever when all
            # tasks finished before this call nor resume on a stray wake-up
            while self.current_completed < self.num_tasks_to_complete:
                self.condition.wait()

    def get_status_list(self):
        return self.status_list
```
Before we define the JobsSynchronizer class, we import the threading module into our script so that we can create an instance of threading.Condition later.
__init__ method
Next, we define a constructor for the JobsSynchronizer class which takes the total number of jobs to wait for, num_tasks_to_complete, as input.
When an instance of JobsSynchronizer is created, we perform the following actions inside the constructor:
- create an instance of threading.Condition,
- initialize a counter of completed jobs to 0,
- create a list to keep statuses from the jobs,
- remember the total number of jobs to wait for.
notify_task_completion method
After we define the constructor, we define the notify_task_completion method for jobs to call when they have finished their work. When we wrap self.condition with the with keyword, the running thread acquires the lock at the start of the block and releases it at the end. After acquiring the lock, we update the counter of completed jobs. If status_to_report is not None, we append it to status_list. When the current thread is the last one that we are waiting for, we call self.condition.notify() to wake up the waiting thread.
wait_for_tasks_to_be_completed method
When the wait_for_tasks_to_be_completed method is called, the executing thread acquires the lock and calls self.condition.wait() to release the lock and block until a job sends a notification.
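On its own, Condition.wait() has no memory of notifications that were sent before the wait started, and it has no deadline. The standard library also offers Condition.wait_for, which takes a predicate and an optional timeout. The sketch below shows how a variant of the helper could use it; TimedJobsSynchronizer and its timeout parameter are hypothetical and not part of the class described in this post.

```python
import threading

class TimedJobsSynchronizer:
    """Hypothetical variant of JobsSynchronizer whose wait can time out."""

    def __init__(self, num_tasks_to_complete):
        self.condition = threading.Condition()
        self.current_completed = 0
        self.num_tasks_to_complete = num_tasks_to_complete

    def notify_task_completion(self):
        with self.condition:
            self.current_completed += 1
            if self.current_completed >= self.num_tasks_to_complete:
                self.condition.notify_all()

    def wait_for_tasks_to_be_completed(self, timeout=None):
        with self.condition:
            # wait_for checks the predicate first and after every wake-up.
            # It returns the predicate's last value, so False means the
            # timeout expired before all tasks reported.
            return self.condition.wait_for(
                lambda: self.current_completed >= self.num_tasks_to_complete,
                timeout=timeout,
            )

# Both tasks report before anyone waits: the wait returns right away.
done_early = TimedJobsSynchronizer(2)
done_early.notify_task_completion()
done_early.notify_task_completion()
finished = done_early.wait_for_tasks_to_be_completed(timeout=1)

# Only one of two tasks reports: the wait gives up after the timeout.
stuck = TimedJobsSynchronizer(2)
stuck.notify_task_completion()
timed_out = not stuck.wait_for_tasks_to_be_completed(timeout=0.2)
print(finished, timed_out)
```

In a Flask endpoint, a False return value could be turned into an error response instead of keeping the HTTP client waiting indefinitely.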
get_status_list method
Once the jobs are completed, the awakened thread needs a way to get the results reported by the jobs. Therefore, the get_status_list method returns the status_list to the caller.
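Before wiring the class into Flask, it can be exercised with plain threading.Thread workers. The following is a rough sketch: the worker function, the sleep durations and the choice of three tasks are made up for illustration, and the wait method re-checks the counter in a loop so that the sketch also works if the workers finish first.

```python
import threading
import time

class JobsSynchronizer:
    def __init__(self, num_tasks_to_complete):
        self.condition = threading.Condition()
        self.current_completed = 0
        self.status_list = []
        self.num_tasks_to_complete = num_tasks_to_complete

    def notify_task_completion(self, status_to_report=None):
        with self.condition:
            self.current_completed += 1
            if status_to_report is not None:
                self.status_list.append(status_to_report)
            if self.current_completed == self.num_tasks_to_complete:
                self.condition.notify()

    def wait_for_tasks_to_be_completed(self):
        with self.condition:
            while self.current_completed < self.num_tasks_to_complete:
                self.condition.wait()

    def get_status_list(self):
        return self.status_list

def worker(synchronizer, task_id):
    # Stand-in for real work; each worker takes a slightly different time.
    time.sleep(0.05 * task_id)
    synchronizer.notify_task_completion(
        'Task {} completed execution'.format(task_id))

synchronizer = JobsSynchronizer(3)
threads = [threading.Thread(target=worker, args=(synchronizer, i))
           for i in range(3)]
for thread in threads:
    thread.start()

# Blocks until the third worker has reported.
synchronizer.wait_for_tasks_to_be_completed()
for thread in threads:
    thread.join()

# Completion order depends on thread scheduling, so sort before inspecting.
statuses = sorted(synchronizer.get_status_list())
print(statuses)
```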
Example Python 3 Flask application that uses threading.Condition to wait for several Flask-APScheduler one-off jobs to complete execution
After looking at how the JobsSynchronizer class works, let's look at an example Python 3 Flask application:
```python
from flask import Flask
from flask_apscheduler import APScheduler

import json, threading, time


class JobsSynchronizer:

    def __init__(self, num_tasks_to_complete):
        self.condition = threading.Condition()
        self.current_completed = 0
        self.status_list = []
        self.num_tasks_to_complete = num_tasks_to_complete

    def notify_task_completion(self, status_to_report=None):
        with self.condition:
            self.current_completed = self.current_completed + 1
            if status_to_report is not None:
                self.status_list.append(status_to_report)
            # Notify waiting thread
            if self.current_completed == self.num_tasks_to_complete:
                self.condition.notify()

    def wait_for_tasks_to_be_completed(self):
        with self.condition:
            # Re-check the counter so that we neither block forever when all
            # tasks finished before this call nor resume on a stray wake-up
            while self.current_completed < self.num_tasks_to_complete:
                self.condition.wait()

    def get_status_list(self):
        return self.status_list


app = Flask(__name__)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()


@app.route('/')
def welcome():
    return 'Welcome to flask_apscheduler demo', 200


@app.route('/run-tasks')
def run_tasks():
    job_synchronizer = JobsSynchronizer(10)
    for i in range(10):
        app.apscheduler.add_job(func=scheduled_task, trigger='date', args=[job_synchronizer, i], id='j' + str(i))

    job_synchronizer.wait_for_tasks_to_be_completed()

    return json.dumps(job_synchronizer.get_status_list()), 200


def scheduled_task(job_synchronizer, task_id):
    for i in range(10):
        time.sleep(1)
        print('Task {} running iteration {}'.format(task_id, i))

    job_synchronizer.notify_task_completion('Task {} completed execution'.format(task_id))


app.run(host='0.0.0.0', port=12345)
```
After importing the dependencies that are needed, we include the JobsSynchronizer class that we talked about earlier.
Initializing Flask and APScheduler
Once we have defined our helper class, we create a Flask object and an APScheduler object. After these two objects are created, we use scheduler.init_app(app) to associate our APScheduler object with our Flask object.
Starting the APScheduler object
Once the APScheduler object is associated with our Flask object, we call scheduler.start() to start it running in the background. From then on, we can add jobs to the APScheduler object for it to run later.
Scheduling jobs for APScheduler inside the run_tasks function
Next, we define two functions and decorate them with @app.route. As a result, we will have an HTTP server serving HTTP GET requests at the / and /run-tasks endpoints.
When an HTTP request is received at /run-tasks, run_tasks will be run.
At this point, we create an instance of JobsSynchronizer that will help us wait for 10 jobs to complete.
After that, we add 10 jobs that will run scheduled_task via app.apscheduler.add_job with the following keyword arguments:
- func=scheduled_task: the function to run is scheduled_task.
- trigger='date': an indication that we want to run the task once. Since we did not supply a value for run_date, the job runs as soon as possible.
- args=[job_synchronizer, i]: a list of arguments to pass to scheduled_task when APScheduler runs it. This is how we pass the same job_synchronizer object to all the jobs.
- id='j'+str(i): an identifier for the job. APScheduler does not accept a second job whose identifier is already in use in the job store unless replace_existing=True is passed, so we give each job a unique id to make sure all the jobs get to run.
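Note that the ids j0 to j9 are the same for every call to /run-tasks. If the endpoint may be called by several clients at around the same time, one possible way to keep ids distinct regardless of timing is to add a per-request random prefix. The make_job_ids helper below is hypothetical and is not part of Flask-APScheduler; it only builds the strings that could be passed as the id argument.

```python
import uuid

def make_job_ids(num_jobs):
    """Hypothetical helper: build job ids that are unique per request."""
    request_prefix = uuid.uuid4().hex   # random 32-character hex string
    return ['{}-j{}'.format(request_prefix, i) for i in range(num_jobs)]

# Two simulated requests get two disjoint sets of ids.
first_request_ids = make_job_ids(10)
second_request_ids = make_job_ids(10)
print(first_request_ids[0])
```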
After we have added the 10 jobs, we call job_synchronizer.wait_for_tasks_to_be_completed(). When we do so, the executing thread waits until it is notified at a later point in time.
Once it gets to run again, we return the status list as an HTTP response to the HTTP client.
Simulating long running tasks in scheduled_task
When scheduled_task is run by APScheduler, the JobsSynchronizer instance that was created earlier is passed in as its first argument.
After printing 10 statements that are spaced 1 second apart, we call job_synchronizer.notify_task_completion. As mentioned earlier, the last job to complete will trigger a notification to wake up the thread that ran run_tasks.
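If a job raised an exception before reaching notify_task_completion, the counter would never reach the expected total and the request would keep waiting. Since the goal is to report errors in the HTTP response, one option is to wrap the work in try/except/finally so that every job reports an outcome. In this sketch, run_and_report, RecordingSynchronizer and failing_work are hypothetical names; the stand-in class only mimics the notify_task_completion method so that the example runs without Flask.

```python
import threading

class RecordingSynchronizer:
    """Minimal stand-in that exposes the same notify_task_completion method."""

    def __init__(self):
        self.lock = threading.Lock()
        self.status_list = []

    def notify_task_completion(self, status_to_report=None):
        with self.lock:
            if status_to_report is not None:
                self.status_list.append(status_to_report)

def run_and_report(job_synchronizer, task_id, work):
    """Hypothetical wrapper: run work() and always report an outcome."""
    status = 'Task {} completed execution'.format(task_id)
    try:
        work()
    except Exception as error:
        status = 'Task {} failed: {}'.format(task_id, error)
    finally:
        # Runs on success and on failure, so the waiting thread
        # always receives one report per task.
        job_synchronizer.notify_task_completion(status)

def failing_work():
    raise ValueError('disk full')

recorder = RecordingSynchronizer()
run_and_report(recorder, 0, lambda: None)   # succeeds
run_and_report(recorder, 1, failing_work)   # raises inside the wrapper
print(recorder.status_list)
```

The same pattern could be applied inside scheduled_task by placing its loop in the try block.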
Starting the web server
Finally, at the end of the script, we start our web server through app.run(host='0.0.0.0', port=12345).
Observations from running the Python 3 Flask application
When you run the Python script, a web server will listen on port 12345. You can then run the following command to initiate an HTTP request to run the 10 tasks:
```
curl localhost:12345/run-tasks
```
When you run the command, you will notice that the call does not return immediately. After waiting for around 10 seconds, you will find a JSON list of status strings in the HTTP response.