LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

【C#】为什么选择 async/await 而不是线程?

admin
2024年4月25日 18:46 本文热度 869

一个常见的说法是,线程可以做到 async/await 所能做的一切,且更简单。那么,为什么大家选择 async/await 呢?

Rust 是一种低级语言,它不会隐藏协程的复杂性。这与像 Go 这样的语言相反,在 Go 中,异步是默认发生的,程序员甚至不需要考虑它。

聪明的程序员试图避免复杂性。因此,他们看到 async/await 中的额外复杂性,并质疑为什么需要它。当考虑到存在一个合理的替代方案——操作系统线程时,这个问题尤其相关。

让我们通过 async 来进行一次思维之旅吧。

背景概览

通常,代码是线性的;一件事情在另一件事情之后运行。它看起来像这样:

fn main() {
    foo();
    bar();
    baz();
}

很简单,对吧?然而,有时你会想同时运行很多事情。这方面的典型例子是 web 服务器。考虑以下用线性代码编写的:

fn main() -> io::Result<()> {
    let socket = TcpListener::bind("0.0.0.0:80")?;
    loop {
        let (client, _) = socket.accept()?;
        handle_client(client)?;
    }
}

想象一下,如果 handle_client 需要几毫秒,并且两个客户端同时尝试连接到你的 web 服务器。你会遇到一个严重的问题!

  • 客户端 #1 连接到 web 服务器,并被 accept() 函数接受。它开始运行 handle_client()

  • 客户端 #2 连接到 web 服务器。然而,由于 accept() 当前没有运行,我们必须等待 handle_client() 完成客户端 #1 的运行。

  • 几毫秒后,我们回到 accept()。客户端 #2 可以连接。

现在想象一下,如果有两百万个同时客户端。在队列的末尾,你必须等待几分钟,web 服务器才能帮助你。它很快就会变得不可扩展。

显然,初期的 web 试图解决这个问题。最初的解决方案是引入线程。通过将一些寄存器的值和程序的栈保存到内存中,操作系统可以停止一个程序,用另一个程序替换它,然后再后继续运行那个程序。本质上,它允许多个例程(或“线程”,或“进程”)在同一个 CPU 上运行。

使用线程,我们可以将上述代码重写如下:

fn main() -> io::Result<()> {
    let socket = TcpListener::bind("0.0.0.0:80")?;
    loop {
        let (client, _) = socket.accept()?;
        thread::spawn(move || handle_client(client));
    }
}

现在,客户端由一个与处理新连接等待不同的线程处理。太棒了!通过允许并发线程访问,这避免了问题。

  • 客户端 #1 被服务器接受。服务器生成一个调用 handle_client 的线程。

  • 客户端 #2 尝试连接到服务器。

  • 最终,handle_client 在某处阻塞。操作系统保存处理客户端 #1 的线程,并将主线程带回来。

  • 主线程接受客户端 #2。它生成一个单独的线程来处理客户端 #2。在只有几微秒的延迟后,客户端 #1 和客户端 #2 并行运行。

线程在考虑到生产级 web 服务器拥有几十个 CPU 核心时特别好用。不仅仅是操作系统可以给人一种所有这些线程同时运行的错觉;实际上,操作系统可以让它们真正同时运行。

最终,出于我稍后将详细说明的原因,程序员希望将这种并发性从操作系统空间带到用户空间。用户空间并发性有许多不同的模型。有事件驱动编程、actor 和协程。Rust 选择的是 async/await

简单来说,你将程序编译成一个状态机的集合,这些状态机可以独立于彼此运行。Rust 本身提供了一种创建状态机的机制;async 和 await 的机制。使用 smol 编写的上述程序将如下所示:

#[apply(smol_macros::main!)]
async fn main(ex: &smol::Executor) -> io::Result<()> {
    let socket = TcpListener::bind("0.0.0.0:80").await?;
    loop {
        let (client, _) = socket.accept().await?;
        ex.spawn(async move {
            handle_client(client).await;
        }).detach();
    }
}

主函数前面有 async 关键字。这意味着它不是一个传统函数,而是一个返回状态机的函数。大致上,函数的内容对应于该状态机。

await 包括另一个状态机作为当前运行状态机的一部分。对于 accept(),这意味着状态机将把它作为一个步骤包含在内。

最终,一个内部函数将会产生结果,或者放弃控制。例如,当 accept() 等待新连接时。在这一点上,整个状态机将把执行权交给更高级别的执行器。对我们来说,那是 smol::Executor

一旦执行被产生,执行器将用另一个正在并发运行的状态机替换当前状态机,该状态机是通过 spawn 函数生成的。

