• acquire(...) Запрашивает замок. Фактически вызывается одноименный метод принадлежащего объекту–условию объекта–замка.
• release() Снимает замок.
• wait([timeout]) Переводит поток в режим ожидания. Этот метод может быть вызван только в том случае, если вызывающий его поток получил замок. Метод снимает замок и блокирует поток до появления объявлений, то есть вызовов методов notify() и notifyAll() другими потоками. Необязательный аргумент timeout задает таймаут ожидания в секундах. При выходе из ожидания поток снова запрашивает замок и возвращается из метода wait().
• notify() Выводит из режима ожидания один из потоков, ожидающих данные условия. Метод можно вызвать, только овладев замком, ассоциированным с условием. Документация предупреждает, что в будущих реализациях модуля из целей оптимизации этот метод будет прерывать ожидание сразу нескольких потоков. Сам по себе метод notify() не приводит к продолжению выполнения ожидавших условия потоков, так как этому препятствует занятый замок. Потоки получают управление только после снятия замка потоком, вызвавшим метод notify().
• notifyAll() Этот метод аналогичен методу notify(), но прерывает ожидание всех ждущих выполнения условия потоков.
В следующем примере условия используются для оповещения потоков о прибытии новой порции данных (организуется связь производитель — потребитель, producer — consumer):
import threading
cv = threading.Condition()
class Item:
"""Класс–контейнер для элементов, которые будут потребляться
в потоках"""
def __init__(self):
self._items = []
def is_available(self):
return len(self._items) > 0
def get(self):
return self._items.pop()
def make(self, i):
self._items.append(i)
item = Item()
def consume():
"""Потребление очередного элемента (с ожиданием его появления)"""
cv.acquire()
while not item.is_available():
cv.wait()
it = item.get()
cv.release()
return it
def consumer():
while True:
print consume()
def produce(i):
"""Занесение нового элемента в контейнер и оповещение потоков"""
cv.acquire()
item.make(i)
cv.notify()
cv.release()
p1 = threading.Thread(target=consumer, name="t1")
p1.setDaemon(True)
p2 = threading.Thread(target=consumer, name="t2")
p2.setDaemon(True)
p1.start()
p2.start()
produce("ITEM1")
produce("ITEM2")
produce("ITEM3")
produce("ITEM4")
p1.join()
p2.join()
В этом примере условие cv отражает наличие необработанных элементов в контейнере item. Функция produce() «производит» элементы, а consume(), работающая внутри потоков, «потребляет». Стоит отметить, что в приведенном виде программа никогда не закончится, так как имеет бесконечный цикл в потоках, а в главном потоке — ожидание завершения этих потоков. Еще одна особенность — признак демона, установленный с помощью метода setDaemon() объекта–потока до его старта.
Процесс, показанный в предыдущем примере, имеет значение, достойное отдельного модуля. Такой модуль в стандартной библиотеке языка Python есть, и он называется Queue.
Помимо исключений — Queue.Full (очередь переполнена) и Queue.Empty (очередь пуста) — модуль определяет класс Queue, заведующий собственно очередью.
Собственно, здесь можно привести аналог примера выше, но уже с использованием класса Queue.Queue:
import threading, Queue
item = Queue.Queue()
def consume():
"""Потребление очередного элемента (с ожиданием его появления)"""
return item.get()
def consumer():
while True:
print consume()
def produce(i):
"""Занесение нового элемента в контейнер и оповещение потоков"""
item.put(i)
p1 = threading.Thread(target=consumer, name="t1")
p1.setDaemon(True)
p2 = threading.Thread(target=consumer, name="t2")
p2.setDaemon(True)
p1.start()
p2.start()
produce("ITEM1")
produce("ITEM2")
produce("ITEM3")
produce("ITEM4")
p1.join()
p2.join()
Следует отметить, что все блокировки спрятаны в реализации очереди, поэтому в коде они явным образом не присутствуют.
По сравнению с модулем threading, модуль thread предоставляет низкоуровневый доступ к потокам. Многие функции модуля threading, который рассматривался до этого, реализованы на базе модуля thread. Здесь стоит сделать некоторые замечания по применению потоков вообще. Документация по Python предупреждает, что использование потоков имеет особенности:
• Исключение KeyboardInterrupt (прерывание от клавиатуры) может быть получено любым из потоков, если в поставке Python нет модуля signal (для обработки сигналов).
• Не все встроенные функции, блокированные ожиданием ввода, позволяют другим потокам работать. Правда, основные функции вроде time.sleep(), select.select(), метод read() файловых объектов не блокируют другие потоки.
• Невозможно прервать метод acquire(), так как исключение KeyboardInterrupt возбуждается только после возврата из этого метода.
• Нежелательно, чтобы главный поток завершался раньше других потоков, так как не будут выполнены необходимые деструкторы и даже части finally в операторах try–finally. Это связано с тем, что почти все операционные системы завершают приложение, у которого завершился главный поток.
Визуализация работы потоков
Следующий пример иллюстрирует параллельность выполнения потоков, используя возможности библиотеки графических примитивов Tkinter (она входит в стандартную поставку Python). Несколько потоков наперегонки увеличивают размеры прямоугольника некоторого цвета. Цветом победившего потока окрашивается кнопка Go:
import threading, time, sys
from Tkinter import Tk, Canvas, Button, LEFT, RIGHT, NORMAL, DISABLED
global champion
# Задается дистанция, цвет полосок и другие параметры
distance = 300
colors = ["Red","Orange","Yellow","Green","Blue","DarkBlue","Violet"]
nrunners = len(colors) # количество дополнительных потоков
positions = [0] * nrunners # список текущих позиций
h, h2 = 20, 10 # параметры высоты полосок
def run(n):
"""Программа бега n–го участника (потока)"""
global champion
while 1:
for i in range(10000): # интенсивные вычисления
pass
graph_lock.acquire()
positions[n] += 1 # передвижение на шаг
if positions[n] == distance: # если уже финиш
if champion is None: # и чемпион еще не определен,
champion = colors[n] # назначается чемпион
graph_lock.release()
break
graph_lock.release()
def ready_steady_go():
"""Инициализация начальных позиций и запуск потоков"""
graph_lock.acquire()
for i in range(nrunners):
positions[i] = 0
threading.Thread(target=run, args=[i,]).start()
graph_lock.release()
def update_positions():
"""Обновление позиций"""
graph_lock.acquire()
for n in range(nrunners):
c.coords(rects[n], 0, n*h, positions[n], n*h+h2)
tk.update_idletasks() # прорисовка изменений
graph_lock.release()
def quit():
"""Выход из программы"""
tk.quit()
sys.exit(0)
# Прорисовка окна, основы для прямоугольников и самих прямоугольников,
# кнопок для пуска и выхода
tk = Tk()
tk.title("Соревнование потоков")
c = Canvas(tk, width=distance, height=nrunners*h, bg="White")
c.pack()
rects = [c.create_rectangle(0, i*h, 0, i*h+h2, fill=colors[i])
for i in range(nrunners)]
go_b = Button(text="Go", command=tk.quit)
go_b.pack(side=LEFT)
quit_b = Button(text="Quit", command=quit)
quit_b.pack(side=RIGHT)
# Замок, регулирующий доступ к функции пакета Tk
graph_lock = threading.Lock()
# Цикл проведения соревнований
while 1:
go_b.config(state=NORMAL), quit_b.config(state=NORMAL)
tk.mainloop() # Ожидание нажатия клавиш
champion = None
ready_steady_go()
go_b.config(state=DISABLED), quit_b.config(state=DISABLED)
# Главный поток ждет финиша всех участников
while sum(positions) < distance*nrunners:
update_positions()
update_positions()