Rust 채널을 이용한 동시 데이터 전송

Beginner

This tutorial is from open-source community. Access the source code

소개

스레드 간 데이터 전송을 위한 메시지 전달 사용에 오신 것을 환영합니다. 이 랩은 Rust Book의 일부입니다. LabEx 에서 Rust 기술을 연습할 수 있습니다.

이 랩에서는 Rust 표준 라이브러리의 채널을 사용하여 스레드 간에 데이터를 송수신하는 안전한 동시성 접근 방식인 메시지 전달을 살펴봅니다.

스레드 간 데이터 전송을 위한 메시지 전달 사용

안전한 동시성을 보장하는 데 점점 더 널리 사용되는 접근 방식 중 하나는 메시지 전달(message passing) 입니다. 이는 스레드 또는 액터 (actor) 가 데이터를 포함하는 메시지를 서로 전송하여 통신하는 방식입니다. 다음은 *https://golang.org/doc/effective_go.html#concurrency*에서 가져온 Go 언어 문서의 슬로건입니다. "메모리 공유를 통해 통신하지 말고, 대신 통신을 통해 메모리를 공유하십시오."

메시지 전송 동시성을 달성하기 위해 Rust 표준 라이브러리는 채널(channels) 의 구현을 제공합니다. 채널은 한 스레드에서 다른 스레드로 데이터를 전송하는 일반적인 프로그래밍 개념입니다.

프로그래밍에서 채널을 시냇물이나 강과 같은 방향성 물길과 같다고 상상할 수 있습니다. 고무 오리 같은 것을 강에 넣으면 하류로 이동하여 수로의 끝에 도달합니다.

채널에는 송신기 (transmitter) 와 수신기 (receiver) 의 두 부분이 있습니다. 송신기 부분은 고무 오리를 강에 넣는 상류 위치이고, 수신기 부분은 고무 오리가 하류에서 도착하는 곳입니다. 코드의 한 부분은 전송하려는 데이터로 송신기의 메서드를 호출하고, 다른 부분은 도착하는 메시지를 수신 끝에서 확인합니다. 송신기 또는 수신기 중 하나라도 삭제되면 채널이 닫혔다(closed) 고 합니다.

여기서는 값을 생성하여 채널을 통해 전송하는 스레드 하나와 값을 수신하여 출력하는 다른 스레드를 갖는 프로그램을 만들 것입니다. 이 기능을 설명하기 위해 채널을 사용하여 스레드 간에 간단한 값을 전송할 것입니다. 이 기술에 익숙해지면 채팅 시스템이나 여러 스레드가 계산의 일부를 수행하고 결과를 집계하는 하나의 스레드로 부분을 전송하는 시스템과 같이 서로 통신해야 하는 모든 스레드에 채널을 사용할 수 있습니다.

먼저, Listing 16-6 에서 채널을 만들지만 아무 작업도 수행하지 않습니다. Rust 가 채널을 통해 어떤 유형의 값을 전송하려는지를 알 수 없으므로 아직 컴파일되지 않습니다.

파일 이름: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Listing 16-6: 채널을 생성하고 두 부분을 txrx에 할당

mpsc::channel 함수를 사용하여 새 채널을 생성합니다. mpscmultiple producer, single consumer(다중 생산자, 단일 소비자) 의 약자입니다. 간단히 말해서, Rust 표준 라이브러리가 채널을 구현하는 방식은 채널이 값을 생성하는 여러 전송 끝 (sending ends) 을 가질 수 있지만 해당 값을 소비하는 수신 끝 (receiving end) 은 하나만 가질 수 있음을 의미합니다. 여러 시냇물이 하나의 큰 강으로 합쳐지는 것을 상상해 보십시오. 모든 시냇물에서 전송된 모든 것은 결국 하나의 강으로 흘러 들어갈 것입니다. 지금은 단일 생산자 (producer) 로 시작하지만, 이 예제가 작동하면 여러 생산자를 추가할 것입니다.

mpsc::channel 함수는 튜플을 반환하며, 첫 번째 요소는 전송 끝 (송신기) 이고 두 번째 요소는 수신 끝 (수신기) 입니다. 약어 txrx는 각각 송신기수신기에 대해 많은 분야에서 전통적으로 사용되므로, 각 끝을 나타내기 위해 변수를 그렇게 명명합니다. 튜플을 분해하는 패턴이 있는 let 문을 사용하고 있습니다. 18 장에서 let 문과 분해 (destructuring) 에서 패턴의 사용에 대해 논의할 것입니다. 지금은 이 방식으로 let 문을 사용하는 것이 mpsc::channel에서 반환된 튜플의 조각을 추출하는 편리한 접근 방식이라는 것을 알아두십시오.

전송 끝을 스폰된 스레드로 옮기고 스폰된 스레드가 메인 스레드와 통신하도록 문자열 하나를 전송해 보겠습니다. Listing 16-7 에 나와 있습니다. 이것은 고무 오리를 강 상류에 넣거나 한 스레드에서 다른 스레드로 채팅 메시지를 보내는 것과 같습니다.

파일 이름: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Listing 16-7: tx를 스폰된 스레드로 이동하고 "hi"를 전송

다시, thread::spawn을 사용하여 새 스레드를 생성한 다음 move를 사용하여 tx를 클로저로 이동하여 스폰된 스레드가 tx를 소유하도록 합니다. 스폰된 스레드는 채널을 통해 메시지를 전송할 수 있도록 송신기를 소유해야 합니다.

송신기에는 전송하려는 값을 사용하는 send 메서드가 있습니다. send 메서드는 Result<T, E> 유형을 반환하므로 수신기가 이미 삭제되어 값을 보낼 곳이 없으면 전송 작업이 오류를 반환합니다. 이 예제에서는 오류가 발생할 경우 패닉 (panic) 하기 위해 unwrap을 호출하고 있습니다. 그러나 실제 응용 프로그램에서는 이를 적절하게 처리합니다. 적절한 오류 처리를 위한 전략을 검토하려면 9 장으로 돌아가십시오.

Listing 16-8 에서 메인 스레드의 수신기에서 값을 가져오겠습니다. 이것은 강 끝에서 물에서 고무 오리를 검색하거나 채팅 메시지를 수신하는 것과 같습니다.

파일 이름: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Listing 16-8: 메인 스레드에서 값 "hi"를 수신하고 출력

수신기에는 recvtry_recv의 두 가지 유용한 메서드가 있습니다. 수신(receive) 의 약자인 recv를 사용하고 있으며, 이는 메인 스레드의 실행을 차단하고 채널을 통해 값이 전송될 때까지 기다립니다. 값이 전송되면 recvResult<T, E>로 반환합니다. 송신기가 닫히면 recv는 더 이상 값이 오지 않을 것이라는 신호를 보내기 위해 오류를 반환합니다.

try_recv 메서드는 차단하지 않고 대신 즉시 Result<T, E>를 반환합니다. 사용 가능한 메시지가 있으면 메시지를 포함하는 Ok 값과 이번에는 메시지가 없으면 Err 값을 반환합니다. try_recv를 사용하는 것은 이 스레드가 메시지를 기다리는 동안 다른 작업이 있는 경우 유용합니다. try_recv를 가끔 호출하고, 메시지를 사용할 수 있으면 처리하고, 그렇지 않으면 잠시 다른 작업을 수행한 다음 다시 확인하는 루프를 작성할 수 있습니다.

이 예제에서는 단순성을 위해 recv를 사용했습니다. 메인 스레드가 메시지를 기다리는 것 외에 다른 작업이 없으므로 메인 스레드를 차단하는 것이 적절합니다.

Listing 16-8 의 코드를 실행하면 메인 스레드에서 값이 출력되는 것을 볼 수 있습니다.

Got: hi

완벽합니다!

채널과 소유권 이전

소유권 규칙은 안전한 동시성 코드를 작성하는 데 도움이 되므로 메시지 전송에서 중요한 역할을 합니다. 동시 프로그래밍에서 오류를 방지하는 것은 Rust 프로그램 전체에서 소유권을 고려하는 것의 장점입니다. 채널과 소유권이 함께 작동하여 문제를 방지하는 방식을 보여주는 실험을 해보겠습니다. 채널을 통해 전송한 스폰된 스레드에서 val 값을 사용하려고 시도할 것입니다. Listing 16-9 의 코드를 컴파일하여 이 코드가 허용되지 않는 이유를 확인하십시오.

파일 이름: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

Listing 16-9: 채널을 통해 전송한 후 val을 사용하려는 시도

여기서는 tx.send를 통해 채널을 통해 전송한 후 val을 출력하려고 시도합니다. 이것을 허용하는 것은 좋지 않은 생각입니다. 값이 다른 스레드로 전송되면 해당 스레드가 값을 다시 사용하기 전에 값을 수정하거나 삭제할 수 있습니다. 잠재적으로 다른 스레드의 수정으로 인해 일관성이 없거나 존재하지 않는 데이터로 인해 오류 또는 예기치 않은 결과가 발생할 수 있습니다. 그러나 Listing 16-9 의 코드를 컴파일하려고 하면 Rust 에서 오류가 발생합니다.

error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does
not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move

우리의 동시성 실수는 컴파일 시간 오류를 발생시켰습니다. send 함수는 매개변수의 소유권을 가져가고, 값이 이동하면 수신자가 소유권을 가져갑니다. 이렇게 하면 전송 후 실수로 값을 다시 사용하는 것을 방지할 수 있습니다. 소유권 시스템은 모든 것이 괜찮은지 확인합니다.

여러 값 전송 및 수신 대기 확인

Listing 16-8 의 코드는 컴파일되고 실행되었지만 두 개의 별도 스레드가 채널을 통해 서로 통신하고 있다는 것을 명확하게 보여주지 않았습니다. Listing 16-10 에서는 Listing 16-8 의 코드가 동시적으로 실행되고 있음을 증명할 수 있도록 몇 가지 수정을 했습니다. 이제 스폰된 스레드는 여러 메시지를 전송하고 각 메시지 사이에 1 초 동안 일시 중지됩니다.

파일 이름: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

Listing 16-10: 여러 메시지를 전송하고 각 메시지 사이에 일시 중지

이번에는 스폰된 스레드에 메인 스레드로 전송하려는 문자열 벡터가 있습니다. 각 문자열을 개별적으로 전송하면서 반복하고, Duration 값이 1 초인 thread::sleep 함수를 호출하여 각 문자열 사이에 일시 중지합니다.

메인 스레드에서는 더 이상 recv 함수를 명시적으로 호출하지 않습니다. 대신 rx를 반복자 (iterator) 로 취급하고 있습니다. 수신된 각 값에 대해 출력하고 있습니다. 채널이 닫히면 반복이 종료됩니다.

Listing 16-10 의 코드를 실행하면 각 줄 사이에 1 초의 일시 중지가 있는 다음 출력을 볼 수 있습니다.

Got: hi
Got: from
Got: the
Got: thread

메인 스레드의 for 루프에 일시 중지하거나 지연하는 코드가 없으므로 메인 스레드가 스폰된 스레드에서 값을 수신하기를 기다리고 있음을 알 수 있습니다.

송신자를 복제하여 여러 생산자 생성

앞서 mpscmultiple producer, single consumer의 약자라고 언급했습니다. Listing 16-10 의 코드를 확장하여 mpsc를 사용하고, 동일한 수신자에게 값을 전송하는 여러 스레드를 생성해 보겠습니다. Listing 16-11 과 같이 송신자를 복제하여 수행할 수 있습니다.

파일 이름: src/main.rs

--snip--

let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {received}");
}

--snip--

Listing 16-11: 여러 생산자로부터 여러 메시지 전송

이번에는 첫 번째 스폰된 스레드를 생성하기 전에 송신자에 대해 clone을 호출합니다. 이렇게 하면 첫 번째 스폰된 스레드에 전달할 수 있는 새로운 송신자가 생성됩니다. 원래 송신자는 두 번째 스폰된 스레드에 전달합니다. 이렇게 하면 각 스레드가 서로 다른 메시지를 하나의 수신자에게 전송하는 두 개의 스레드가 생성됩니다.

코드를 실행하면 다음과 같은 출력이 표시됩니다.

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

시스템에 따라 다른 순서로 값이 표시될 수 있습니다. 이것이 동시성을 흥미롭게 만들고 어렵게 만드는 이유입니다. thread::sleep을 실험하여 서로 다른 스레드에 다양한 값을 제공하면 각 실행이 더욱 비결정적이며 매번 다른 출력을 생성합니다.

이제 채널이 어떻게 작동하는지 살펴보았으므로 다른 동시성 방법을 살펴보겠습니다.

요약

축하합니다! 스레드 간 데이터 전송을 위한 메시지 전달 사용 랩을 완료했습니다. LabEx 에서 더 많은 랩을 연습하여 실력을 향상시킬 수 있습니다.