
Rust Mutex is not working when using a callback function from multiple C threads created by `fork`

I am using the C library Cuba, which takes a callback function that is called from multiple threads created in C. Cuba's parallelization is based on the POSIX fork/wait functions rather than pthreads (arxiv.org/abs/1408.6373). The index of the current thread is passed to the callback in the `core` parameter.

I am trying to log results from this callback function to the screen and to a file. If I use `println!` I get the expected output, but if I use slog with a Mutex drain the output is mangled. If I use the async drain I get no output at all.

Is the Mutex not locking because it can't see that the function is actually being called from another thread? I tried to recreate the problem with Rust threads, but couldn't. Ideally, I'd like to get the async drain to work.

Below is an example program that shows the problematic behaviour. The callback receives the last parameter of the vegas function as one of its arguments. That parameter is a vector of logger clones, so each core should have its own copy of the logger:

#[macro_use]
extern crate slog;
extern crate cuba;
extern crate slog_term;

use slog::Drain;

// this function is called from different c threads
// `core` indicates which thread
fn integrand(
    _x: &[f64],
    _f: &mut [f64],
    loggers: &mut Vec<slog::Logger>,
    _nvec: usize,
    core: i32,
) -> Result<(), &'static str> {
    info!(loggers[core as usize], "A\nB\nC");

    Ok(())
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let mut integrator = cuba::CubaIntegrator::new(integrand);
    integrator.set_cores(10, 1000); // set 10 cores

    integrator.vegas(
        1,
        1,
        cuba::CubaVerbosity::Progress,
        0,
        vec![log.clone(); 11],
    );
}

Output:

C 
INFO Mar 26A
B
C 10:27
:42.195 MarINFO 26  10:27A
B
C:42.195
 MarINFO 26  10:27A
B
C:42.195
 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
asked Jan 27 '23 13:01 by Ben Ruijl



1 Answer

The Cuba C library has this to say:

Windows users: Cuba 3 and up uses fork(2) to parallelize the execution threads. This POSIX function is not part of the Windows API, however, and is furthermore used in an essential way such that it cannot be worked around simply with CreateProcess etc. The only feasible emulation seems to be available through Cygwin.
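In other words, Cuba's "cores" are separate processes, not threads. Below is a minimal std-only sketch of the general fork/wait pattern. It assumes nothing about Cuba's internals (Cuba also has to ship integration results back to the parent, which this skips): each child receives its index, the analogue of `core`, and reports a small result through its 8-bit exit status. `run_forked_workers` and `square` are hypothetical names, and the hand-written `extern` declarations assume a Unix target where `pid_t` is a 32-bit int (Linux, macOS).

```rust
// Hand-written declarations of libc functions (the nix crate wraps these).
// Assumes a Unix target where pid_t is a 32-bit int, e.g. Linux or macOS.
// `unsafe extern` blocks need Rust 1.82+.
unsafe extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut i32, options: i32) -> i32;
    fn _exit(status: i32) -> !;
}

/// Forks `cores` child processes, passes each its index (like Cuba's `core`),
/// and collects each child's result from its exit status, in index order.
fn run_forked_workers(cores: i32, work: fn(i32) -> i32) -> Vec<i32> {
    let mut pids = Vec::new();
    for core in 0..cores {
        let pid = unsafe { fork() };
        assert!(pid >= 0, "fork failed");
        if pid == 0 {
            // Child: runs on a private copy of the parent's memory.
            let result = work(core);
            // An exit status only carries 8 bits; _exit skips destructors
            // and stdio buffers inherited from the parent.
            unsafe { _exit(result & 0xff) }
        }
        pids.push(pid);
    }
    pids.into_iter()
        .map(|pid| {
            let mut status = 0;
            unsafe { waitpid(pid, &mut status, 0) };
            (status >> 8) & 0xff // WEXITSTATUS on Linux and macOS
        })
        .collect()
}

/// Hypothetical per-core work.
fn square(core: i32) -> i32 {
    core * core
}

fn main() {
    let results = run_forked_workers(4, square);
    println!("{:?}", results); // prints [0, 1, 4, 9]
}
```

Anything a child does to its own memory, including locking a mutex, is invisible to the parent and to its siblings.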

Here's a reproduction of the problem. We fork, and then the child and the parent each attempt to hold the mutex while printing stuff out. A sleep is inserted to encourage the OS scheduler to switch between the two processes:

use nix::unistd::{fork, ForkResult}; // 0.13.0
use std::{sync::Mutex, thread, time::Duration};

fn main() {
    let shared = Mutex::new(10);

    match fork() {
        Ok(ForkResult::Parent { .. }) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Parent");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Ok(ForkResult::Child) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Child");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}
$ cargo run

Parent
Child
Parent
Child
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
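The interleaving above is what two independent copies of the mutex would produce. A std-only variant of the same experiment makes this visible without relying on scheduling: the parent keeps its lock held while waiting for the child, yet the child still acquires "the same" mutex and changes the value, and that change never reaches the parent. `mutate_in_child` is a hypothetical name; the `extern` declarations assume a Unix target with a 32-bit `pid_t` (Linux, macOS).

```rust
use std::sync::Mutex;

// Assumes a Unix target with a 32-bit pid_t (Linux, macOS).
// `unsafe extern` blocks need Rust 1.82+.
unsafe extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut i32, options: i32) -> i32;
    fn _exit(status: i32) -> !;
}

/// Returns (value the child saw after adding 5, value the parent sees afterwards).
fn mutate_in_child() -> (i32, i32) {
    let shared = Mutex::new(10);

    let pid = unsafe { fork() };
    assert!(pid >= 0, "fork failed");
    if pid == 0 {
        // Child: lock() never waits for the parent, because this is the
        // child's own copy of the mutex (and of the value inside it).
        let mut value = shared.lock().unwrap();
        *value += 5;
        unsafe { _exit(*value) }
    }

    // Parent: keep the lock held while waiting; the child finishes anyway.
    let value = shared.lock().unwrap();
    let mut status = 0;
    unsafe { waitpid(pid, &mut status, 0) };
    let child_value = (status >> 8) & 0xff; // WEXITSTATUS on Linux and macOS
    let parent_value = *value;
    (child_value, parent_value)
}

fn main() {
    println!("{:?}", mutate_in_child()); // prints (15, 10)
}
```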

Using fork with threads is really a pain to deal with; I distinctly remember hunting down terrible problems related to this before. Two resources I found that go into depth:

  • Mutexes And fork()ing In Shared Libraries
  • Synchronization, Part 1: Mutex Locks

The latter says (emphasis mine):

Can I create mutex before fork-ing?

Yes - however the child and parent process will not share virtual memory and each one will have a mutex independent of the other.

(Advanced note: There are advanced options using shared memory that allow a child and parent to share a mutex if it's created with the correct options and uses a shared memory segment. See procs, fork(), and mutexes)
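If the parent and child really do need mutual exclusion, the lock has to live somewhere both processes can see. The note above describes a process-shared mutex in shared memory; a simpler substitute, swapped in here purely for illustration, is an advisory file lock taken with `flock(2)`. In this sketch the parent and child each append ten lines to a file while holding the lock, so each process's lines come out as one contiguous run. The function names and temp-file path are hypothetical, and the `LOCK_EX`/`LOCK_UN` values assume Linux or macOS.

```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::os::unix::io::AsRawFd;
use std::thread;
use std::time::Duration;

// Assumes Linux or macOS: 32-bit pid_t and these LOCK_* values.
// `unsafe extern` blocks need Rust 1.82+.
unsafe extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut i32, options: i32) -> i32;
    fn flock(fd: i32, operation: i32) -> i32;
    fn _exit(status: i32) -> !;
}
const LOCK_EX: i32 = 2;
const LOCK_UN: i32 = 8;

/// Appends `count` lines of `tag` to `path` while holding an exclusive flock.
fn locked_append(path: &str, tag: &str, count: usize) {
    // Open the file in *this* process: flock locks belong to the open file
    // description, and a description inherited across fork would be shared,
    // so parent and child would not exclude each other.
    let mut file = OpenOptions::new().create(true).append(true).open(path).unwrap();
    assert_eq!(unsafe { flock(file.as_raw_fd(), LOCK_EX) }, 0);
    for _ in 0..count {
        writeln!(file, "{}", tag).unwrap();
        thread::sleep(Duration::from_millis(2)); // invite the other process to interleave
    }
    unsafe { flock(file.as_raw_fd(), LOCK_UN) };
}

/// Parent and child each append ten lines under the lock; returns the file contents.
fn parent_and_child(path: &str) -> String {
    let _ = fs::remove_file(path);
    let pid = unsafe { fork() };
    assert!(pid >= 0, "fork failed");
    if pid == 0 {
        locked_append(path, "Child", 10);
        unsafe { _exit(0) }
    }
    locked_append(path, "Parent", 10);
    let mut status = 0;
    unsafe { waitpid(pid, &mut status, 0) };
    fs::read_to_string(path).unwrap()
}

fn main() {
    let path = std::env::temp_dir().join("flock_fork_demo.txt");
    // Ten lines of one tag followed by ten of the other; which comes first varies.
    print!("{}", parent_and_child(path.to_str().unwrap()));
}
```

Unlike the `Mutex` in the reproduction, the lock here is held by the kernel, which both processes share.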


Regarding "If I use the async drain I get no output at all", see also:

  • Why doesn't a lazy_static slog::Logger print until a non-static logger is used?
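One plausible reason the async drain prints nothing (an assumption, not verified against slog-async here): an async drain hands records to a background worker thread, and under POSIX a forked child contains only a copy of the thread that called `fork`. A worker thread created before `vegas` forks would not exist in the children, so any records they queue are never written. This std-only sketch shows the thread disappearing: a background thread increments a counter, and the counter only advances in the parent. `worker_survives_fork` and `ticks_advance` are hypothetical names; the `extern` declarations assume Linux or macOS.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Assumes a Unix target with a 32-bit pid_t (Linux, macOS).
// `unsafe extern` blocks need Rust 1.82+.
unsafe extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut i32, options: i32) -> i32;
    fn _exit(status: i32) -> !;
}

/// Returns true if `ticks` changes during a 50 ms window.
fn ticks_advance(ticks: &AtomicUsize) -> bool {
    let before = ticks.load(Ordering::SeqCst);
    thread::sleep(Duration::from_millis(50));
    ticks.load(Ordering::SeqCst) != before
}

/// Returns (worker thread running in parent?, worker thread running in child?).
fn worker_survives_fork() -> (bool, bool) {
    let ticks = Arc::new(AtomicUsize::new(0));
    let stop = Arc::new(AtomicBool::new(false));

    // Background thread standing in for an async drain's worker.
    let worker = {
        let (ticks, stop) = (Arc::clone(&ticks), Arc::clone(&stop));
        thread::spawn(move || {
            while !stop.load(Ordering::SeqCst) {
                ticks.fetch_add(1, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(1));
            }
        })
    };

    let pid = unsafe { fork() };
    assert!(pid >= 0, "fork failed");
    if pid == 0 {
        // Child: only the thread that called fork() was copied, so nothing
        // increments the child's copy of the counter any more.
        let alive = ticks_advance(&ticks);
        unsafe { _exit(alive as i32) }
    }

    let parent_alive = ticks_advance(&ticks);
    let mut status = 0;
    unsafe { waitpid(pid, &mut status, 0) };
    stop.store(true, Ordering::SeqCst);
    worker.join().unwrap();
    (parent_alive, ((status >> 8) & 0xff) == 1)
}

fn main() {
    let (in_parent, in_child) = worker_survives_fork();
    println!("worker running in parent: {}, in child: {}", in_parent, in_child);
}
```

On a normally loaded machine this should report the worker running in the parent and not in the child; the parent half depends on the worker being scheduled within 50 ms.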

I would not trust the Cuba Rust library. There are two main points:

  1. If threads are being created, the generic user-data type should have a `Send` or `Sync` bound, restricting it to types that are safe to transfer or share between threads.

  2. The user data passed to the integrand function should not be a `&mut`. A fundamental Rust rule is that there can only be one mutable reference to any piece of data at any time, and Cuba trivially allows you to circumvent this.
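To make both points concrete, here is a hypothetical signature for a parallel-callback API (not the `cuba` crate's real one) that encodes them in the type system: user data is handed out as a shared `&T` and must be `Sync`. Under those bounds, mutating a shared log only compiles once it is wrapped in a `Mutex`. The sketch uses `std::thread::scope` (Rust 1.63+) with ordinary threads, so it illustrates the bounds rather than Cuba's fork-based model; `run_parallel` and `collect_records` are made-up names.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical safer shape for a parallel-callback API: user data is shared
/// as `&T` (never `&mut T`) and must be `Sync` to cross thread boundaries.
fn run_parallel<T, F>(user_data: &T, cores: usize, integrand: F)
where
    T: Sync,
    F: Fn(&T, usize) + Sync,
{
    thread::scope(|s| {
        for core in 0..cores {
            let integrand = &integrand;
            s.spawn(move || integrand(user_data, core));
        }
    });
}

fn collect_records(cores: usize) -> Vec<String> {
    // A bare Vec<String> could not be mutated through `&T`; the Mutex is
    // what makes shared mutation compile (and makes each push atomic).
    let records = Mutex::new(Vec::new());
    run_parallel(&records, cores, |records, core| {
        records.lock().unwrap().push(format!("A\nB\nC\n{}", core));
    });
    let mut records = records.into_inner().unwrap();
    records.sort();
    records
}

fn main() {
    for record in collect_records(4) {
        println!("{}", record);
    }
}
```

Passing `|v: &Vec<String>, core| v.push(...)` instead would be rejected at compile time, which is exactly the guarantee a `&mut` parameter throws away.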

Here's an attempted reproduction of what the Cuba Rust and C libraries do:

#[macro_use]
extern crate slog;

use slog::Drain;

fn integrand(loggers: &mut Vec<slog::Logger>, core: i32) {
    info!(loggers[core as usize], "A\nB\nC\n{}", core);
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let logs = vec![log.clone(); 11];

    cuba_repro(logs, integrand);
}

use std::{ffi::c_void, thread};

type Integrand<T> = fn(&mut T, i32);

fn cuba_repro<T>(mut user_data: T, mut integrand: Integrand<T>) {
    // From the `vegas` method
    let user_data_ptr = &mut user_data as *mut _ as *mut c_void;
    let integrand_ptr = &mut integrand as *mut _ as *mut c_void;

    unsafe { cuba_repro_ffi::<T>(user_data_ptr, integrand_ptr) }
}

unsafe fn cuba_repro_ffi<T>(user_data: *const c_void, integrand: *const c_void) {
    let user_data = FfiDoesNotCareAboutSendOrSync(user_data);
    let integrand = FfiDoesNotCareAboutSendOrSync(integrand);

    let threads: Vec<_> = (0..4).map(move |i| {
        thread::spawn(move || {
            // C doesn't care about this pedantry
            let user_data = &mut *(user_data.0 as *const T as *mut T);
            let integrand = &mut *(integrand.0 as *const Integrand<T> as *mut Integrand<T>);

            // From the `c_integrand` function
            let k: &mut T = &mut *(user_data as *mut _);
            let _ignored = integrand(k, i);
        })
    }).collect();

    for t in threads { t.join().unwrap() }
}

#[derive(Copy, Clone)]
struct FfiDoesNotCareAboutSendOrSync<T>(T);
unsafe impl<T> Send for FfiDoesNotCareAboutSendOrSync<T> {}
unsafe impl<T> Sync for FfiDoesNotCareAboutSendOrSync<T> {}

I had to make numerous changes to get the Rust compiler to ignore the huge amount of unsafety and rule-breaking that the Cuba library and related FFI is performing.

This code example does print out 4 log statements, each one intact and in order, so this is not a complete answer. However, I'm fairly certain that the Cuba library is triggering undefined behavior, which means that any outcome is possible, including appearing to work.
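For contrast, the thread-based reproduction behaves because threads share one address space. As far as I know, `slog::Logger` is reference-counted, so `vec![log.clone(); 11]` produces handles to one drain and, with threads, one `Mutex`. The std-only sketch below uses a hypothetical `SharedLog` type as a stand-in for the logger: every clone points at the same `Arc<Mutex<_>>`, so each three-line record stays contiguous no matter which thread writes it.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a logger: cloning it clones the Arc, not the
/// Mutex, so every clone shares one lock and one output buffer.
#[derive(Clone)]
struct SharedLog {
    sink: Arc<Mutex<Vec<String>>>,
}

impl SharedLog {
    fn log(&self, msg: &str) {
        // The lock is held for the whole record, so other threads cannot
        // slip their lines in between A, B and C.
        let mut sink = self.sink.lock().unwrap();
        for line in msg.lines() {
            sink.push(line.to_string());
        }
    }
}

fn log_from_threads(n: usize) -> Vec<String> {
    let log = SharedLog { sink: Arc::new(Mutex::new(Vec::new())) };
    let loggers = vec![log.clone(); n];
    let handles: Vec<_> = loggers
        .into_iter()
        .map(|logger| thread::spawn(move || logger.log("A\nB\nC")))
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let lines = log.sink.lock().unwrap().clone();
    lines
}

fn main() {
    let lines = log_from_threads(10);
    assert!(lines.chunks(3).all(|c| c == ["A", "B", "C"]));
    println!("{} lines, every record intact", lines.len()); // prints 30 lines, every record intact
}
```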

answered Feb 02 '23 02:02 by Shepmaster
