Python之threading模块使用

什么是线程

在说清楚什么是线程之前,我们先了解一下什么是进程。从操作系统层面来讲,一个进程相当于一个任务,比如说打开一个采集脚本,这就是一个进程,在采集任务中,为了加快采集速度,我们可以启动多线路采集,比如一个站点一个执行流,而不用等待一个站点采集完再采集下一个站点,这种同时多线路采集的策略就相当高效,这种执行流就是一个线程

当然,严格来说,线程并非同步执行,某一个时间点上实际只运行了一个线程。系统会给每一个可执行的线程一个小时间段来处理任务;当该时间段用完后,系统就会剥夺该线程所占用的资源,让其他线程获得执行的机会。在选择下一个线程时,系统会考虑线程的优先级

threading模块

threading是python中支持线程的模块。

主要方法

  • threading.currentThread():返回当前的线程对象
  • threading.enumerate():返回一个包含正在运行的线程的list
  • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
  • run(): 用来表示线程执行过程的方法
  • start():启动线程
  • join(time);阻塞当前线程,等待至调用线程中止,例如在线程A中调用了线程B.join(),必须等待B线程执行完成,A线程才会继续执行
  • isAlive():返回线程是否活动的
  • getName():返回线程名
  • setName():设置线程名

代码演示

下面用代码示范创建线程和启动线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# encoding=utf-8

import logging
import threading
import time

logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


# 定义一个线程
class myThread(threading.Thread):

def __init__(self, i):
threading.Thread.__init__(self)
self.i = i

# run()是表示线程的执行过程,启动线程后,run()就会执行
def run(self):
while self.i < 10:
logger.debug('Thread {} start, Time:{}, i:{}'.format(self.getName(), int(time.time()), self.i))
self.i += 1

logger.debug('Thread {} end, Time:{}'.format(self.getName(), int(time.time())))


threadindex = 1
# 创建的线程数量
threadnum = 5

while threadindex <= threadnum:
# 创建线程
thread = myThread(threadindex)
thread.start()
threadindex += 1

除了自定义类,集成threading.Thread类之外,我们还可以通过threading.Thread(target=执行方法,args=(元祖))来创建一个线程对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# encoding=utf-8

import logging
import threading
import time

logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


tlist = []

def action(i):
while i < 10:
logger.debug('Thread {} start, Time:{}, i:{}'.format(threading.current_thread().getName(), int(time.time()), i))
i += 1

logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))



threadindex = 1
# 创建的线程数量
threadnum = 5

while threadindex <= threadnum:
# 创建线程
thread = threading.Thread(target=action, args=(threadindex,))
thread.start()
threadindex = threadindex + 1

join()

目前,我们已经知道怎么去创建和启动线程,但是线程之间是互相独立的,无法协同。假设线程A依赖线程B的运行结果,那怎么办呢?Thread提供了让一个线程等待另一个线程完成的join()方法。当在某个程序执行流中调用其他线程的join()方法时,调用线程将被阻塞,直到被join()方法加入的join线程执行完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# encoding=utf-8

import logging
import threading
import time

logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


tlist = []

def action(i):
while i < 10:
logger.debug('Thread {} start, Time:{}, i:{}'.format(threading.current_thread().getName(), int(time.time()), i))
i += 1

logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))



threadindex = 1
# 创建的线程数量
threadnum = 5

