Skip to content

Support pipewire as host#1093

Open
Decodetalkers wants to merge 34 commits intoRustAudio:masterfrom
Decodetalkers:pipewire
Open

Support pipewire as host#1093
Decodetalkers wants to merge 34 commits intoRustAudio:masterfrom
Decodetalkers:pipewire

Conversation

@Decodetalkers
Copy link

@Decodetalkers Decodetalkers commented Jan 6, 2026

Add support to pipewire

You can test it with pipewire feature open

Pipewire support use config to define rates. So the default config of cpal with pipewire can be changed through config like following.

cat ~/.config/pipewire/pipewire.conf.d/custom-rates.conf
context.properties = {
    default.clock.rate          = 48000
    default.clock.allowed-rates = [ 44100 48000 88200 96000 176400 192000 ]
}

Still problems left:

*once we do 'pipewire::init', we can only dequeue it after the whole thread. even put the function to another thread, the function still works.. But seems we can run init many times.. (Ok, seems it is not a problem, because in when we call init, the init action will only be called once. I think it will be ok)
*The crates by pipewire need edition 2024. I think that should be another pr.

@Decodetalkers Decodetalkers marked this pull request as draft January 6, 2026 12:20
@Decodetalkers Decodetalkers changed the title Pipewire support Support pipewire as host Jan 6, 2026
@Decodetalkers Decodetalkers marked this pull request as ready for review January 6, 2026 14:41
@Decodetalkers Decodetalkers force-pushed the pipewire branch 4 times, most recently from 9e2ef9f to fd59a29 Compare January 6, 2026 14:57
@Decodetalkers
Copy link
Author

Seems pipewire need edition 2024..

@Decodetalkers
Copy link
Author

@roderickvd can you help review this pr? Thanks, and when can this crate be upgraded to edition 2024? I would also want to help

@roderickvd
Copy link
Member

@roderickvd can you help review this pr?

Definitely will help you review it. Need a bit more time.

Thanks, and when can this crate be upgraded to edition 2024? I would also want to help

Actually cpal itself doesn't need to be upgraded to Rust 2024, it just needs a MSRV of Rust of 1.85 or higher to support dependencies that are Rust 2024 already.

When we can I'd like to stick cpal to Rust 2021 so we keep our MSRV down.

@SuperKenVery
Copy link
Contributor

Super cool man! Happy to see cpal having better linux support, opening the possibility of loopback recording on Linux!

@Decodetalkers Decodetalkers force-pushed the pipewire branch 3 times, most recently from 07bc999 to 6cea2ad Compare January 9, 2026 22:31
@Decodetalkers
Copy link
Author

ok, only one ci that I cannot fix

@Decodetalkers Decodetalkers force-pushed the pipewire branch 3 times, most recently from a81e104 to 5a3817c Compare January 10, 2026 16:19
@Decodetalkers
Copy link
Author

ok cross-rs based on ubuntu20.04, so

@Decodetalkers Decodetalkers force-pushed the pipewire branch 9 times, most recently from 721beb0 to 424d78f Compare January 11, 2026 09:53
@Be-ing
Copy link
Contributor

Be-ing commented Jan 15, 2026

How does this differ from #692?

Decodetalkers and others added 24 commits February 11, 2026 21:28
because cross-rs is based on ubuntu20.04, so it does not contains
pipewire
Replace boolean check with match expression to properly handle both
timeout expiration and channel receive errors, returning a more
accurate StreamConfigNotSupported error instead of DeviceNotAvailable.
…escriptions

Device IDs now use node.name (e.g. alsa_output.pci-..., bluez_output.80_99_...)
which is unique and stable across restarts, instead of nick_name which can have
duplicates. Device descriptions now include interface_type (Bluetooth, PCI, USB),
address (BT MAC or ALSA path), driver (factory.name), and improved device_type
mapping (Speaker, Headset).
@vasyl-protsiv
Copy link

I was testing this branch and noticed a problem with Stream dropping. I expected that dropping an output stream would stop playback and free all resources (threads). However, on my machine the output callback is still being called, and the spawned threads never finish.
The playback issue can be fixed by explicitly calling pause before dropping, but the existing threads continue running indefinitely.

This can be tested by this slightly modified beep example:

