Python多任務多線程任務管理類

LueOsburn 8年前發布 | 1K 次閱讀 Python

task_manage.py  

#coding=utf-8
import threading
import logging
import time

# 多任務多線程任務管理類
class task_manage():
    # name 任務名稱
    # task_func 任務函數指針
    # task_args 任務函數參數,默認空
    # check_func 任務是否執行完檢測函數指針,返回True表示還有任務沒有執行,默認調用自身的runtimes_control函數
    # check_args 任務檢測函數指針參數,默認空
    # max_thread_count 最大線程數
    # run_time 任務運行次數,設置check_func后,該參數無效
    def __init__(self,name,task_func,task_args=(),check_func=None,check_args=(),max_thread_count=1,run_time=1):
        self._is_finished = False
        self._task_func = task_func
        self._task_args = task_args
        self._max_thread_count = max_thread_count
        self._threads = []
        self._name = name
        self._check_args = check_args
        self._run_time = run_time
        self._task_index = 0
        if check_func is None:
            self._check_func = self.runtimes_control
            self._check_args = ()
        else:
            self._check_func = check_func
    # 任務運行次數控制函數
    def runtimes_control(self):
        if self._run_time > 0:
            self._run_time -= 1
            return True
        return False
    # 清除已退出線程
    def clear_exit_threads(self):
        for t in self._threads[:]:
            if not t.is_alive() :
                self._threads.remove(t)
    # 運行任務
    def run(self):
        while(len(self._threads)<self._max_thread_count and not self._is_finished):
            if self._check_func(*self._check_args):
                t = threading.Thread(target=self._task_func,args=self._task_args)
                self._threads.append(t)
                t.setDaemon(True)
                t.start()
                self._task_index += 1
                logging.debug("%s run %s" %(self._name,self._task_index))
            else:
                self._is_finished = True
                break
    # 對外接口,檢測是否所有任務都執行完成
    def is_finish(self):
        self.clear_exit_threads()
        self.run()
        return self._is_finished and len(self._threads)==0


def run_task_until_all_finished(manages=[]):
    while True:
        all_finished = True
        for manage in manages:
            all_finished = manage.is_finish() and all_finished
        if all_finished : break
        time.sleep(1)
    logging.debug('all have finished!')

task_test.py           

#coding=utf-8

from base.task_manage import *

def watch_movie(move_name):
    print("I am watching movie %s now." %(move_name))

def listen_music(music_name):
    print("I am listening music %s now." %(music_name))


manages=[]
manages.append( task_manage(name="watching movie",task_func=watch_movie,task_args=(u"捉妖記",),run_time=10,max_thread_count=3))
manages.append( task_manage(name="listening music",task_func=listen_music,task_args=(u"笨小孩",),run_time=10,max_thread_count=3))

run_task_until_all_finished(manages)
print('all have finished!')
quit()

[Python]代碼    

運行結果:
I am watching movie 捉妖記 now.
I am watching movie 捉妖記 now.
I am watching movie 捉妖記 now.
I am listening music 笨小孩 now.
I am listening music 笨小孩 now.
I am listening music 笨小孩 now.
I am watching movie 捉妖記 now.
I am watching movie 捉妖記 now.
I am watching movie 捉妖記 now.
I am listening music 笨小孩 now.
I am listening music 笨小孩 now.
I am listening music 笨小孩 now.
I am watching movie 捉妖記 now.
I am watching movie 捉妖記 now.
I am watching movie 捉妖記 now.
I am listening music 笨小孩 now.
I am listening music 笨小孩 now.
I am listening music 笨小孩 now.
I am watching movie 捉妖記 now.
I am listening music 笨小孩 now.
all have finished!
 本文由用戶 LueOsburn 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!