while threadindex <= threadnum:
# 创建线程
thread = threading.Thread(target=action, args=(threadindex,))
thread.start()
# 阻塞线程,必须等thread线程执行完成才继续
thread.join()
threadindex = threadindex + 1

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
DEBUG - Thread Thread-1 start, Time:1564285042, i:1
DEBUG - Thread Thread-1 start, Time:1564285042, i:2
DEBUG - Thread Thread-1 start, Time:1564285042, i:3
DEBUG - Thread Thread-1 start, Time:1564285042, i:4
DEBUG - Thread Thread-1 start, Time:1564285042, i:5
DEBUG - Thread Thread-1 start, Time:1564285042, i:6
DEBUG - Thread Thread-1 start, Time:1564285042, i:7
DEBUG - Thread Thread-1 start, Time:1564285042, i:8
DEBUG - Thread Thread-1 start, Time:1564285042, i:9
DEBUG - Thread Thread-1 end, Time:1564285042
DEBUG - Thread Thread-2 start, Time:1564285042, i:2
DEBUG - Thread Thread-2 start, Time:1564285042, i:3
DEBUG - Thread Thread-2 start, Time:1564285042, i:4
DEBUG - Thread Thread-2 start, Time:1564285042, i:5
DEBUG - Thread Thread-2 start, Time:1564285042, i:6
DEBUG - Thread Thread-2 start, Time:1564285042, i:7
DEBUG - Thread Thread-2 start, Time:1564285042, i:8
DEBUG - Thread Thread-2 start, Time:1564285042, i:9
DEBUG - Thread Thread-2 end, Time:1564285042
DEBUG - Thread Thread-3 start, Time:1564285042, i:3
DEBUG - Thread Thread-3 start, Time:1564285042, i:4
DEBUG - Thread Thread-3 start, Time:1564285042, i:5
DEBUG - Thread Thread-3 start, Time:1564285042, i:6
DEBUG - Thread Thread-3 start, Time:1564285042, i:7
DEBUG - Thread Thread-3 start, Time:1564285042, i:8
DEBUG - Thread Thread-3 start, Time:1564285042, i:9
DEBUG - Thread Thread-3 end, Time:1564285042
DEBUG - Thread Thread-4 start, Time:1564285042, i:4
DEBUG - Thread Thread-4 start, Time:1564285042, i:5
DEBUG - Thread Thread-4 start, Time:1564285042, i:6
DEBUG - Thread Thread-4 start, Time:1564285042, i:7
DEBUG - Thread Thread-4 start, Time:1564285042, i:8
DEBUG - Thread Thread-4 start, Time:1564285042, i:9
DEBUG - Thread Thread-4 end, Time:1564285042
DEBUG - Thread Thread-5 start, Time:1564285042, i:5
DEBUG - Thread Thread-5 start, Time:1564285042, i:6
DEBUG - Thread Thread-5 start, Time:1564285042, i:7
DEBUG - Thread Thread-5 start, Time:1564285042, i:8
DEBUG - Thread Thread-5 start, Time:1564285042, i:9
DEBUG - Thread Thread-5 end, Time:1564285042

可以看到,线程出现了阻塞,变成顺序执行。

join(timeout=None)方法可以指定一个timeout参数,该参数指定等待被join的线程的时间最长为timeout秒。如果在timeout秒内被join的线程还没有执行结束,则不再等待。

守护进程

我们一般把在后台运行的进程称为守护进程, Thread则提供了daemon属性可以将指定线程设置成后台线程(守护进程)

后台线程主要是为前台线程提供服务,它有一个特征,如果所有的前台线程都死亡了,那么后台线程会自动死亡。也就是说当在整个虚拟机中只剩下后台线程时,程序就没有继续运行的必要了,所以程序也就退出了。

将daemon属性设为True,必须在start()方法调用之前进行,否则会引发RuntimeError异常。

代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# encoding=utf-8

import logging
import threading
import time

logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def deamon_action(i):
while i < 10:
logger.debug('Thread {} start, Time:{}, i:{}'.format(threading.current_thread().getName(), int(time.time()), i))
i += 1

logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))


def action(i):
while i < 5:
logger.debug('Thread {} start, Time:{}, i:{}'.format(threading.current_thread().getName(), int(time.time()), i))
i += 1

logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))


thread = threading.Thread(target=action, args=(1,))
thread.start()

# 守护进程
deamon_thread = threading.Thread(target=deamon_action, args=(1,))
deamon_thread.daemon=True
deamon_thread.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
DEBUG - Thread Thread-1 start, Time:1564285760, i:1
DEBUG - Thread Thread-2 start, Time:1564285760, i:1
DEBUG - Thread Thread-1 start, Time:1564285760, i:2
DEBUG - Thread Thread-2 start, Time:1564285760, i:2
DEBUG - Thread Thread-1 start, Time:1564285760, i:3
DEBUG - Thread Thread-2 start, Time:1564285760, i:3
DEBUG - Thread Thread-1 start, Time:1564285760, i:4
DEBUG - Thread Thread-2 start, Time:1564285760, i:4
DEBUG - Thread Thread-1 end, Time:1564285760
DEBUG - Thread Thread-2 start, Time:1564285760, i:5

可以看到,守护进程并没有执行完成,当前台进程执行完成之后,守护进程也会自动退出。

线程同步

如果多线程对同一数据进行操作时,很有可能因为并发而导致数据与预期不一致。举个例子:两个线程,都判断一个数字大等于10,就减去10, 那如果A线程在判断完成后,并没有来得及减去10,就执行了B线程的减去10的操作,这个时候又回到A线程,继续减去10,这个时候,其实数值减去了20,这个明显和预期不符。所以在多线程编程时,我们一定要关注竞争资源的互斥访问

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# encoding=utf-8

import logging
import threading
import time

logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


num = 10;
def action():
global num
if num >= 10:
time.sleep(1)
num -= 10

logger.debug('Thread {} end, Time:{}, num:{}'.format(threading.current_thread().getName(), int(time.time()), num))


threadA = threading.Thread(target=action)
threadA.start()

threadB = threading.Thread(target=action)
threadB.start()

输出结果:

1
2
DEBUG - Thread Thread-1 end, Time:1564290516, num:0
DEBUG - Thread Thread-2 end, Time:1564290516, num:-10

针对竞争资源的互斥访问,可以用锁(Lock)来解决, 每次只允许一个线程操作的数据。

threading模块提供了LockRLock两个类,它们都提供了如下两个方法来加锁和释放锁:

  • 加锁:acquire(blocking=True, timeout=-1) 请求对Lock或RLock加锁,其中timeout参数指定加锁多少秒
  • 释放锁:release()

LockRLock的区别如下:

  • threading.Lock:它是一个基本的锁对象,每次只能锁定一次,其余的锁请求,需等待锁释放后才能获取。
  • threading.RLock:它代表可重入锁(Reentrant Lock)。对于可重入锁,在同一个线程中可以对它进行多次锁定,也可以多次释放。如果使用RLock,那么acquire()和release()方法必须成对出现

修改一下上面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# encoding=utf-8

import logging
import threading
import time

logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


num = 10;
def action():
global num
threadlock.acquire()
if num >= 10:
time.sleep(1)
num -= 10
threadlock.release()
logger.debug('Thread {} end, Time:{}, num:{}'.format(threading.current_thread().getName(), int(time.time()), num))


threadlock = threading.Lock()
threadA = threading.Thread(target=action)
threadA.start()

threadB = threading.Thread(target=action)
threadB.start()

运行结果:

1
2
DEBUG - Thread Thread-1 end, Time:1564290977, num:0
DEBUG - Thread Thread-2 end, Time:1564290977, num:0

死锁

通过线程锁我们解决了竞争资源互斥访问的问题, 但当两个线程相互等待对方释放同步锁时就会发生死锁。例如: 线程A锁住了记录1并等待记录2,而线程B锁住了记录2并等待记录1,这样两个线程就发生了死锁现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# encoding=utf-8

import logging
import threading
import time

logging.basicConfig(level = logging.DEBUG, format='%(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

threadlockA = threading.Lock()
threadlockB = threading.Lock()

def action_A():
threadlockA.acquire()
logger.debug('Thread {} start, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadlockB.acquire()
time.sleep(5)
threadlockB.release()
logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadlockA.release()

def action_B():
threadlockB.acquire()
logger.debug('Thread {} start, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadlockA.acquire()
time.sleep(5)
threadlockA.release()
logger.debug('Thread {} end, Time:{}'.format(threading.current_thread().getName(), int(time.time())))
threadlockB.release()



threadA = threading.Thread(target=action_A)
threadA.start()

threadB = threading.Thread(target=action_B)
threadB.start()

输出结果:

1
2
DEBUG - Thread Thread-1 start, Time:1564296200
DEBUG - Thread Thread-2 start, Time:1564296200

然后一直卡主不动,因为A线程,启动时获取了A锁,打印日志之后,想获取B锁,但是B线程启动时已经获得了B锁,所以A线程此时会阻塞,等待B锁解锁; 同时B线程,启动时获取了B锁,打印日志之后,想获取A锁,但是A启动时已经获得了A锁,所以A线程此时会阻塞,等待A锁解锁。最后大家都相互等待,导致死锁。

解决方案

  • 避免一个线程对多个lock进行锁定,例如上述案例A/B线程都对A/B锁锁定了,就埋下了死锁的隐患;
  • 具有相同的加锁顺序,每个线程对锁的加锁顺序保持一致;
  • 使用定时锁,指定锁定时间,超时自动解锁;
有用就打赏一下作者吧!