Skip to content

Commit

Permalink
Merge pull request #35 from vertexclique/add-backend-selector
Browse files Browse the repository at this point in the history
Add backend selector
  • Loading branch information
vertexclique authored Jan 24, 2024
2 parents d19539b + f8df385 commit aec53b0
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 59 deletions.
90 changes: 48 additions & 42 deletions examples/h1-server-multishot.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,56 @@
use nuclei::*;
use std::net::TcpListener;
#[cfg(target_os = "linux")]
fn main() -> anyhow::Result<()> {
use nuclei::*;
use std::net::TcpListener;

use futures::stream::StreamExt;
use anyhow::Result;
use async_dup::Arc;
use futures::stream::StreamExt;
use anyhow::Result;
use async_dup::Arc;

use futures::prelude::*;
use http_types::{Request, Response, StatusCode};
use futures::prelude::*;
use http_types::{Request, Response, StatusCode};

/////////////////////////////////////////////////////////////////////////
////// NOTE: This example can only be run by IO_URING backend.
////// If you try to use epoll, it will not compile.
////// Reason is: Multishot based IO is only part of io_uring backend.
/////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////
////// NOTE: This example can only be run by IO_URING backend.
////// If you try to use epoll, it will not compile.
////// Reason is: Multishot based IO is only part of io_uring backend.
/////////////////////////////////////////////////////////////////////////


static DATA: &'static str = include_str!("data/quark-gluon-plasma");
static DATA: &'static str = include_str!("data/quark-gluon-plasma");

/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
// println!("Serving {}", req.url());
/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
// println!("Serving {}", req.url());

let mut res = Response::new(StatusCode::Ok);
res.insert_header("Content-Type", "text/plain");
res.set_body(DATA);
Ok(res)
}

/// Listens for incoming connections and serves them.
async fn listen(listener: Handle<TcpListener>) -> Result<()> {
// Format the full host address.
let host = format!("http://{}", listener.get_ref().local_addr()?);
println!("Listening on {}", host);

let mut streams = listener.accept_multi().await?;

while let Some(stream) = streams.next().await {
// Spawn a background task serving this connection.
let stream = Arc::new(stream);
spawn(async move {
if let Err(err) = async_h1::accept(stream, serve).await {
println!("Connection error: {:#?}", err);
}
})
.detach();
let mut res = Response::new(StatusCode::Ok);
res.insert_header("Content-Type", "text/plain");
res.set_body(DATA);
Ok(res)
}

Ok(())
}
/// Listens for incoming connections and serves them.
async fn listen(listener: Handle<TcpListener>) -> Result<()> {
// Format the full host address.
let host = format!("http://{}", listener.get_ref().local_addr()?);
println!("Listening on {}", host);

let mut streams = listener.accept_multi().await?;

while let Some(stream) = streams.next().await {
// Spawn a background task serving this connection.
let stream = Arc::new(stream);
spawn(async move {
if let Err(err) = async_h1::accept(stream, serve).await {
println!("Connection error: {:#?}", err);
}
})
.detach();
}

Ok(())
}

