mirror of
https://github.com/Estom/notes.git
synced 2026-02-03 02:23:31 +08:00
Python 协程并发与线程并发
This commit is contained in:
@@ -0,0 +1,140 @@
|
||||
|
||||
> [参考文献](https://blog.csdn.net/cdlwhm1217096231/article/details/99704267)
|
||||
|
||||
|
||||
# 7.1 基于线程的并发执行
|
||||
|
||||
## 多线程实现
|
||||
|
||||
|
||||
### 方法和属性
|
||||

|
||||
|
||||
|
||||
### theading模块包含以下的类:
|
||||
|
||||
* Thread: 基本线程类
|
||||
* Lock:互斥锁
|
||||
* RLock:可重入锁,使单一进程再次获得已持有的锁(递归锁)
|
||||
* Condition:条件锁,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值。
|
||||
* Semaphore:信号锁,为线程间共享的有限资源提供一个”计数器”,如果没有可用资源则会被阻塞。
|
||||
* Event:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活。
|
||||
* Timer:一种计时器
|
||||
* Barrier:Python3.2新增的“阻碍”类,必须达到指定数量的线程后才可以继续执行。
|
||||
|
||||
|
||||
|
||||
|
||||
### 面向对象实现方法
|
||||
|
||||
```py
|
||||
import threading
|
||||
|
||||
class MyThread(threading.Thread):
|
||||
def __init__(self, thread_name):
|
||||
super(MyThread, self).__init__(name = thread_name)
|
||||
|
||||
# 重写run()方法
|
||||
def run(self):
|
||||
print("%s正在运行中......" % self.name)
|
||||
|
||||
for i in range(10):
|
||||
MyThread("thread-" + str(i)).start() # 启动线程
|
||||
```
|
||||
|
||||
### 函数式实现方法
|
||||
|
||||
```py
|
||||
import threading
|
||||
import time
|
||||
|
||||
def show(arg):
|
||||
time.sleep(1)
|
||||
print("thread " + str(arg) + " running......")
|
||||
|
||||
for i in range(10):
|
||||
t = threading.Thread(target=show, args=(i,)) # 注意传入的参数一定是一个元组!
|
||||
t.start()
|
||||
```
|
||||
|
||||
### 类参数说明
|
||||
* 对于Thread类,它的定义如下:
|
||||
* 参数group是预留的,用于将来扩展
|
||||
* 参数target是一个可调用对象,在线程启动后执行;
|
||||
* 参数name是线程的名字。默认值为“Thread-N“,N是一个数字
|
||||
* 参数args和kwargs分别表示调用target时的参数列表和关键字参数
|
||||
```
|
||||
threading.Thread(self, group=None, target=None, name=None,agrs=(),kwargs=None, *, daemon=None)
|
||||
```
|
||||
|
||||
### 类方法说明
|
||||
|
||||

|
||||
|
||||
在多线程执行过程中,有一个特点要注意,每个线程各自执行自己的任务,不等待其他的线程,自顾自的完成自己的任务
|
||||
|
||||
* Python默认会等待最后一个线程执行完毕后才退出。上面例子中,主线程没有等待子线程t执行完毕,而是啥都不管,继续往下执行它自己的代码,执行完毕后也没有结束整个程序,而是等待子线程t执行完毕,整个程序才结束。
|
||||
```py
|
||||
import time
|
||||
import threading
|
||||
|
||||
def doWaiting():
|
||||
print("开始等待:", time.strftime('%H:%M:%S'))
|
||||
time.sleep(3)
|
||||
print("结束等待:", time.strftime("%H:%M:%S"))
|
||||
|
||||
t = threading.Thread(target=doWaiting)
|
||||
t.start()
|
||||
|
||||
time.sleep(1) # 确保线程已经启动
|
||||
print("开始工作")
|
||||
print("结束工作")
|
||||
```
|
||||
|
||||
* 有时候我们希望主线程等等子线程,不要“埋头往前跑”。那要怎么办?使用join()方法
|
||||
|
||||
```py
|
||||
import threading
|
||||
import time
|
||||
|
||||
def doWaiting():
|
||||
print("开始等待: ", time.strftime("%H:%M:%S"))
|
||||
time.sleep(3)
|
||||
print("结束等待:", time.strftime("%H:%M:%S"))
|
||||
|
||||
t = threading.Thread(target=doWaiting)
|
||||
|
||||
t.start()
|
||||
# 确保线程t已经启动
|
||||
time.sleep(1)
|
||||
|
||||
print("开始阻塞主线程,等待子线程执行")
|
||||
t.join() # 主线程不要着急走,等等子线程吧!!! 将一直堵塞,直到t运行结束
|
||||
print("子线程执行完,结束阻塞,主线程继续执行!")
|
||||
```
|
||||
|
||||
* 还可以使用setDaemon(True)吧所有的子线程都变成主线程的守护进程。当主线程结束后,守护子进程也会随之结束,整个程序也跟着退出。
|
||||
|
||||
```py
|
||||
import threading
|
||||
import time
|
||||
|
||||
def run():
|
||||
print(threading.current_thread().getName(), "开始工作")
|
||||
time.sleep(2) # 子线程停两秒
|
||||
print("子线程工作执行完成!")
|
||||
|
||||
for i in range(3):
|
||||
t = threading.Thread(target=run)
|
||||
t.setDaemon(True) # 把子线程设置为守护进程,必须在start()之前设置!!!
|
||||
|
||||
t.start()
|
||||
|
||||
time.sleep(1) # 主线程停1s
|
||||
print("主线程结束运行...")
|
||||
print(threading.active_count()) # 输出活跃的线程数量
|
||||
```
|
||||
|
||||
|
||||
## 线程对象
|
||||
## 锁对象
|
||||
@@ -1 +1,145 @@
|
||||
# asyncio --- 异步 I/O
|
||||
# asyncio --- 异步 I/O
|
||||
|
||||
|
||||
> 踩坑 进行总结。tmd。只记录最新版的Python能够使用的协程方式。
|
||||
|
||||
|
||||
|
||||
## 对象
|
||||
|
||||
|
||||
### Eventloop
|
||||
Eventloop可以说是asyncio应用的核心,是中央总控。Eventloop实例提供了注册、取消和执行任务和回调的方法。
|
||||
|
||||
把一些异步函数(就是任务,Task,一会就会说到)注册到这个事件循环上,事件循环会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。
|
||||
|
||||
|
||||
|
||||
### Coroutine
|
||||
协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程
|
||||
|
||||
|
||||
|
||||
|
||||
```
|
||||
❯ cat coro1.py
|
||||
import asyncio
|
||||
|
||||
|
||||
async def a():
|
||||
print('Suspending a')
|
||||
await asyncio.sleep(0)
|
||||
print('Resuming a')
|
||||
|
||||
|
||||
async def b():
|
||||
print('In b')
|
||||
|
||||
|
||||
async def main():
|
||||
await asyncio.gather(a(), b())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
```
|
||||
1. 协程要用async def声明,Python 3.5时的装饰器写法已经过时,我就不列出来了。
|
||||
2. asyncio.gather用来并发运行任务,在这里表示协同的执行a和b2个协程
|
||||
3. 在协程a中,有一句await asyncio.sleep(0),await表示调用协程,sleep 0并不会真的sleep(因为时间为0),但是却可以把控制权交出去了。
|
||||
4. **非常关键:**只要在await操作的时候。才会将执行权交给其他协程。否则会一直执行该协程。
|
||||
|
||||
|
||||
|
||||
### Future
|
||||
它代表了一个「未来」对象,异步操作结束后会把最终结果设置到这个Future对象上。Future是对协程的封装,不过日常开发基本是不需要直接用这个底层Future类的。
|
||||
|
||||
可以对这个Future实例添加完成后的回调(add_done_callback)、取消任务(cancel)、设置最终结果(set_result)、设置异常(如果有的话,set_exception)等。现在我们让Future完成:
|
||||
|
||||
```
|
||||
In : for a in dir(future):
|
||||
...: if not a.startswith('_'):
|
||||
...: print(a)
|
||||
...:
|
||||
add_done_callback
|
||||
cancel
|
||||
cancelled
|
||||
done
|
||||
exception
|
||||
get_loop
|
||||
remove_done_callback
|
||||
result
|
||||
set_exception
|
||||
set_result
|
||||
```
|
||||
|
||||
### Task
|
||||
Eventloop除了支持协程,还支持注册Future和Task2种类型的对象,那为什么要存在Future和Task这2种类型呢?
|
||||
|
||||
先回忆前面的例子,Future是协程的封装,Future对象提供了很多任务方法(如完成后的回调、取消、设置任务结果等等),但是开发者并不需要直接操作Future这种底层对象,而是用Future的子类Task协同的调度协程以实现并发。
|
||||
|
||||
```py
|
||||
# 或者用task = loop.create_task(a())
|
||||
In : task = asyncio.ensure_future(a())
|
||||
|
||||
In : task
|
||||
Out: <Task pending coro=<a() running at /Users/dongwm/mp/2019-05-22/coro1.py:4>>
|
||||
|
||||
In : task.done()
|
||||
Out: False
|
||||
|
||||
In : await task
|
||||
Suspending a
|
||||
Resuming a
|
||||
|
||||
In : task
|
||||
Out: <Task finished coro=<a() done, defined at /Users/dongwm/mp/2019-05-22/coro1.py:4> result=None>
|
||||
|
||||
In : task.done()
|
||||
Out: True
|
||||
```
|
||||
|
||||
## asyncio 正确使用
|
||||
|
||||
1. async函数本身不会并发。调用async函数会顺序执行。除非一组并行执行的async函数中有一部分**被await协程阻塞**了。
|
||||
2. 强烈注意。即是await堵塞可以导致进程间切换。但如果**await占用的资源加锁**,不支持多协程访问,同样会导致,切换到其他线程的时候,堵塞在锁上。
|
||||
|
||||
|
||||
|
||||
### asyncio.create_task()/asyncio.ensure_future()
|
||||
|
||||
* 创建**task和future**,不执行内容,只创建待执行的task和future。
|
||||
* 配合**await关键字可以并发执行**
|
||||
```py
|
||||
async def c3():
|
||||
task1 = asyncio.create_task(a())
|
||||
task2 = asyncio.create_task(b())
|
||||
await task1
|
||||
await task2
|
||||
```
|
||||
|
||||
### asyncio.gather()
|
||||
|
||||
* gather多个**异步函数**可以**并发执行**
|
||||
* asyncio.gather 会创建 2 个子任务,当出现 await 的时候,程序会在这 2 个子任务之间进行调度。
|
||||
|
||||
|
||||
```
|
||||
async def c1():
|
||||
await asyncio.gather(a(), b())
|
||||
```
|
||||
|
||||
### asyncio.wait()
|
||||
|
||||
* wait()多个**异步task或异步函数**构成的**列表**,可以**并发执行**
|
||||
|
||||
```
|
||||
await asyncio.wait([a(), b()])
|
||||
```
|
||||
|
||||
### asyncio.run()
|
||||
|
||||
* 堵塞执行一个异步任务直到结束
|
||||
* **同步执行。不能并发**
|
||||
```
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
BIN
Python/python标准库2/image/2021-08-18-21-39-38.png
Normal file
BIN
Python/python标准库2/image/2021-08-18-21-39-38.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 30 KiB |
BIN
Python/python标准库2/image/2021-08-18-21-42-36.png
Normal file
BIN
Python/python标准库2/image/2021-08-18-21-42-36.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 56 KiB |
7
grpc/protos.md
Normal file
7
grpc/protos.md
Normal file
@@ -0,0 +1,7 @@
|
||||
## 常见类型
|
||||
|
||||
|
||||
|
||||
## 生成文件
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* [ ] 联邦学习针对恶意软件数据集的实验。(一周)
|
||||
* [ ] 差分隐私应用的实验(一周)
|
||||
* 完成恶意软件检测展示系统(四周)
|
||||
* [ ] 技术选型
|
||||
* [ ] 实现Python/linux下RPC通信模块
|
||||
* [ ] 实现算法训练模块(算法核心)-----联邦学习算法框架
|
||||
* [ ] 实现算法检测模块(算法核心)-----恶意软件检测结果
|
||||
|
||||
Reference in New Issue
Block a user