Python Programming by Example (2015)
11. Concurrency
This chapter explains how to write concurrent programs in Python.
11.1 Getting Started
Python can run a function in the background while the main program continues. In this chapter, we explore several scenarios for building concurrent applications. You can read about concurrency in the Python documentation, https://docs.python.org/3/library/concurrency.html .
11.2 Threading
We can implement threading by creating a Thread object and passing it the function to run. Call start() to run the thread.
import time
import threading

# shared flag: the main thread sets it to False to stop the worker
running = True

def perform():
    counter = 0
    while running:
        print('counter:', str(counter))
        time.sleep(2)
        counter += 1

my_thread = threading.Thread(target=perform)
my_thread.daemon = True  # replaces setDaemon(True), deprecated since Python 3.10
my_thread.start()
# python 3
input("Press Enter to stop...")
# python 2
#raw_input("Press Enter to stop...")
running = False
my_thread.join(2)
To stop the thread, we set running to False and then call join() with a timeout value to wait for the thread to finish.
Save the program into a file called ch11_01.py.
Run the program.
$ python3 ch11_01.py
When Thread.start() is called, the new thread executes the perform() function.
A sample of the program output can be seen in the figure below.
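A shared boolean flag works for a small demo, but the worker only notices it after its sleep ends. As a minimal sketch of an alternative, the example below uses threading.Event as the stop signal. The names stop_event and counts are assumptions made for this illustration and are not part of ch11_01.py.

```python
import threading
import time

def perform(stop_event, counts):
    # counts is a hypothetical list used only to record the work done
    counter = 0
    while not stop_event.is_set():
        counts.append(counter)
        counter += 1
        # wait() acts like sleep() but returns early once the event is set
        stop_event.wait(0.05)

stop_event = threading.Event()
counts = []
worker = threading.Thread(target=perform, args=(stop_event, counts), daemon=True)
worker.start()

time.sleep(0.2)    # let the worker run for a moment
stop_event.set()   # ask the worker to stop
worker.join(2)     # wait up to 2 seconds for it to finish

print('iterations:', len(counts))
print('still alive:', worker.is_alive())
```

Because the worker waits on the event instead of sleeping, it reacts to the stop request almost immediately, so join() normally returns well before its timeout.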
A thread can also be implemented as a class derived from threading.Thread. For instance, we create a class, MyThread, that inherits from threading.Thread and implements the run() method.
Write these scripts.
import time
import threading

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.running = False

    def run(self):
        counter = 0
        self.running = True
        while self.running:
            print('counter:', str(counter))
            time.sleep(2)
            counter += 1

    def stop(self):
        print('stopping thread...')
        self.running = False
        self.join(2)

my_thread = MyThread()
my_thread.daemon = True  # replaces setDaemon(True), deprecated since Python 3.10
my_thread.start()
# python 3
input("Press Enter to stop...")
# python 2
#raw_input("Press Enter to stop...")
my_thread.stop()
Save these scripts into a file called ch11_02.py. Our object, my_thread, calls start(), which in turn calls run() in the new thread. We call stop() to stop the thread: it sets running to False and then calls join() to wait for the thread to finish.
Now you can run the program.
$ python3 ch11_02.py
A sample of program output:
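To make the relationship between start() and run() concrete, the following sketch records which thread actually executes run(). The class RecordingThread and its attribute executed_in are hypothetical names introduced only for this illustration.

```python
import threading

class RecordingThread(threading.Thread):
    """Hypothetical helper: remembers which thread executed run()."""
    def __init__(self):
        super().__init__()
        self.executed_in = None

    def run(self):
        self.executed_in = threading.get_ident()

main_id = threading.get_ident()

started = RecordingThread()
started.start()   # run() is executed in a new thread
started.join()

direct = RecordingThread()
direct.run()      # plain method call: no new thread is created

started_in_new_thread = started.executed_in != main_id
direct_in_main_thread = direct.executed_in == main_id
print(started_in_new_thread, direct_in_main_thread)
```

Calling run() directly is an ordinary method call in the current thread, so a thread class should always be launched with start().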
11.3 Synchronizing Threads
We can synchronize threads in Python. In this section, we use a mutex lock and an event for thread synchronization.
Let's start.
11.3.1 Mutex Locks
The idea is simple. Before we access a shared resource, we call acquire() on a Lock object. When we are done, we call release().
For testing, we define a shared resource: a variable called value. This variable is accessed by two threads, but only one thread at a time can access it.
OK, write these scripts.
import time
import threading

class MyThread(threading.Thread):
    def __init__(self, name, o_lock):
        threading.Thread.__init__(self)
        self.name = name
        self.running = False
        self.value_lock = o_lock

    def run(self):
        global value
        self.running = True
        while self.running:
            # only one thread at a time can hold the lock
            self.value_lock.acquire()
            value += 1
            print('value:', str(value), ' from ', self.name)
            self.value_lock.release()
            time.sleep(2)

    def stop(self):
        print('stopping ', self.name)
        self.running = False
        self.join(2)

# shared resource, protected by value_lock
value = 0
value_lock = threading.Lock()
my_thread1 = MyThread('Thread 1', value_lock)
my_thread1.daemon = True
my_thread2 = MyThread('Thread 2', value_lock)
my_thread2.daemon = True
my_thread1.start()
my_thread2.start()
# python 3
input("Press Enter to stop...")
# python 2
#raw_input("Press Enter to stop...")
my_thread1.stop()
my_thread2.stop()
Save into a file, called ch11_03.py.
Now you can run the program.
$ python3 ch11_03.py
Program output:
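A Lock can also be used as a context manager, which calls acquire() on entry and release() on exit, even if the protected code raises an exception. The sketch below is a separate, hypothetical example (the function add_many and the dictionary state are not part of ch11_03.py) in which four threads increment one counter.

```python
import threading

def add_many(lock, state, times):
    for _ in range(times):
        with lock:  # acquire() on entry, release() on exit
            state['value'] += 1

lock = threading.Lock()
state = {'value': 0}
threads = [
    threading.Thread(target=add_many, args=(lock, state, 10000))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('value:', state['value'])  # 4 threads x 10000 increments = 40000
```

Since every increment happens while the lock is held, no update is lost and the final value equals the total number of increments.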
11.3.2 Event
Another option for synchronizing threads is the Event object. A thread that calls wait() blocks, which means it cannot execute the code after wait() until another thread calls set() on the event.
For illustration, we create three worker threads. These threads do their work only after set() is called on the event. This is useful when workers must wait for an initialization step to finish.
Write these scripts.
import time
import threading

class Worker(threading.Thread):
    def __init__(self, name, signal):
        threading.Thread.__init__(self)
        self.name = name
        self.signal = signal

    def run(self):
        print('waiting from ', self.name)
        # block here until the main thread calls set()
        self.signal.wait()
        print('processing from ', self.name)
        time.sleep(2)
        print('done from ', self.name)

signal_event = threading.Event()
my_thread1 = Worker('Thread 1', signal_event)
my_thread1.daemon = True
my_thread2 = Worker('Thread 2', signal_event)
my_thread2.daemon = True
my_thread3 = Worker('Thread 3', signal_event)
my_thread3.daemon = True
my_thread1.start()
my_thread2.start()
my_thread3.start()
# waiting for 10 seconds
time.sleep(10)
# start process
print('Send a signal to start processing')
signal_event.set()
# python 3
input("Press Enter to stop...")
# python 2
#raw_input("Press Enter to stop...")
print('Done all')
Save into a file, called ch11_04.py. Then, run the program.
$ python3 ch11_04.py
Program output:
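Event.wait() also accepts a timeout and returns a boolean, which lets a thread tell a real signal apart from an expired wait. The following sketch assumes a hypothetical worker function and results list that are not part of ch11_04.py.

```python
import threading

signal_event = threading.Event()
results = []

def worker(name):
    # wait() returns True once the event is set,
    # or False if the timeout expires first
    got_signal = signal_event.wait(timeout=5)
    results.append((name, got_signal))

threads = [
    threading.Thread(target=worker, args=('Thread %d' % i,))
    for i in range(1, 4)
]
for t in threads:
    t.start()

# the event is not set yet, so this short wait returns False
set_before_signal = signal_event.wait(timeout=0.1)
signal_event.set()  # releases all three workers at once
for t in threads:
    t.join()

print('set before signal:', set_before_signal)
print(sorted(results))
```

A single call to set() wakes every waiting thread, which is why an Event suits the "start all workers after initialization" scenario described above.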
11.4 Queue
In this section, we learn about Queue object from Python, https://docs.python.org/3/library/queue.html .
For testing, we add some jobs to a Queue. Then, several worker threads pick up the jobs and run them.
Let's write these scripts.
import time
import threading
import queue

class Worker(threading.Thread):
    def __init__(self, name, q):
        threading.Thread.__init__(self)
        self.name = name
        self.q = q

    def run(self):
        while True:
            try:
                # get_nowait() cannot block if another worker took the last job
                job = self.q.get_nowait()
            except queue.Empty:
                print('thread stopped')
                break
            print('run job', str(job), ' from', self.name)
            time.sleep(1)
            self.q.task_done()

q = queue.Queue()
# generate jobs
print('populate jobs')
for i in range(15):
    q.put(i)
my_thread1 = Worker('Thread 1', q)
my_thread1.daemon = True
my_thread2 = Worker('Thread 2', q)
my_thread2.daemon = True
my_thread3 = Worker('Thread 3', q)
my_thread3.daemon = True
my_thread1.start()
my_thread2.start()
my_thread3.start()
my_thread1.join()
my_thread2.join()
my_thread3.join()
# python 3
input("Press Enter to stop...")
# python 2
#raw_input("Press Enter to stop...")
print('Done all')
Save into a file, called ch11_05.py.
Now you can run the program.
$ python3 ch11_05.py
Program output:
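When jobs may arrive while the workers are already running, checking for an empty queue is not a reliable stop condition. A common alternative, sketched here under the assumption that None never appears as a real job, is to let workers block on get() and to send one sentinel value per worker. The names worker, done, and lock are hypothetical.

```python
import queue
import threading

def worker(q, done, lock):
    while True:
        job = q.get()       # blocks until an item is available
        if job is None:     # sentinel: no more work for this worker
            q.task_done()
            break
        with lock:
            done.append(job * 2)
        q.task_done()

q = queue.Queue()
done = []
lock = threading.Lock()
workers = [
    threading.Thread(target=worker, args=(q, done, lock))
    for _ in range(3)
]
for w in workers:
    w.start()

for i in range(15):
    q.put(i)
for _ in workers:
    q.put(None)  # one sentinel per worker

q.join()         # returns once task_done() was called for every item
for w in workers:
    w.join()

print(sorted(done))
```

Here q.join() pairs with task_done(): it blocks until every item that was put into the queue has been marked as processed.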
11.5 Multiprocessing
We can also implement concurrency in Python using multiprocessing. You can read more about it at https://docs.python.org/3/library/multiprocessing.html .
11.5.1 Process
We can use a Process object to implement multiprocessing in Python. For instance, we build a counter that runs in a separate process.
Write these scripts.
import time
import multiprocessing

class MyProcess(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        # an Event is visible to the child process; a plain attribute
        # changed in the parent is not
        self.stop_event = multiprocessing.Event()

    def run(self):
        counter = 0
        while not self.stop_event.is_set():
            print('counter:', str(counter))
            time.sleep(2)
            counter += 1

    def stop(self):
        print('stopping process...')
        self.stop_event.set()
        self.join(3)

# the guard is needed where child processes are started with "spawn"
if __name__ == '__main__':
    my_process = MyProcess()
    my_process.daemon = True
    my_process.start()
    # python 3
    input("Press Enter to stop...")
    # python 2
    #raw_input("Press Enter to stop...")
    my_process.stop()
Save into a file, called ch11_06.py, and run it.
$ python3 ch11_06.py
Program output:
11.5.2 Synchronizing Processes
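We can share data between processes using multiprocessing.Value. This object lives in shared memory and, when created with lock=True, carries a lock that we obtain through get_lock() to synchronize access.
For testing, we define a shared resource via multiprocessing.Value. Write these scripts.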
import time
import multiprocessing

class MyProcess(multiprocessing.Process):
    def __init__(self, name, shared_dt):
        multiprocessing.Process.__init__(self)
        self.name = name
        # an Event is visible to the child process; a plain attribute
        # changed in the parent is not
        self.stop_event = multiprocessing.Event()
        self.shared_data = shared_dt

    def run(self):
        while not self.stop_event.is_set():
            time.sleep(1)
            # only one process at a time can hold the lock
            with self.shared_data.get_lock():
                self.shared_data.value += 1
                print('value:', str(self.shared_data.value), ' from ', self.name)

    def stop(self):
        print('stopping ', self.name)
        self.stop_event.set()
        self.join(2)

# the guard is needed where child processes are started with "spawn"
if __name__ == '__main__':
    shared_data = multiprocessing.Value('i', 0, lock=True)
    my_process1 = MyProcess('Process 1', shared_data)
    my_process1.daemon = True
    my_process2 = MyProcess('Process 2', shared_data)
    my_process2.daemon = True
    my_process1.start()
    my_process2.start()
    # python 3
    input("Press Enter to stop...")
    # python 2
    #raw_input("Press Enter to stop...")
    my_process1.stop()
    my_process2.stop()
Save the program into a file, called ch11_07.py.
Now you can run the program.
$ python3 ch11_07.py
Program output:
11.6 Parallel Tasks
The last section implements parallel tasks using concurrent.futures. There are two options: ThreadPoolExecutor and ProcessPoolExecutor.
11.6.1 ThreadPoolExecutor
ThreadPoolExecutor uses threads to run tasks in parallel.
A sample script for parallel tasks can be written as follows.
import queue
import concurrent.futures
import random
import time
import datetime

def perform(q, a, b, c):
    rand_val = random.uniform(0, 2)
    res = a * b * 10 - c * 2
    time.sleep(rand_val)
    q.put(res)

t1 = datetime.datetime.now()
q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for i in range(1, 15):
        val_a = random.randint(1, 10)
        val_b = random.randint(1, 10)
        val_c = random.randint(1, 10)
        executor.submit(perform, q, val_a, val_b, val_c)

# leaving the with block waits for all submitted tasks to finish
print('Print results')
t2 = datetime.datetime.now()
while not q.empty():
    print(q.get())
t = t2 - t1
print('total time:', str(t.total_seconds()), 'seconds')
Save into a file, called ch11_08.py.
$ python3 ch11_08.py
Program output:
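submit() returns a Future object, so results can also be collected without a Queue. The sketch below reuses the formula from perform() but returns the value instead. The fixed inputs and the results dictionary are assumptions made so that the output is predictable.

```python
import concurrent.futures

def perform(a, b, c):
    # same formula as in the chapter example, returned instead of queued
    return a * b * 10 - c * 2

inputs = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]  # hypothetical fixed inputs

results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map each Future back to the arguments it was submitted with
    futures = {executor.submit(perform, a, b, c): (a, b, c) for a, b, c in inputs}
    for future in concurrent.futures.as_completed(futures):
        results[futures[future]] = future.result()

print(results[(1, 2, 3)], results[(4, 5, 6)], results[(7, 8, 9)])  # 14 188 542
```

as_completed() yields each Future as soon as its task finishes, and result() re-raises any exception from the task, which a Queue-based approach would silently drop.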
11.6.2 ProcessPoolExecutor
ProcessPoolExecutor uses processes to run tasks in parallel.
We implement the same scenario as in the previous section.
import multiprocessing
import concurrent.futures
import random
import time
import datetime

def perform(q, a, b, c):
    rand_val = random.uniform(0, 2)
    res = a * b * 10 - c * 2
    time.sleep(rand_val)
    q.put(res)

# the guard is needed where child processes are started with "spawn"
if __name__ == '__main__':
    t1 = datetime.datetime.now()
    # a Manager queue can be passed to the worker processes
    m = multiprocessing.Manager()
    q = m.Queue()
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        for i in range(1, 15):
            val_a = random.randint(1, 10)
            val_b = random.randint(1, 10)
            val_c = random.randint(1, 10)
            executor.submit(perform, q, val_a, val_b, val_c)

    # leaving the with block waits for all submitted tasks to finish
    print('Print results')
    t2 = datetime.datetime.now()
    while not q.empty():
        print(q.get())
    t = t2 - t1
    print('total time:', str(t.total_seconds()), 'seconds')
Save these scripts into a file, called ch11_09.py.
$ python3 ch11_09.py
Program output: