CONCURRENCY

 CONCURRENCY

  • In most operating system an executed program is run in a process and you can inpendepent parts that runs simultaneously, the feature that runs these simultaneously parts are called threads.
  • Using threads will increase the performance but will lead to increasing complexity leading to following problems:
    • Race condition, where threads are data accessing data or resources in an inconsistent manner.
    • Deadlock, where two threads are waiting for each other thus preventing both threads from continuing.
    • Bugs that happen only in certain condition and are hard to replicate and fix.
  • However in rust by leveraging ownership and type checking many concurrency errors are compile time errors rather than run time errors. This is called fearless concurrency.
  • You can create a simple simple program with thread like below:
  • use std::thread;
    use std::time::Duration;
    
    fn main() {
        thread::spawn(|| {
            for i in 1..10 {
                println!("hi number {} from the spawned thread!", i);
                thread::sleep(Duration::from_millis(1));
            }
        });
    
        for i in 1..5 {
            println!("hi number {} from the main thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    }
    
  • In the above example the spawned thread will shutdown on completion of the main thread, regardless of whether or not the spawned is completed.
  • In order to fix this issue we can use join handle which is shown in the below example: 
  • use std::thread;
    use std::time::Duration;
    
    fn main() {
        let handle = thread::spawn(|| {
            for i in 1..10 {
                println!("hi number {} from the spawned thread!", i);
                thread::sleep(Duration::from_millis(1));
            }
        });
    
        for i in 1..5 {
            println!("hi number {} from the main thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    
        handle.join().unwrap();
    }
    
  • the location of handle.join() affect the way threads run if it used before main thread, it runs the main thread only after the spawned thread.
  • use std::thread;
    
    fn main() {
        let v = vec![1, 2, 3];
    
        let handle = thread::spawn(move || {
            println!("Here's a vector: {:?}", v);
        });
    
        handle.join().unwrap();
    }
    
  • In the above example the move keyword allows us to have access to the vector rather than using a reference to it.

MESSAGE PASSING

  • One way of threads communicating with each other is using message passing. 
  • To implement message passing rust makes use of channels.
  • Channel is a concept which is used to send data from a thread to another.
  • A channel will have a transmitter and a reciever.
  • use std::sync::mpsc;
    use std::thread;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let val = String::from("hi");
            tx.send(val).unwrap();
        });
    
        let received = rx.recv().unwrap();
        println!("Got: {}", received);
    }
    
    
  • In the above example we create trasmitter (tx) and reciever(rx) and use it in a thread.
  • The reciever has two methods recv and try_recv:
    • recv will block the main thread until the value is recieved.
    • try_recv method doesn't block the main thread.
  • Both methods will receive data in Result<T, E>.
  • You can't access the value in thread after transmitting it.
  • You can create multiple producer like below:
  • use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    fn main() {
    
        let (tx, rx) = mpsc::channel();
    
        let tx1 = tx.clone();
        thread::spawn(move || {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];
    
            for val in vals {
                tx1.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
        thread::spawn(move || {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];
    
            for val in vals {
                tx.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
        for received in rx {
            println!("Got: {}", received);
        }
    
    }
    

SHARED STATE

  • Shared state is a way of multiple threads accessing the same shared data.
  • While channels are similar to single ownership, shared state is like multiple ownership.
  • Mutex allows only one thread to access some data at any given time.
  • To access data in mutex, a thread must fire a signal that it wants access by asking to acquire the mutex's lock.
  • It has two rules:
    • Must acquire lock before accessing the data.
    • And when you are done with the data the mutex guards, you must unlock it so that other threads can use it.
  • use std::sync::{Arc, Mutex};
    use std::thread;
    
    fn main() {
        let counter = Arc::new(Mutex::new(0));
        let mut handles = vec![];
    
        for _ in 0..10 {
            let counter = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                let mut num = counter.lock().unwrap();
    
                *num += 1;
            });
            handles.push(handle);
        }
    
        for handle in handles {
            handle.join().unwrap();
        }
    
        println!("Result: {}", *counter.lock().unwrap());
    }
    
  • In the above example we use a simple example of mutex, which is wrapped in Arc( Atominc reference count) as we can't use Rc for multiple thread purpose. Arc and Rc has the same API

Comments