我们将一个异步块传递给 spawn 函数。这个块代表一个完全新的状态机,独立于由 main 函数创建的状态机。这个状态机所做的一切都是运行 handle_client 函数。

一旦 main 产生结果,就选择一个客户端来代替它运行。一旦那个客户端产生结果,循环就会重复。

你现在可以处理数百万的并发客户端。

当然,像这样的用户空间并发性引入了复杂性的提升。当你使用线程时,你不必处理执行器、任务和状态机等。

如果你是一个理智的人,你可能会问:“我们为什么需要做所有这些事情?线程工作得很好;对于 99% 的程序,我们不需要涉及任何用户空间并发性。引入新复杂性是技术债务,技术债务会花费我们的时间和金钱。”

“那么,我们为什么不使用线程呢?”

超时问题

也许 Rust 最大的优势之一是可组合性。它提供了一组可以嵌套、构建、组合和扩展的抽象。

我记得让我坚持使用 Rust 的是 Iterator trait。它可以让我将某个东西变成 Iterator,应用一些不同的组合器,然后将结果 Iterator 传递给任何接受 Iterator 的函数,这让我大开眼界。

它继续给我留下深刻印象。假设你想从另一个线程接收一列表整数,只取那些立即可用的整数,丢弃任何不是偶数的整数,给它们全部加一,然后将它们推到一个新列表上。

在某些其他语言中,这将是五十行代码和一个辅助函数。在 Rust 中,可以用五行完成:

let (send, recv) = mpsc::channel();
my_list.extend(
    recv.try_iter()
        .filter(|x| x & 1 == 0)
        .map(|x| x + 1)
);

async/await 最好的事情是,它允许你将这种可组合性应用于 I/O 限制函数。假设你有一个新的客户端要求;你想在上面的函数中添加一个超时。假设我们的 handle_client 函数看起来像这样:

async fn handle_client(client: TcpStream) -> io::Result<()> {
    let mut data = vec![];
    client.read_to_end(&mut data).await?;

    let response = do_something_with_data(data).await?;
    client.write_all(&response).await?;
    Ok(())
}

如果我们想添加一个三秒钟的超时,我们可以组合两个组合器来做到这一点:

race 函数同时运行两个 future

Timer future 等待一段时间后返回。

最终的代码看起来像这样:

async fn handle_client(client: TcpStream) -> io::Result<()> {
    // 处理实际连接的 future
    let driver = async move {
        let mut data = vec![];
        client.read_to_end(&mut data).await?;

        let response = do_something_with_data(data).await?;
        client.write_all(&response).await?;
        Ok(())
    };
    // 处理等待超时的 future
    let timeout = async {
        Timer::after(Duration::from_secs(3)).await;
        // 我们刚刚超时了!返回一个错误。
        Err(io::ErrorKind::TimedOut.into())
    };
    // 并行运行两者
    driver.race(timeout).await
}

我发现这是一个非常简单的过程。你所要做的就是将你的现有代码包装在一个异步块中,然后将其与另一个 future 竞速。

这种方法的额外好处是,它适用于任何类型的流。在这里,我们使用 TcpStream。然而,我们可以很容易地将其替换为任何实现 impl AsyncRead + AsyncWrite 的东西。async 可以轻松地适应你需要的任何模式。

用线程实现

如果我们想在我们的线程示例中实现这一点呢?

fn handle_client(client: TcpStream) -> io::Result<()> {
    let mut data = vec![];
    client.read_to_end(&mut data)?;
    let response = do_something_with_data(data)?;
    client.write_all(&response)?;
    Ok(())
}

这并不容易。通常,你不能在阻塞代码中中断 read 或 write 系统调用,除非做一些灾难性的事情,比如关闭文件描述符(在 Rust 中无法做到)。

幸运的是,TcpStream 有两个函数 set_read_timeout 和 set_write_timeout,可以用来分别设置读写超时。然而,我们不能天真地使用它。想象一个客户端每 2.9 秒发送一个字节,只是为了重置超时。

所以我们需要在这里稍微防御性地编程。由于 Rust 组合器的强大,我们可以编写自己的类型,包装 TcpStream 来编程超时。

// `TcpStream` 的截止日期感知包装器
struct DeadlineStream {
    tcp: TcpStream,
    deadline: Instant,
}

impl DeadlineStream {
    // 创建一个新的 `DeadlineStream`,经过一段时间后过期
    fn new(tcp: TcpStream, timeout: Duration) -> Self {
        Self {
            tcp,
            deadline: Instant::now() + timeout,
        }
    }
}

