Многопоточность

Введение

Примеры

  • 7

    Основы многопоточности

    Используя threading модуль, новый поток выполнения может быть начат путем создания нового threading.Thread и присвоения ему функции для выполнения:

     import threading
    
    def foo():
      print "Hello threading!"
    
    my_thread = threading.Thread(target=foo)
    
     

    target параметр ссылается на функцию (или вызываемый объект) , который будет работать. Нить не начнется выполнение до start не вызывается на Thread объекта.

    Начало темы

     my_thread.start() # prints 'Hello threading!'
    
     

    Теперь, когда my_thread побежал и завершается, вызов start снова будет производить RuntimeError . Если вы хотите , чтобы запустить нить , как демон, передавая daemon=True kwarg или установить my_thread.daemon в True перед вызовом start() , вызывает ваш Thread запустить в фоновом режиме , как демон.

    Присоединение к теме

    В тех случаях , когда вы разбили одну большую работу на несколько маленьких и хотят запустить их одновременно, но нужно ждать их все , чтобы закончить , прежде чем продолжить, Thread.join() является метод , который вы ищете.

    Например, допустим, вы хотите загрузить несколько страниц веб-сайта и собрать их в одну страницу. Вы бы сделали это:

     import requests
    from threading import Thread
    from queue import Queue
    
    q = Queue(maxsize=20)
    def put_page_to_q(page_num):
        q.put(requests.get('http://some-website.com/page_%s.html' % page_num)
    
    def compile(q):
        # magic function that needs all pages before being able to be executed
        if not q.full():
            raise ValueError
        else:
            print("Done compiling!")
    
    threads = []
    for page_num in range(20):
         t = Thread(target=requests.get, args=(page_num,))
         t.start()
         threads.append(t)
    
    # Next, join all threads to make sure all threads are done running before
    # we continue. join() is a blocking call (unless specified otherwise using 
    # the kwarg blocking=False when calling join)
    for t in threads:
        t.join()
    
    # Call compile() now, since all threads have completed
    compile(q)
    
     

    Более пристальный взгляд на то, как join() работает , можно найти здесь .

    Создать пользовательский класс потока

    Использование threading.Thread класса мы можем создать подкласс нового пользовательского класса Thread. мы должны переопределить run метод в подклассе.

     from threading import Thread
    import time
    
    class Sleepy(Thread):
    
        def run(self):
            time.sleep(5)
            print("Hello form Thread")
    
    if __name__ == "__main__":
        t = Sleepy()
        t.start()      # start method automatic call Thread class run method.
        # print 'The main program continues to run in foreground.'
        t.join()
        print("The main program continues to run in the foreground.")
    
    
    
     
  • 4

    Общение между потоками

    В вашем коде несколько потоков, и вам нужно безопасно общаться между ними.

    Вы можете использовать Queue из queue библиотеки.

     from queue import Queue
    from threading import Thread
    
    # create a data producer 
    def producer(output_queue):
        while True:
            data = data_computation()
    
            output_queue.put(data)
    
    # create a consumer
    def consumer(input_queue):
        while True:
            # retrieve data (blocking)
            data = input_queue.get()
    
            # do something with the data
    
            # indicate data has been consumed
            input_queue.task_done()
    
     

    Создание потоков производителей и потребителей с общей очередью

     q = Queue()
    t1 = Thread(target=consumer, args=(q,))
    t2 = Thread(target=producer, args=(q,))
    t1.start()
    t2.start() 
  • 2

    Создание рабочего пула

    Использование threading & queue :

     from socket import socket, AF_INET, SOCK_STREAM
    from threading import Thread
    from queue import Queue
    
    def echo_server(addr, nworkers):
        print('Echo server running at', addr)
        # Launch the client workers
        q = Queue()
        for n in range(nworkers):
            t = Thread(target=echo_client, args=(q,))
            t.daemon = True
            t.start()
    
        # Run the server
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(addr)
        sock.listen(5)
        while True:
            client_sock, client_addr = sock.accept()
            q.put((client_sock, client_addr))
    
    echo_server(('',15000), 128)
    
     

    Использование concurrent.futures.Threadpoolexecutor :

     from socket import AF_INET, SOCK_STREAM, socket
    from concurrent.futures import ThreadPoolExecutor
    
    def echo_server(addr):
        print('Echo server running at', addr)
        pool = ThreadPoolExecutor(128)
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(addr)
        sock.listen(5)
        while True:
            client_sock, client_addr = sock.accept()
            pool.submit(echo_client, client_sock, client_addr)
    
    echo_server(('',15000))
    
     

    Поваренная книга Питона, 3-е издание, Дэвидом Бизли и Брайаном К. Джонсом (О'Рейли). Copyright 2013 Дэвид Бизли и Брайан Джонс, 978-1-449-34037-7.

  • 0

    Расширенное использование многопоточности

    Этот раздел будет содержать некоторые из самых сложных примеров, реализованных с использованием многопоточности.

    Усовершенствованный принтер (регистратор)

    Поток, который печатает все, получен и изменяет вывод в соответствии с шириной терминала. Приятной особенностью является то, что также «уже записанный» вывод изменяется при изменении ширины терминала.

    #!/usr/bin/env python2
    
    import threading
    import Queue
    import time
    import sys
    import subprocess
    from backports.shutil_get_terminal_size import get_terminal_size
    
    printq = Queue.Queue()
    interrupt = False
    lines = []
    
    def main():
    
        ptt = threading.Thread(target=printer) # Turn the printer on
        ptt.daemon = True
        ptt.start()
    
        # Stupid example of stuff to print
        for i in xrange(1,100):
            printq.put(' '.join([str(x) for x in range(1,i)]))           # The actual way to send stuff to the printer
            time.sleep(.5)
    
    def split_line(line, cols):
        if len(line) > cols:
            new_line = ''
            ww = line.split()
            i = 0
            while len(new_line) <= (cols - len(ww[i]) - 1):
                new_line += ww[i] + ' '
                i += 1
                print len(new_line)
            if new_line == '':
                return (line, '')
    
            return (new_line, ' '.join(ww[i:]))
        else:
            return (line, '')
    
    
    def printer():
    
        while True:
            cols, rows = get_terminal_size() # Get the terminal dimensions
            msg = '#' + '-' * (cols - 2) + '#\n' # Create the
            try:
                new_line = str(printq.get_nowait())
                if new_line != '!@#EXIT#@!': # A nice way to turn the printer
                                             # thread out gracefully
                    lines.append(new_line)
                    printq.task_done()
                else:
                    printq.task_done()
                    sys.exit()
            except Queue.Empty:
                pass
    
            # Build the new message to show and split too long lines
            for line in lines:
                res = line          # The following is to split lines which are
                                    # longer than cols.
                while len(res) !=0:
                    toprint, res = split_line(res, cols)
                    msg += '\n' + toprint
    
            # Clear the shell and print the new output
            subprocess.check_call('clear') # Keep the shell clean
            sys.stdout.write(msg)
            sys.stdout.flush()
            time.sleep(.5) 
  • 0

    Остановляемая нить с петлей

     import threading
    import time
    
    class StoppableThread(threading.Thread):
        """Thread class with a stop() method. The thread itself has to check
        regularly for the stopped() condition."""
    
        def __init__(self):
            super(StoppableThread, self).__init__()
            self._stop_event = threading.Event()
    
        def stop(self):
            self._stop_event.set()
    
        def join(self, *args, **kwargs):
            self.stop()
            super(StoppableThread,self).join(*args, **kwargs)
    
        def run()
            while not self._stop_event.is_set():
                print("Still running!")
                time.sleep(2)
            print("stopped!"
    
     

    На основе этого вопроса .

Синтаксис

Параметры

Примечания