beep_drop.rs
//! Plays a simple 440 Hz sine wave (beep) tone.
//!
//! This example demonstrates:
//! - Selecting audio hosts (with optional JACK support on Linux)
//! - Selecting devices by ID or using the default output device
//! - Querying the default output configuration
//! - Building and running an output stream with typed samples
//! - Generating audio data in the stream callback
//!
//! Run with: `cargo run --example beep`
//! With JACK (Linux): `cargo run --example beep --features jack -- --jack`
//! With specific device: `cargo run --example beep -- --device "wasapi:device_id"`

use clap::Parser;
use cpal::{
    traits::{DeviceTrait, HostTrait, StreamTrait},
    FromSample, Sample, SizedSample, I24,
};

#[derive(Parser, Debug)]
#[command(version, about = "CPAL beep example", long_about = None)]
struct Opt {
    /// The audio device to use
    #[arg(short, long)]
    device: Option<String>,

    /// Use the JACK host
    #[cfg(all(
        any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        ),
        feature = "jack"
    ))]
    #[arg(short, long)]
    #[allow(dead_code)]
    jack: bool,
    /// Use the pipewire host
    #[cfg(all(
        any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        ),
        feature = "pipewire"
    ))]
    #[arg(short, long)]
    #[allow(dead_code)]
    pipewire: bool,
}

fn main() -> anyhow::Result<()> {
    let opt = Opt::parse();

    // Conditionally compile with jack if the feature is specified.
    #[cfg(all(
        any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        ),
        feature = "jack",
        not(feature = "pipewire")
    ))]
    // Manually check for flags. Can be passed through cargo with -- e.g.
    // cargo run --release --example beep --features jack -- --jack
    let host = if opt.jack {
        cpal::host_from_id(cpal::available_hosts()
            .into_iter()
            .find(|id| *id == cpal::HostId::Jack)
            .expect(
                "make sure --features jack is specified. only works on OSes where jack is available",
            )).expect("jack host unavailable")
    } else {
        cpal::default_host()
    };
    // Conditionally compile with jack if the feature is specified.
    #[cfg(all(
        any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        ),
        feature = "pipewire"
    ))]
    // Manually check for flags. Can be passed through cargo with -- e.g.
    // cargo run --release --example beep --features jack -- --jack
    let host = if opt.pipewire {
        cpal::host_from_id(cpal::available_hosts()
            .into_iter()
            .find(|id| *id == cpal::HostId::PipeWire)
            .expect(
                "make sure --features pipewire is specified. only works on OSes where jack is available",
            )).expect("jack host unavailable")
    } else {
        cpal::default_host()
    };

    #[cfg(any(
        not(any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        )),
        not(any(feature = "jack", feature = "pipewire"))
    ))]
    let host = cpal::default_host();

    let device = if let Some(device) = opt.device {
        let id = &device.parse().expect("failed to parse device id");
        host.device_by_id(id)
    } else {
        host.default_output_device()
    }
    .expect("failed to find output device");
    println!("Output device: {}", device.id()?);

    let config = device.default_output_config().unwrap();
    println!("Default output config: {config:?}");

    match config.sample_format() {
        cpal::SampleFormat::I8 => run::<i8>(&device, &config.into()),
        cpal::SampleFormat::I16 => run::<i16>(&device, &config.into()),
        cpal::SampleFormat::I24 => run::<I24>(&device, &config.into()),
        cpal::SampleFormat::I32 => run::<i32>(&device, &config.into()),
        // cpal::SampleFormat::I48 => run::<I48>(&device, &config.into()),
        cpal::SampleFormat::I64 => run::<i64>(&device, &config.into()),
        cpal::SampleFormat::U8 => run::<u8>(&device, &config.into()),
        cpal::SampleFormat::U16 => run::<u16>(&device, &config.into()),
        // cpal::SampleFormat::U24 => run::<U24>(&device, &config.into()),
        cpal::SampleFormat::U32 => run::<u32>(&device, &config.into()),
        // cpal::SampleFormat::U48 => run::<U48>(&device, &config.into()),
        cpal::SampleFormat::U64 => run::<u64>(&device, &config.into()),
        cpal::SampleFormat::F32 => run::<f32>(&device, &config.into()),
        cpal::SampleFormat::F64 => run::<f64>(&device, &config.into()),
        sample_format => panic!("Unsupported sample format '{sample_format}'"),
    }
}

pub fn run<T>(device: &cpal::Device, config: &cpal::StreamConfig) -> Result<(), anyhow::Error>
where
    T: SizedSample + FromSample<f32>,
{
    let sample_rate = config.sample_rate as f32;
    let channels = config.channels as usize;

    // Produce a sinusoid of maximum amplitude.
    let mut sample_clock = 0f32;
    let mut next_value = move || {
        sample_clock = (sample_clock + 1.0) % sample_rate;
        (sample_clock * 440.0 * 2.0 * std::f32::consts::PI / sample_rate).sin()
    };

    let err_fn = |err| eprintln!("an error occurred on stream: {err}");
    loop {
        let stream = device.build_output_stream(
            config,
            move |data: &mut [T], _: &cpal::OutputCallbackInfo| {
                println!("callback called");
                write_data(data, channels, &mut next_value)
            },
            err_fn,
            None,
        )?;
        stream.play()?;

        std::thread::sleep(std::time::Duration::from_millis(1000));
        // stream.pause();  // Uncommenting this stops playback, but the already‑started threads keep running
        drop(stream);
        println!("Stream dropped");
        std::thread::sleep(std::time::Duration::from_millis(2000));
    }

    Ok(())
}

fn write_data<T>(output: &mut [T], channels: usize, next_sample: &mut dyn FnMut() -> f32)
where
    T: Sample + FromSample<f32>,
{
    for frame in output.chunks_mut(channels) {
        let value: T = T::from_sample(next_sample());
        for sample in frame.iter_mut() {
            *sample = value;
        }
    }
}

Running this with

cargo r -r --example beep_drop --features pipewire -- --pipewire

you can see how the number of threads increases with

ps -T -p `pidof beep_drop`

Note: I don't experience these problems when using ALSA

@Decodetalkers
Copy link
Author

I was testing this branch and noticed a problem with Stream dropping. I expected that dropping an output stream would stop playback and free all resources (threads). However, on my machine the output callback is still being called, and the spawned threads never finish. The playback issue can be fixed by explicitly calling pause before dropping, but the existing threads continue running indefinitely.

This can be tested by this slightly modified beep example:
beep_drop.rs

//! Plays a simple 440 Hz sine wave (beep) tone.
//!
//! This example demonstrates:
//! - Selecting audio hosts (with optional JACK support on Linux)
//! - Selecting devices by ID or using the default output device
//! - Querying the default output configuration
//! - Building and running an output stream with typed samples
//! - Generating audio data in the stream callback
//!
//! Run with: `cargo run --example beep`
//! With JACK (Linux): `cargo run --example beep --features jack -- --jack`
//! With specific device: `cargo run --example beep -- --device "wasapi:device_id"`

use clap::Parser;
use cpal::{
    traits::{DeviceTrait, HostTrait, StreamTrait},
    FromSample, Sample, SizedSample, I24,
};

#[derive(Parser, Debug)]
#[command(version, about = "CPAL beep example", long_about = None)]
struct Opt {
    /// The audio device to use
    #[arg(short, long)]
    device: Option<String>,

    /// Use the JACK host
    #[cfg(all(
        any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        ),
        feature = "jack"
    ))]
    #[arg(short, long)]
    #[allow(dead_code)]
    jack: bool,
    /// Use the pipewire host
    #[cfg(all(
        any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        ),
        feature = "pipewire"
    ))]
    #[arg(short, long)]
    #[allow(dead_code)]
    pipewire: bool,
}

fn main() -> anyhow::Result<()> {
    let opt = Opt::parse();

    // Conditionally compile with jack if the feature is specified.
    #[cfg(all(
        any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        ),
        feature = "jack",
        not(feature = "pipewire")
    ))]
    // Manually check for flags. Can be passed through cargo with -- e.g.
    // cargo run --release --example beep --features jack -- --jack
    let host = if opt.jack {
        cpal::host_from_id(cpal::available_hosts()
            .into_iter()
            .find(|id| *id == cpal::HostId::Jack)
            .expect(
                "make sure --features jack is specified. only works on OSes where jack is available",
            )).expect("jack host unavailable")
    } else {
        cpal::default_host()
    };
    // Conditionally compile with jack if the feature is specified.
    #[cfg(all(
        any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        ),
        feature = "pipewire"
    ))]
    // Manually check for flags. Can be passed through cargo with -- e.g.
    // cargo run --release --example beep --features jack -- --jack
    let host = if opt.pipewire {
        cpal::host_from_id(cpal::available_hosts()
            .into_iter()
            .find(|id| *id == cpal::HostId::PipeWire)
            .expect(
                "make sure --features pipewire is specified. only works on OSes where jack is available",
            )).expect("jack host unavailable")
    } else {
        cpal::default_host()
    };

    #[cfg(any(
        not(any(
            target_os = "linux",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "netbsd"
        )),
        not(any(feature = "jack", feature = "pipewire"))
    ))]
    let host = cpal::default_host();

    let device = if let Some(device) = opt.device {
        let id = &device.parse().expect("failed to parse device id");
        host.device_by_id(id)
    } else {
        host.default_output_device()
    }
    .expect("failed to find output device");
    println!("Output device: {}", device.id()?);

    let config = device.default_output_config().unwrap();
    println!("Default output config: {config:?}");

    match config.sample_format() {
        cpal::SampleFormat::I8 => run::<i8>(&device, &config.into()),
        cpal::SampleFormat::I16 => run::<i16>(&device, &config.into()),
        cpal::SampleFormat::I24 => run::<I24>(&device, &config.into()),
        cpal::SampleFormat::I32 => run::<i32>(&device, &config.into()),
        // cpal::SampleFormat::I48 => run::<I48>(&device, &config.into()),
        cpal::SampleFormat::I64 => run::<i64>(&device, &config.into()),
        cpal::SampleFormat::U8 => run::<u8>(&device, &config.into()),
        cpal::SampleFormat::U16 => run::<u16>(&device, &config.into()),
        // cpal::SampleFormat::U24 => run::<U24>(&device, &config.into()),
        cpal::SampleFormat::U32 => run::<u32>(&device, &config.into()),
        // cpal::SampleFormat::U48 => run::<U48>(&device, &config.into()),
        cpal::SampleFormat::U64 => run::<u64>(&device, &config.into()),
        cpal::SampleFormat::F32 => run::<f32>(&device, &config.into()),
        cpal::SampleFormat::F64 => run::<f64>(&device, &config.into()),
        sample_format => panic!("Unsupported sample format '{sample_format}'"),
    }
}

pub fn run<T>(device: &cpal::Device, config: &cpal::StreamConfig) -> Result<(), anyhow::Error>
where
    T: SizedSample + FromSample<f32>,
{
    let sample_rate = config.sample_rate as f32;
    let channels = config.channels as usize;

    // Produce a sinusoid of maximum amplitude.
    let mut sample_clock = 0f32;
    let mut next_value = move || {
        sample_clock = (sample_clock + 1.0) % sample_rate;
        (sample_clock * 440.0 * 2.0 * std::f32::consts::PI / sample_rate).sin()
    };

    let err_fn = |err| eprintln!("an error occurred on stream: {err}");
    loop {
        let stream = device.build_output_stream(
            config,
            move |data: &mut [T], _: &cpal::OutputCallbackInfo| {
                println!("callback called");
                write_data(data, channels, &mut next_value)
            },
            err_fn,
            None,
        )?;
        stream.play()?;

        std::thread::sleep(std::time::Duration::from_millis(1000));
        // stream.pause();  // Uncommenting this stops playback, but the already‑started threads keep running
        drop(stream);
        println!("Stream dropped");
        std::thread::sleep(std::time::Duration::from_millis(2000));
    }

    Ok(())
}

fn write_data<T>(output: &mut [T], channels: usize, next_sample: &mut dyn FnMut() -> f32)
where
    T: Sample + FromSample<f32>,
{
    for frame in output.chunks_mut(channels) {
        let value: T = T::from_sample(next_sample());
        for sample in frame.iter_mut() {
            *sample = value;
        }
    }
}

Running this with

cargo r -r --example beep_drop --features pipewire -- --pipewire

you can see how the number of threads increases with

ps -T -p `pidof beep_drop`

Note: I don't experience these problems when using ALSA

fixed

@vasyl-protsiv
Copy link

vasyl-protsiv commented Feb 15, 2026

fixed

Thanks for quick response. Unfortunately, I am still having issue with hanging threads. The mainloop.run() never returns, so the thread never exits.
I am not sure if this is the correct way to do it, but I was able to fix it by explicitly calling mainloop.quit():

                let _ = pw_init_tx.send(true);
                let mainloop_weak = mainloop.downgrade();
                let _receiver = pw_play_rv.attach(mainloop.loop_(), move |play| match play {
                    StreamCommand::Toggle(state) => {
                        let _ = stream.set_active(state);
                    }
                    StreamCommand::Stop => {
                        let _ = stream.disconnect();
                        if let Some(mainloop) = mainloop_weak.upgrade() {
                            mainloop.quit();
                        }
                    }
                });

Another problem I've noticed is that there seem to be no way to set the buffer size. Currently StreamConfig.buffer_size appears to be ignored.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants