Python之threading模块使用
# 什么是线程
在说清楚什么是线程之前,我们先了解一下什么是进程。从操作系统层面来讲,一个进程相当于一个任务,比如说打开一个采集脚本,这就是一个进程,在采集任务中,为了加快采集速度,我们可以启动多线路采集,比如一个站点一个执行流,而不用等待一个站点采集完再采集下一个站点,这种同时多线路采集的策略就相当高效,这种执行流就是一个线程。
当然,严格来说,线程并非同步执行,某一个时间点上实际只运行了一个线程。系统会给每一个可执行的线程一个小时间段来处理任务;当该时间段用完后,系统就会剥夺该线程所占用的资源,让其他线程获得执行的机会。在选择下一个线程时,系统会考虑线程的优先级。
# threading模块
threading是python中支持线程的模块。
# 主要方法
- threading.currentThread():返回当前的线程对象
- threading.enumerate():返回一个包含正在运行的线程的list
- threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
- run(): 用来表示线程执行过程的方法
- start():启动线程
- join(time);阻塞当前线程,等待至调用线程中止,例如在线程A中调用了线程B.join(),必须等待B线程执行完成,A线程才会继续执行
- isAlive():返回线程是否活动的
- getName():返回线程名
- setName():设置线程名
# 代码演示
下面用代码示范创建线程和启动线程
# encoding=utf-8
import logging
import threading
import time
logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 定义一个线程
class myThread(threading.Thread):
def __init__(self, i):
threading.Thread.__init__(self)
self.i = i
# run()是表示线程的执行过程,启动线程后,run()就会执行
def run(self):
while self.i < 10:
logger.debug('Thread {} start, Time:{}, i:{}'.format(self.getName(), int(time.time()), self.i))
self.i += 1
logger.debug('Thread {} end, Time:{}'.format(self.getName(), int(time.time())))
threadindex = 1
# 创建的线程数量
threadnum = 5
while threadindex <= threadnum:
# 创建线程
thread = myThread(threadindex)
thread.start()
threadindex += 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
除了自定义类,集成threading.Thread类之外,我们还可以通过threading.Thread(target=执行方法,args=(元祖))
来创建一个线程对象。
# encoding=utf-8
import logging
import threading
import time
logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
tlist = []
def action(i):
while i < 10:
logger.debug('Thread {} start, Time:{}, i:{}'.format(threading.current_thread().getName(), int(time.time()), i))
i += 1
logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadindex = 1
# 创建的线程数量
threadnum = 5
while threadindex <= threadnum:
# 创建线程
thread = threading.Thread(target=action, args=(threadindex,))
thread.start()
threadindex = threadindex + 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# join()
目前,我们已经知道怎么去创建和启动线程,但是线程之间是互相独立的,无法协同。假设线程A依赖线程B的运行结果,那怎么办呢?Thread提供了让一个线程等待另一个线程完成的join()方法。当在某个程序执行流中调用其他线程的join()方法时,调用线程将被阻塞,直到被join()方法加入的join线程执行完成。
# encoding=utf-8
import logging
import threading
import time
logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
tlist = []
def action(i):
while i < 10:
logger.debug('Thread {} start, Time:{}, i:{}'.format(threading.current_thread().getName(), int(time.time()), i))
i += 1
logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadindex = 1
# 创建的线程数量
threadnum = 5
while threadindex <= threadnum:
# 创建线程
thread = threading.Thread(target=action, args=(threadindex,))
thread.start()
# 阻塞线程,必须等thread线程执行完成才继续
thread.join()
threadindex = threadindex + 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
运行结果:
DEBUG - Thread Thread-1 start, Time:1564285042, i:1
DEBUG - Thread Thread-1 start, Time:1564285042, i:2
DEBUG - Thread Thread-1 start, Time:1564285042, i:3
DEBUG - Thread Thread-1 start, Time:1564285042, i:4
DEBUG - Thread Thread-1 start, Time:1564285042, i:5
DEBUG - Thread Thread-1 start, Time:1564285042, i:6
DEBUG - Thread Thread-1 start, Time:1564285042, i:7
DEBUG - Thread Thread-1 start, Time:1564285042, i:8
DEBUG - Thread Thread-1 start, Time:1564285042, i:9
DEBUG - Thread Thread-1 end, Time:1564285042
DEBUG - Thread Thread-2 start, Time:1564285042, i:2
DEBUG - Thread Thread-2 start, Time:1564285042, i:3
DEBUG - Thread Thread-2 start, Time:1564285042, i:4
DEBUG - Thread Thread-2 start, Time:1564285042, i:5
DEBUG - Thread Thread-2 start, Time:1564285042, i:6
DEBUG - Thread Thread-2 start, Time:1564285042, i:7
DEBUG - Thread Thread-2 start, Time:1564285042, i:8
DEBUG - Thread Thread-2 start, Time:1564285042, i:9
DEBUG - Thread Thread-2 end, Time:1564285042
DEBUG - Thread Thread-3 start, Time:1564285042, i:3
DEBUG - Thread Thread-3 start, Time:1564285042, i:4
DEBUG - Thread Thread-3 start, Time:1564285042, i:5
DEBUG - Thread Thread-3 start, Time:1564285042, i:6
DEBUG - Thread Thread-3 start, Time:1564285042, i:7
DEBUG - Thread Thread-3 start, Time:1564285042, i:8
DEBUG - Thread Thread-3 start, Time:1564285042, i:9
DEBUG - Thread Thread-3 end, Time:1564285042
DEBUG - Thread Thread-4 start, Time:1564285042, i:4
DEBUG - Thread Thread-4 start, Time:1564285042, i:5
DEBUG - Thread Thread-4 start, Time:1564285042, i:6
DEBUG - Thread Thread-4 start, Time:1564285042, i:7
DEBUG - Thread Thread-4 start, Time:1564285042, i:8
DEBUG - Thread Thread-4 start, Time:1564285042, i:9
DEBUG - Thread Thread-4 end, Time:1564285042
DEBUG - Thread Thread-5 start, Time:1564285042, i:5
DEBUG - Thread Thread-5 start, Time:1564285042, i:6
DEBUG - Thread Thread-5 start, Time:1564285042, i:7
DEBUG - Thread Thread-5 start, Time:1564285042, i:8
DEBUG - Thread Thread-5 start, Time:1564285042, i:9
DEBUG - Thread Thread-5 end, Time:1564285042
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
可以看到,线程出现了阻塞,变成顺序执行。
**join(timeout=None)**方法可以指定一个timeout
参数,该参数指定等待被join的线程的时间最长为timeout
秒。如果在timeout
秒内被join
的线程还没有执行结束,则不再等待。
# 守护进程
我们一般把在后台运行的进程称为守护进程, Thread则提供了daemon属性可以将指定线程设置成后台线程(守护进程)。
后台线程主要是为前台线程提供服务,它有一个特征,如果所有的前台线程都死亡了,那么后台线程会自动死亡。也就是说当在整个虚拟机中只剩下后台线程时,程序就没有继续运行的必要了,所以程序也就退出了。
将daemon属性设为True,必须在start()方法调用之前进行,否则会引发RuntimeError异常。
# 代码演示
# encoding=utf-8
import logging
import threading
import time
logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def deamon_action(i):
while i < 10:
logger.debug('Thread {} start, Time:{}, i:{}'.format(threading.current_thread().getName(), int(time.time()), i))
i += 1
logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
def action(i):
while i < 5:
logger.debug('Thread {} start, Time:{}, i:{}'.format(threading.current_thread().getName(), int(time.time()), i))
i += 1
logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
thread = threading.Thread(target=action, args=(1,))
thread.start()
# 守护进程
deamon_thread = threading.Thread(target=deamon_action, args=(1,))
deamon_thread.daemon=True
deamon_thread.start()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
运行结果:
DEBUG - Thread Thread-1 start, Time:1564285760, i:1
DEBUG - Thread Thread-2 start, Time:1564285760, i:1
DEBUG - Thread Thread-1 start, Time:1564285760, i:2
DEBUG - Thread Thread-2 start, Time:1564285760, i:2
DEBUG - Thread Thread-1 start, Time:1564285760, i:3
DEBUG - Thread Thread-2 start, Time:1564285760, i:3
DEBUG - Thread Thread-1 start, Time:1564285760, i:4
DEBUG - Thread Thread-2 start, Time:1564285760, i:4
DEBUG - Thread Thread-1 end, Time:1564285760
DEBUG - Thread Thread-2 start, Time:1564285760, i:5
2
3
4
5
6
7
8
9
10
可以看到,守护进程并没有执行完成,当前台进程执行完成之后,守护进程也会自动退出。
# 线程同步
如果多线程对同一数据进行操作时,很有可能因为并发而导致数据与预期不一致。举个例子:两个线程,都判断一个数字大等于10,就减去10, 那如果A线程在判断完成后,并没有来得及减去10,就执行了B线程的减去10的操作,这个时候又回到A线程,继续减去10,这个时候,其实数值减去了20,这个明显和预期不符。所以在多线程编程时,我们一定要关注竞争资源的互斥访问。
举个例子:
# encoding=utf-8
import logging
import threading
import time
logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
num = 10;
def action():
global num
if num >= 10:
time.sleep(1)
num -= 10
logger.debug('Thread {} end, Time:{}, num:{}'.format(threading.current_thread().getName(), int(time.time()), num))
threadA = threading.Thread(target=action)
threadA.start()
threadB = threading.Thread(target=action)
threadB.start()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
输出结果:
DEBUG - Thread Thread-1 end, Time:1564290516, num:0
DEBUG - Thread Thread-2 end, Time:1564290516, num:-10
2
针对竞争资源的互斥访问,可以用**锁(Lock)**来解决, 每次只允许一个线程操作的数据。
threading模块提供了Lock和RLock两个类,它们都提供了如下两个方法来加锁和释放锁:
- 加锁:acquire(blocking=True, timeout=-1) 请求对Lock或RLock加锁,其中timeout参数指定加锁多少秒
- 释放锁:release()
Lock和RLock的区别如下:
- threading.Lock:它是一个基本的锁对象,每次只能锁定一次,其余的锁请求,需等待锁释放后才能获取。
- threading.RLock:它代表可重入锁(Reentrant Lock)。对于可重入锁,在同一个线程中可以对它进行多次锁定,也可以多次释放。如果使用RLock,那么acquire()和release()方法必须成对出现。
修改一下上面的代码:
# encoding=utf-8
import logging
import threading
import time
logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
num = 10;
def action():
global num
threadlock.acquire()
if num >= 10:
time.sleep(1)
num -= 10
threadlock.release()
logger.debug('Thread {} end, Time:{}, num:{}'.format(threading.current_thread().getName(), int(time.time()), num))
threadlock = threading.Lock()
threadA = threading.Thread(target=action)
threadA.start()
threadB = threading.Thread(target=action)
threadB.start()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
运行结果:
DEBUG - Thread Thread-1 end, Time:1564290977, num:0
DEBUG - Thread Thread-2 end, Time:1564290977, num:0
2
# 死锁
通过线程锁我们解决了竞争资源互斥访问的问题, 但当两个线程相互等待对方释放同步锁时就会发生死锁。例如: 线程A锁住了记录1并等待记录2,而线程B锁住了记录2并等待记录1,这样两个线程就发生了死锁现象。
# encoding=utf-8
import logging
import threading
import time
logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
threadlockA = threading.Lock()
threadlockB = threading.Lock()
def action_A():
threadlockA.acquire()
logger.debug('Thread {} start, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadlockB.acquire()
time.sleep(5)
threadlockB.release()
logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadlockA.release()
def action_B():
threadlockB.acquire()
logger.debug('Thread {} start, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadlockA.acquire()
time.sleep(5)
threadlockA.release()
logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadlockB.release()
threadA = threading.Thread(target=action_A)
threadA.start()
threadB = threading.Thread(target=action_B)
threadB.start()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
输出结果:
DEBUG - Thread Thread-1 start, Time:1564296200
DEBUG - Thread Thread-2 start, Time:1564296200
2
然后一直卡主不动,因为A线程,启动时获取了A锁,打印日志之后,想获取B锁,但是B线程启动时已经获得了B锁,所以A线程此时会阻塞,等待B锁解锁; 同时B线程,启动时获取了B锁,打印日志之后,想获取A锁,但是A启动时已经获得了A锁,所以A线程此时会阻塞,等待A锁解锁。最后大家都相互等待,导致死锁。
# 解决方案
- 避免一个线程对多个lock进行锁定,例如上述案例A/B线程都对A/B锁锁定了,就埋下了死锁的隐患;
- 具有相同的加锁顺序,每个线程对锁的加锁顺序保持一致;
- 使用定时锁,指定锁定时间,超时自动解锁;