impl io::Read for DeadlineStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // 设置截止日期
        let time_left = self.deadline.saturating_duration_since(Instant::now());
        self.tcp.set_read_timeout(Some(time_left))?;
        // 从流中读取
        self.tcp.read(buf)
    }
}

impl io::Write for DeadlineStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // 设置截止日期
        let time_left = self.deadline.saturating_duration_since(Instant::now());
        self.tcp.set_write_timeout(Some(time_left))?;
        // 从流中读取
        self.tcp.write(buf)
    }
}

// 创建包装器
let client = DeadlineStream::new(client, Duration::from_secs(3));
let mut data = vec![];
client.read_to_end(&mut data)?;
let response = do_something_with_data(data)?;
client.write_all(&response)?;
Ok(())

一方面,可以认为这是优雅的。我们使用 Rust 的能力用一个相对简单的组合器解决了问题。我相信它会运行得很好。

另一方面,这绝对是 hacky。

我们锁定了自己使用 TcpStream。Rust 中没有特质来抽象使用 set_read_timeout 和 set_write_timeout 类型。所以如果要使用任何类型的写入器,需要额外的工作。

这涉及到设置超时的额外系统调用。

我认为这种类型对于 web 服务器要求的实际逻辑来说,使用起来要笨重得多。

异步成功案例

这就是为什么 HTTP 生态系统采用 async/await 作为其主要运行机制的原因,即使是客户端也是如此。你可以取任何进行 HTTP 调用的函数,并使其适应你想要的任何用例。

tower 可能是我能想到的这种现象最好的例子,这也是让我意识到 async/await 可以有多强大的东西。如果你将你的服务实现为一个异步函数,你会得到超时、速率限制、负载均衡、对冲和背压处理。所有这些都是无负担实现的。

不管你使用的是什么运行时,或者你的服务实际上在做什么。你可以将它扔给 tower,使其更加健壮。

macroquad 是一个小型 Rust 游戏引擎,旨在使游戏开发尽可能简单。它的主函数使用 async/await 来运行其引擎。这是因为 async/await 确实是在 Rust 中表达需要停下来等待其他事情的线性函数的最佳方式。

在实践中,这可能非常强大。想象一下,同时轮询你的游戏服务器和你的 GUI 框架的网络连接,在同一线程上。可能性是无限的。

提升异步的形象

我认为问题不在于有人认为线程比异步更好。我认为问题是异步的好处没有被广泛传播。这导致一些人对异步的好处有误解。

如果这是一个教育问题,我认为值得看一下教育材料。这是 Rust Async Book 在比较 async/await 和操作系统线程时所说的:

操作系统线程不需要对编程模型做任何改变,这使得并发表达非常容易。然而,线程间的同步可能会很困难,性能开销也很大。线程池可以缓解这些成本,但不足以支持大规模的 I/O 密集型工作负载。

—— Rust Async Book

我认为这是整个异步社区的一个一贯问题。当有人问“为什么我们想用这个而不是操作系统线程”时,人们倾向于挥挥手说“异步开销更小。除此之外,其他都一样。”

这就是 web 服务器作者转向 async/await 的原因。这就是他们如何解决 C10k 问题的。但这不会是其他人转向 async/await 的原因。

c10k 问题:https://en.wikipedia.org/wiki/C10k_problem

性能提升是不稳定的,可能会在错误的情况下消失。有很多情况下,线程工作流程可以比等效的异步工作流程更快(主要是在 CPU 密集型任务的情况下)。可能以前我们过分强调了异步 Rust 的短暂性能优势,但低估了它的语义优势。

在最坏的情况下,这会导致人们对 async/await 置之不理,认为它是“你为小众用例而求助的奇怪事物”。它应该被视为一个强大的编程模型,让你能够简洁地表达在同步 Rust 中无法表达的模式,而不需要数十个线程和通道。

有一种趋势是试图使异步 Rust “就像同步 Rust 一样”,这种方式鼓励了负面比较。当我说到“趋势”时,我的意思是这是 Rust 项目的明确路线图,即“编写异步 Rust 代码应该像编写同步代码一样容易,除了偶尔的 async 和 await 关键字。”

我拒绝这种框架,因为它根本不可能。这就像试图在一个滑雪坡上举办披萨派对。我们不应该试图将我们的模型强行塞入不友好的惯用法,以迎合拒绝采用另一种模式的程序员。我们应该努力突出 Rust 的 async/await 生态系统的优势;它的可组合性和它的能力。我们应该努力使 async/await 成为程序员达到并发性时的默认选择。我们不应该试图使同步 Rust 和异步 Rust 相同,我们应该接受差异。


该文章在 2024/4/28 21:30:25 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved