PySide6/PyQT多线程之 线程安全:互斥锁&条件变量的最佳实践

举报
frica01 发表于 2023/10/31 20:56:52 2023/10/31
【摘要】 在使用PySide6/PyQT中多线程的线程安全问题,介绍了QMutex 和 QWaitCondition,一句话说完了就是:只使用QMutex就可以实现线程安全,但是使用 QWaitCondition能够更加精细地控制线程的运行。

前言

PySide6/PyQT中使用多线程时,线程锁和线程安全是非常重要的概念。本文将介绍线程锁和线程安全的基本概念,以及如何在PySide6/PyQT中使用它们。

使用PySide6/PyQT开发GUI应用程序,在多个线程同时访问同一个共享对象时候,如果没有进行同步处理那就可能会导致数据不一致或者一些意料之外的问题发生。因此,确保线程安全是非常重要的。

而说到线程安全,最简单的处理方法就是用 互斥锁条件变量。

所以本文着力介绍PySide6/PyQT 的三个组件:QMutexQMutexLockerQWaitCondition ,以解决 线程安全 的问题。



先说结论:

  • 只使用QMutex就可以实现线程安全,但是QWaitCondition能够更加精细地控制线程的运行。

代码在下面,直接拿来就能用。
这篇文章写的我很累,以后不考虑分享这么乱糟糟的文章了。


知识点📖📖

本文用到的几个PySide6的知识点及链接。

作用 链接
创建新线程 QThread
对象间通信的机制,允许对象发送和接收信号 Signal
用于响应Signal信号的方法 Slot
线程同步机制,用于协调多个线程之间对共享资源的访问 QMutex
锁定互斥锁的对象,简化代码,避免手动处理锁的加锁和解锁操作 QMutexLocker
线程同步机制,一般配合 QMutex 使用 QWaitCondition

多线程通信和同步

在多线程的编程中,线程之间的通信和同步是绕不开的话题。

通信:

  • PySide6/PyQT 提供了信号槽(Signal and Slot) 机制,它们用于在线程之间传递消息和触发事件。通过在不同的线程中发送信号和连接槽函数,可以实现线程间的通信。

同步:

  • 在共享对象被多个线程同时访问时候容易出现意料之外的问题,需要保护好资源争夺;
  • 互斥锁(QMutex)条件变量(QWaitCondition) 等同步机制可以用于控制线程的并发访问,确保线程安全和避免线程竞争。

注意事项

关于保证线程安全,可以遵循以下几个原则:

  • 尽量不要在多个线程中访问和修改同一个对象;
  • 如果必须要访问和修改同一个对象,需要使用线程同步机制,例如信号槽、互斥锁、条件变量等;
  • 避免使用共享状态,例如全局变量,尽量将状态封装在对象内部,并使用线程安全的方式访问和修改状态;
  • 不要使用原生的线程库,选择使用 Qt 提供的 QThreadQThreadPool 等线程库。

上面这几点其实差不多一个意思,有些概念就行。

互斥锁

下面这份代码使用了 QMetuxQMutexLocker

# -*- coding: utf-8 -*-
# Name:         demo3.py
# Author:       小菜
# Date:         2023/5/4 
# Description:

import sys

from PySide6.QtCore import (QThread, Signal, Slot, QMutex)
from PySide6.QtWidgets import (QApplication, QLabel, QPushButton, QVBoxLayout, QWidget)


class Worker(QThread):
    valueChanged = Signal(tuple)

    def __init__(self, name, mutex, main_window):
        super().__init__()
        self.name = name
        self.mutex = mutex
        self.main_window = main_window

    def run(self):
        for i in range(5):
            with QMutexLocker(self.mutex):
                self.main_window.count += 1
                self.msleep(100)
                self.valueChanged.emit((self.name, self.main_window.count))


class MainWindow(QWidget):
    def __init__(self):
        self.count = int()
        self.mutex = QMutex()  # 定义锁对象
        super().__init__()
        self.setup_ui()
        self.setup_thread()

    def setup_ui(self):
        layout = QVBoxLayout()
        self.label = QLabel("Count: 0", self)

        self.btn_start = QPushButton("Start", self)
        self.btn_start.clicked.connect(self.start_threads)

        layout.addWidget(self.label)
        layout.addWidget(self.btn_start)
        self.setLayout(layout)
        self.setGeometry(300, 300, 250, 150)
        self.show()

    def setup_thread(self):
        self.worker1 = Worker('thread_1', self.mutex, self)
        self.worker2 = Worker('thread_2', self.mutex, self)
        self.worker1.valueChanged.connect(self.thread_finished)
        self.worker2.valueChanged.connect(self.thread_finished)

    def start_threads(self):
        self.worker1.start()
        self.worker2.start()

    @Slot(tuple)
    def thread_finished(self, value):
        print(str(value))
        self.label.setText(f"{str(value)}")


if __name__ == '__main__':
    app = QApplication()
    ex = MainWindow()
    sys.exit(app.exec())

代码释义

Worker类

  • 继承了QThread类,并创建 信号valueChanged
  • 接收两个参数,一个为name,一个为MainWindow类实例对象本身;
  • 循环5次,每次为 MainWindow实例count累加1;
  • 并使用valueChanged 信号将执行结果和执行次数发送出去;

MainWindow类

  • 继承了QWidget,实现了包含一个按钮和一个标签的窗口;
  • setup_ui函数为窗口布局;
  • setup_thread函数 实例化两个Worker类,并将它们的信号连接到Slot槽函数 thread_finished
  • 按钮绑定了 start_threads函数
  • thread_finished函数Slot槽函数,用于接收 Worker 的信号发送的结果。

代码中两个线程同时访问了 self.count,结合本篇文章标题,看看下面的运行结果。


运行结果

代码运行效果如下图所示:

  • 左边是没有考虑线程安全的,右边是上面代码运行结果;
  • 如果程序没有出错,那应该是按照顺序打印 1~10
  • 左边,这个并不是按照顺序的,说明它们在互相争夺共享资源 self.count时候出现了岔子,这是不推荐的;
  • 右边,是线程安全的,是推荐的。

QMetux & QMetuxLocker

PySide6中可以通过QMutex实现线程锁。

QMutex是一个用于同步线程执行的互斥锁,可以保护共享资源不受多个线程同时访问以及修改;

QMutexLocker可以简化 QMutex 的代码,避免手动处理锁的加锁和解锁操作 ,且避免了资源竞争和死锁的发生。

在线程编程中,竞态条件是一种常见的问题。当多个线程尝试同时修改共享资源时,可能会发生竞态条件,导致程序出现意外的行为。为了解决这个问题,可以使用线程锁来保护共享资源。

下面是一个简单的示例,展示如何使用 QMutex + QMutexLocker 来保证线程安全。

from PySide6.QtCore import QObject, QMutex, QMutexLocker

class MyObject(QObject):
    def __init__(self):
        super().__init__()
        self.shared_data = list()

        # 创建互斥锁
        self.mutex = QMutex()

    def add_data(self, data):
        # 不需要手动解锁,QMutexLocker会在离开作用域时自动解锁
        with QMutexLocker(self.mutex):
            self.shared_data.append(data)

上面代码包含了一个共享的列表shared_data

add_data 方法中,使用 QMutex + QMutexLocker 方法锁定了共享资源,然后向列表中添加了一个新的元素。

这里使用了QMutexLocker,它也会在离开add_data方法时自动释放锁。这样就确保了线程安全,即使出现异常也不会影响其他线程的访问。


QWaitCondition

QWaitCondition 是一种同步机制,可以用来协调线程之间的操作。它通过让线程进入等待状态,等待某个条件成立来保证线程安全

在线程安全中,使用QWaitCondition 不是必须的。只是用了 能够更加精细地控制线程的运行。

一些概念

QWaitCondition 是 PySide6 中的一个同步机制,它可以阻塞一个线程,直到收到一个信号通知。

QWaitCondition 主要由三个方法构成:

  • wait(mutex: QMutex, time: int = ULONG_MAX): 阻塞当前线程,直到收到该 QWaitCondition 对象的信号,或等待时间超时。在等待期间,会释放 mutex
  • wakeOne(): 发送一个信号来唤醒一个等待在该 QWaitCondition 上的线程。如果没有线程等待,则该方法没有任何效果;
  • wakeAll(): 发送一个信号来唤醒所有等待在该 QWaitCondition 上的线程。

使用 QWaitCondition 的基本步骤是:

  1. 创建一个 QWaitCondition 对象和一个 QMutex 对象,并将它们传递给需要协调的线程。

  2. 在需要等待信号的线程中,使用 wait() 方法阻塞线程。

  3. 在发送信号的线程中,使用 wakeOne()wakeAll() 方法发送信号。




代码

is_paused 标志为 True 时,线程会调用 self.cond.wait(self.mutex) 阻塞自己,等待 resume_thread() 方法的调用。
resume_thread() 方法被调用后,会将 is_paused 设置为 False,然后调用 self.cond.wakeOne() 发送信号来唤醒被阻塞的线程。

在线程中使用了条件变量 self.cond 和互斥锁 self.mutex 来控制线程的暂停和恢复,QWaitCondition 可以让线程在等待状态时休眠,直到某个条件被满足并且可以被唤醒。这样避免线程在忙等待时占用 CPU 资源,并减少程序的资源消耗。

# -*- coding: utf-8 -*-
# Name:         demo.py
# Author:       小菜
# Date:         2023/5/4
# Description:

import sys
from PySide6.QtCore import (QThread, QWaitCondition, QMutex, Signal, QMutexLocker)
from PySide6.QtWidgets import (QWidget, QVBoxLayout, QPushButton, QProgressBar, QApplication)


class MyThread(QThread):
    valueChange = Signal(int)

    def __init__(self):
        super().__init__()
        self.is_paused = bool()
        self.progress_value = int(0)
        self.mutex = QMutex()
        self.cond = QWaitCondition()

    def pause_thread(self):
        with QMutexLocker(self.mutex):
            self.is_paused = True

    def resume_thread(self):
        if not self.is_paused:
            return
        with QMutexLocker(self.mutex):
            self.is_paused = False
            # 释放其它线程
            self.cond.wakeOne()

    def run(self):
        while True:
            with QMutexLocker(self.mutex):
                while self.is_paused:
                    # 阻塞当前线程
                    self.cond.wait(self.mutex)
                if self.progress_value > 100:
                    self.progress_value = 0
                    return
                self.progress_value += 1
                self.valueChange.emit(self.progress_value)
                self.msleep(10)


class MainWindow(QWidget):
    def __init__(self):
        super().__init__()
        self.setup_ui()
        self.setup_thread()

    def setup_ui(self):
        layout = QVBoxLayout(self)
        self.progressBar = QProgressBar(self)
        layout.addWidget(self.progressBar)
        layout.addWidget(QPushButton(r'启动&&停止', self, clicked=self.paused_thread))
        layout.addWidget(QPushButton('恢复线程', self, clicked=self.wake_thread))
        self.show()

    def setup_thread(self):
        self.thread = MyThread()
        self.thread.valueChange.connect(self.progressBar.setValue)

    def paused_thread(self):
        if not self.thread.isRunning():
            self.thread.start()
        else:
            self.thread.pause_thread()

    def wake_thread(self):
        self.thread.resume_thread()


if __name__ == '__main__':
    app = QApplication()
    window = MainWindow()
    sys.exit(app.exec())

代码释义

MyThread类

  • MyThread 继承自 QThread,重写run方法
  • while 循环中更新进度条的值,并通过信号 valueChange 发送更新后的进度值
  • 包含一个互斥量 mutex 和一个等待条件 cond,用于实现线程的暂停和恢复

MainWindow类

  • setup_ui() 中创建了一个进度条和两个按钮,分别用于启动/停止线程和恢复线程;
  • setup_thread() 中创建了一个 MyThread 对象,并连接了其信号 valueChange 和界面上的进度条;
  • paused_thread() 方法用于启动/停止线程,如果线程没有启动,则启动线程;
  • 如果线程已经启动,则调用 MyThread 中的 pause_thread() 方法,将线程暂停;
  • wake_thread() 方法用于恢复线程,调用 MyThread 中的 resume_thread() 方法,将线程从暂停中恢复。

运行结果

总结✨✨

只使用QMutex就可以实现线程安全,但是加上QWaitCondition能够更加精细地控制线程的运行。

后话

本次分享到此结束,
see you~🐱‍🏍🐱‍🏍

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。