fn main() -> Result<()> {
spawn_blocking(|| drive(future::pending::<()>())).detach();

block_on(async {
Expand All @@ -59,3 +60,8 @@ fn main() -> Result<()> {
Ok(())
})
}

#[cfg(target_os = "macos")]
fn main() {
panic!("This example can only be run by IO_URING backend.");
}
72 changes: 72 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@

///
/// Nuclei's proactor configuration.
#[derive(Clone, Debug, Default)]
pub struct NucleiConfig {
/// **IO_URING Configuration** allows you to configure [io_uring](https://unixism.net/loti/what_is_io_uring.html) backend.
pub iouring: IoUringConfiguration
}

/// **IO_URING Configuration**
/// 5.19+ Kernel is required to operate IO_URING.
#[derive(Clone, Debug)]
pub struct IoUringConfiguration {
/// Allowed entries in both submission and completion queues.
///
/// **[default]**: By default this value is 2048.
pub queue_len: u32,
/// SQPOLL kernel thread wake up interval.
///
/// Before version 5.11 of the Linux kernel, to successfully use this feature, the application
/// must register a set of files to be used for IO through io_uring_register(2) using the
/// `IORING_REGISTER_FILES` opcode. Failure to do so will result in submitted IO being errored
/// with EBADF. The presence of this feature can be detected by the `IORING_FEAT_SQPOLL_NONFIXED`
/// feature flag. In version 5.11 and later, it is no longer necessary to register files to use
/// this feature. 5.11 also allows using this as non-root, if the user has the CAP_SYS_NICE
/// capability. In 5.13 this requirement was also relaxed, and no special privileges are needed
/// for SQPOLL in newer kernels. Certain stable kernels older than 5.13 may also support
/// unprivileged SQPOLL.
///
/// Decreasing this will put more pressure to kernel, increases cpu usage.
/// Increasing it will slow down completion push rate from kernel.
/// This config is in milliseconds.
///
/// **[default]**: If [None] then SQPOLL will be disabled.
pub sqpoll_wake_interval: Option<u32>,

/// Get and/or set the limit for number of io_uring worker threads per NUMA
/// node. `per_numa_bounded_worker_count` holds the limit for bounded workers,
/// which process I/O operations expected to be bound in time,
/// that is I/O on regular files or block devices. Passing `0` does not change
/// the current limit.
///
/// **[default]**: If [None] then Nuclei default value of will be used `256`.
///
/// If [None] passed, by default, the amount of bounded IO workers is limited to how
/// many SQ entries the ring was setup with, or 4 times the number of
/// online CPUs in the system, whichever is smaller.
pub per_numa_bounded_worker_count: Option<u32>,

/// This `per_numa_unbounded_worker_count` holds the limit for unbounded workers,
/// which carry out I/O operations that can never complete, for instance I/O
/// on sockets. Passing `0` does not change the current limit.
///
/// **[default]**: If [None] then Nuclei default value of will be used `512`.
///
/// If [None] passed unbounded workers will be limited by the process task limit,
/// as indicated by the rlimit [RLIMIT_NPROC](https://man7.org/linux/man-pages/man2/getrlimit.2.html) limit.
pub per_numa_unbounded_worker_count: Option<u32>,

// XXX: `redrive_kthread_wake` = bool, syncs queue changes so kernel threads got awakened. increased cpu usage.
}

impl Default for IoUringConfiguration {
fn default() -> Self {
Self {
queue_len: 1 << 11,
sqpoll_wake_interval: Some(2),
per_numa_bounded_worker_count: Some(1 << 8),
per_numa_unbounded_worker_count: Some(1 << 9),
}
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ mod submission_handler;
mod sys;
mod utils;
mod waker;
/// Nuclei's configuration options reside here.
pub mod config;

#[cfg(not(any(
target_os = "linux", // epoll, iouring
Expand Down
39 changes: 33 additions & 6 deletions src/proactor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::task::{Context, Poll};
use std::time::Duration;
use std::{future::Future, io};
use std::ops::DerefMut;

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `std::ops::DerefMut`

Check warning on line 4 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `std::ops::DerefMut`

use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `Lazy`

Check warning on line 6 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused import: `Lazy`
use crate::config::NucleiConfig;

use super::syscore::*;
use super::waker::*;
use crate::spawn_blocking;
use crate::sys::IoBackend;

pub use super::handle::*;

Expand All @@ -16,13 +19,26 @@ pub struct Proactor(SysProactor);
unsafe impl Send for Proactor {}
unsafe impl Sync for Proactor {}

static mut PROACTOR: OnceCell<Proactor> = OnceCell::new();

impl Proactor {
/// Returns a reference to the proactor.
pub fn get() -> &'static Proactor {
static PROACTOR: Lazy<Proactor> =
Lazy::new(|| Proactor(SysProactor::new().expect("cannot initialize poll backend")));
unsafe {
&PROACTOR.get_or_init(|| {
Proactor(SysProactor::new(NucleiConfig::default()).expect("cannot initialize IO backend"))
})
}
}

&PROACTOR
/// Builds a proactor instance with given config and returns a reference to it.
pub fn with_config(config: NucleiConfig) -> &'static Proactor {
unsafe {
let proactor = Proactor(SysProactor::new(config.clone()).expect("cannot initialize IO backend"));
PROACTOR.set(proactor);

Check warning on line 38 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused `Result` that must be used

Check warning on line 38 in src/proactor.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-apple-darwin

unused `Result` that must be used
let proactor = Proactor(SysProactor::new(config).expect("cannot initialize IO backend"));
&PROACTOR.get_or_init(|| proactor)
}
}

/// Wakes the thread waiting on proactor.
Expand All @@ -35,14 +51,25 @@ impl Proactor {
self.0.wait(max_event_size, duration)
}

/// Get the IO backend that is used with Nuclei's proactor.
pub fn backend() -> IoBackend {
BACKEND
}

/// Get underlying proactor instance.
pub(crate) fn inner(&self) -> &SysProactor {
&self.0
}

#[cfg(all(feature = "iouring", target_os = "linux"))]
/// Get IO_URING backend probes
pub fn ring_params(&self) -> &rustix_uring::Parameters {
unsafe { IO_URING.as_ref().unwrap().params() }
}
}

///
/// IO driver that drives event systems
/// IO driver that drives underlying event systems
pub fn drive<T>(future: impl Future<Output = T>) -> T {
let p = Proactor::get();
let waker = waker_fn(move || {
Expand All @@ -69,4 +96,4 @@ pub fn drive<T>(future: impl Future<Output = T>) -> T {
// let _duration = Duration::from_millis(1);
let _ = driver.as_mut().poll(cx);
}
}
}
11 changes: 11 additions & 0 deletions src/sys.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
///
/// Backends that are possible to use with Nuclei
pub enum IoBackend {
/// BSD-like backend
Kqueue,
/// Linux backend
Epoll,
/// Linux backend
IoUring
}

#[cfg(unix)]
fn check_err(res: libc::c_int) -> Result<libc::c_int, std::io::Error> {
if res == -1 {
Expand Down
3 changes: 2 additions & 1 deletion src/syscore/bsd/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ macro_rules! syscall {
use socket2::SockAddr;
use std::mem;
use std::os::unix::net::SocketAddr as UnixSocketAddr;
use crate::config::NucleiConfig;

fn max_len() -> usize {
// The maximum read limit on most posix-like systems is `SSIZE_MAX`,
Expand Down Expand Up @@ -151,7 +152,7 @@ pub struct SysProactor {
}

impl SysProactor {
pub(crate) fn new() -> io::Result<SysProactor> {
pub(crate) fn new(config: NucleiConfig) -> io::Result<SysProactor> {
let kqueue_fd = kqueue()?;
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
let (read_stream, write_stream) = UnixStream::pair()?;
Expand Down
2 changes: 2 additions & 0 deletions src/syscore/bsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub(crate) use fs::*;
pub(crate) use kqueue::*;

pub(crate) use processor::*;

pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::Kqueue;
3 changes: 2 additions & 1 deletion src/syscore/linux/epoll/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ macro_rules! syscall {
use socket2::SockAddr;
use std::mem;
use std::os::unix::net::SocketAddr as UnixSocketAddr;
use crate::config::NucleiConfig;

fn max_len() -> usize {
// The maximum read limit on most posix-like systems is `SSIZE_MAX`,
Expand Down Expand Up @@ -148,7 +149,7 @@ pub struct SysProactor {
}

impl SysProactor {
pub(crate) fn new() -> io::Result<SysProactor> {
pub(crate) fn new(config: NucleiConfig) -> io::Result<SysProactor> {
let epoll_fd: i32 = epoll_create1()?;
let event_fd: i32 = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
let event_fd_raw = event_fd;
Expand Down
2 changes: 2 additions & 0 deletions src/syscore/linux/epoll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub(crate) use epoll::*;
pub(crate) use fs::*;

pub(crate) use processor::*;

pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::Epoll;
23 changes: 14 additions & 9 deletions src/syscore/linux/iouring/iouring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crossbeam_channel::{Receiver, Sender, unbounded};
use rustix::io_uring::IoringOp;
use rustix_uring::{CompletionQueue, IoUring, SubmissionQueue, Submitter, squeue::Entry as SQEntry, cqueue::Entry as CQEntry};
use rustix_uring::cqueue::{more, sock_nonempty};
use crate::config::NucleiConfig;

fn max_len() -> usize {
// The maximum read limit on most posix-like systems is `SSIZE_MAX`,
Expand Down Expand Up @@ -136,8 +137,6 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result<UnixSocketAddr>
//// uring impl
///////////////////

const QUEUE_LEN: u32 = 1 << 11;

pub struct SysProactor {
sq: TTas<SubmissionQueue<'static>>,
cq: TTas<CompletionQueue<'static>>,
Expand All @@ -152,26 +151,32 @@ pub type RingTypes = (
Submitter<'static>,
);

static mut IO_URING: Option<IoUring> = None;
pub(crate) static mut IO_URING: Option<IoUring> = None;

impl SysProactor {
pub(crate) fn new() -> io::Result<SysProactor> {
pub(crate) fn new(config: NucleiConfig) -> io::Result<SysProactor> {
unsafe {
let ring = IoUring::builder()
.setup_sqpoll(2)
.build(QUEUE_LEN)
let mut rb = IoUring::builder();
config.iouring.sqpoll_wake_interval.map(|e| rb.setup_sqpoll(e));
let mut ring = rb.build(config.iouring.queue_len)
.expect("nuclei: uring can't be initialized");

IO_URING = Some(ring);

let (submitter, sq, cq) = IO_URING.as_mut().unwrap().split();
submitter.register_iowq_max_workers(&mut [QUEUE_LEN*8, 16])?;

match (config.iouring.per_numa_bounded_worker_count, config.iouring.per_numa_unbounded_worker_count) {
(Some(bw), Some(ubw)) => submitter.register_iowq_max_workers(&mut [bw, ubw])?,
(None, Some(ubw)) => submitter.register_iowq_max_workers(&mut [0, ubw])?,
(Some(bw), None) => submitter.register_iowq_max_workers(&mut [bw, 0])?,
(None, None) => submitter.register_iowq_max_workers(&mut [0, 0])?,
}

Ok(SysProactor {
sq: TTas::new(sq),
cq: TTas::new(cq),
sbmt: TTas::new(submitter),
submitters: TTas::new(HashMap::with_capacity(QUEUE_LEN as usize)),
submitters: TTas::new(HashMap::with_capacity(config.iouring.queue_len as usize)),
submitter_id: AtomicU64::default(),
})
}
Expand Down
2 changes: 2 additions & 0 deletions src/syscore/linux/iouring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ pub(crate) use fs::*;
pub(crate) use iouring::*;
pub(crate) use nethandle::*;
pub(crate) use processor::*;

pub const BACKEND: crate::sys::IoBackend = crate::sys::IoBackend::IoUring;

0 comments on commit aec53b0

Please sign in to comment.