Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Application on OSX cannot spawn more than 2048 threads

I have a Rust application on on OSX firing up a large amount of threads as can be seen in the code below, however, after looking at how many max threads my version of OSX is allowed to create via the sysctl kern.num_taskthreads command, I can see that it is kern.num_taskthreads: 2048 which explains why I can't spin up over 2048 threads.

How do I go about getting past this hard limit?

let threads = 300000;
let requests = 1;

for _x in 0..threads {
    println!("{}", _x);
    let request_clone = request.clone();

    let handle = thread::spawn(move || {
        for _y in 0..requests {
            request_clone.lock().unwrap().push((request::Request::new(request::Request::create_request())));
        }
    });

    child_threads.push(handle);
}
like image 418
Jacob Clark Avatar asked Feb 10 '23 23:02

Jacob Clark


2 Answers

Before starting, I'd encourage you to read about the C10K problem. When you get into this scale, there's a lot more things you need to keep in mind.

That being said, I'd suggest looking at mio...

a lightweight IO library for Rust with a focus on adding as little overhead as possible over the OS abstractions.

Specifically, mio provides an event loop, which allows you to handle a large number of connections without spawning threads. Unfortunately, I don't know of a HTTP library that currently supports mio. You could create one and be a hero to the Rust community!

like image 128
Shepmaster Avatar answered Feb 13 '23 04:02

Shepmaster


Not sure how helpful this will be, but I was trying to create a small pool of threads that will create connections and then send them over to an event loop via a channel for reading.

I'm sure this code is probably pretty bad, but here it is anyways for examples. It uses the Hyper library, like you mentioned.

extern crate hyper;

use std::io::Read;
use std::thread;
use std::thread::{JoinHandle};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;

use hyper::Client;
use hyper::client::Response;
use hyper::header::Connection;

const TARGET: i32 = 100;
const THREADS: i32 = 10;

struct ResponseWithString {
    index: i32,
    response: Response,
    data: Vec<u8>,
    complete: bool
}

fn main() {
    // Create a client.
    let url: &'static str = "http://www.gooogle.com/";

    let mut threads = Vec::<JoinHandle<()>>::with_capacity((TARGET * 2) as usize);
    let conn_count = Arc::new(Mutex::new(0));
    let (tx, rx) = channel::<ResponseWithString>();

    for _ in 0..THREADS {
        // Move var references into thread context
        let conn_count = conn_count.clone();
        let tx = tx.clone();

        let t = thread::spawn(move || {
            loop {
                let idx: i32;
                {
                    // Lock, increment, and release
                    let mut count = conn_count.lock().unwrap();
                    *count += 1;
                    idx = *count;
                }
                if idx > TARGET {
                    break;
                }

                let mut client = Client::new();

                // Creating an outgoing request.
                println!("Creating connection {}...", idx);
                let res = client.get(url)                       // Get URL...
                                .header(Connection::close())    // Set headers...
                                .send().unwrap();               // Fire!

                println!("Pushing response {}...", idx);
                tx.send(ResponseWithString {
                    index: idx,
                    response: res,
                    data: Vec::<u8>::with_capacity(1024),
                    complete: false
                }).unwrap();
            }
        });
        threads.push(t);
    }

    let mut responses = Vec::<ResponseWithString>::with_capacity(TARGET as usize);
    let mut buf: [u8; 1024] = [0; 1024];
    let mut completed_count = 0;
    loop {
        if completed_count >= TARGET {
            break; // No more work!
        }

        match rx.try_recv() {
            Ok(r) => {
                println!("Incoming response! {}", r.index);
                responses.push(r)
            },
            _ => { }
        }

        for r in &mut responses {
            if r.complete {
                continue;
            }

            // Read the Response.
            let res = &mut r.response;
            let data = &mut r.data;
            let idx = &r.index;

            match res.read(&mut buf) {
                Ok(i) => {
                        if i == 0 {
                            println!("No more data! {}", idx);
                            r.complete = true;
                            completed_count += 1;
                        }
                        else {
                            println!("Got data! {} => {}", idx, i);
                            for x in 0..i {
                                data.push(buf[x]);
                            }
                        }
                    }
                Err(e) => {
                    panic!("Oh no! {} {}", idx, e);
                }
            }
        }
    }
}
like image 40
jocull Avatar answered Feb 13 '23 02:02

jocull