From 50255ba5df16d3d3a105fbd972e9c2e631ed8b99 Mon Sep 17 00:00:00 2001 From: Dymstro Date: Sat, 1 Jun 2024 18:28:44 +0200 Subject: [PATCH] Added systemd user service and made script exit more gracefully --- README.md | 49 +++++++++++++++++++++++++++++-- nova-chatmix.service | 12 ++++++++ nova.py | 70 ++++++++++++++++++++++++++++++++------------ 3 files changed, 111 insertions(+), 20 deletions(-) create mode 100644 nova-chatmix.service mode change 100644 => 100755 nova.py diff --git a/README.md b/README.md index 7c95fbf..8f0a6be 100644 --- a/README.md +++ b/README.md @@ -78,10 +78,17 @@ For this project I created a simple Python program to both enable the controls a - PipeWire - pactl -On Fedora 39 (assuming PipeWire is already setup) these can be installed with: +On Fedora (assuming PipeWire is already setup) these can be installed with: ``` sudo dnf install pulseaudio-utils python3 python3-pyusb ``` +### Install + +Clone this repo and cd into it +``` +git clone https://git.dymstro.nl/Dymstro/nova-chatmix-linux.git +cd nova-chatmix-linux +``` To be able to run the script as a non-root user, some udev rules need to be applied. This will allow regular users to access the base station USB device. @@ -93,12 +100,50 @@ sudo udevadm control --reload-rules sudo udevadm trigger ``` +Check if your audio device matches the one on line 49 of `nova.py` +``` +pactl list sinks short +# The output should look something like this: +# 47 alsa_output.pci-0000_0c_00.4.iec958-stereo PipeWire s32le 2ch 48000Hz SUSPENDED +# 77 alsa_output.pci-0000_0a_00.1.hdmi-stereo PipeWire s32le 2ch 48000Hz SUSPENDED +# 92 alsa_output.usb-SteelSeries_Arctis_Nova_Pro_Wireless-00.iec958-stereo PipeWire s24le 2ch 48000Hz RUNNING +``` + +In `nova.py`: +``` +## Lines 48-50 +# PW_ORIGINAL_SINK = ( +# "alsa_output.usb-SteelSeries_Arctis_Nova_Pro_Wireless-00.7.iec958-stereo" # Edit this line if needed +# ) +``` + +If you want to run this script on startup you can add and enable the systemd service +``` +## The systemd service expects the script in .local/bin +# Create the folder if it doesn't exist +mkdir -p ~/.local/bin +# Copy the script to the expected location +cp -i nova.py ~/.local/bin + +# Create systemd user unit folder if it doesn't exist +mkdir -p ~/.config/systemd/user +# Install the service file +cp nova-chatmix.service ~/.config/systemd/user/ +# Reload systemd configuration +systemctl --user daemon-reload +# Enable and start the service +systemctl --user enable nova-chatmix --now +``` + +### Run + You can now run the python script to use ChatMix. This will create 2 virtual sound devices: - NovaGame for game/general audio - NovaChat for chat audio ``` +# You do not need to run this if you installed the systemd unit! python nova.py ``` @@ -128,7 +173,7 @@ nova.print_output(True) ## What's Next -Currently none of this is very polished, it should work, but that's about it. I would like to make this a bit more integrated, so that it just works without having to run the python script every time. +Currently none of this is very polished, it should work, but that's about it. ~~I would like to make this a bit more integrated, so that it just works without having to run the python script every time.~~ I added a systemd service to automate starting the script. There are also some other projects that try to make headset features work on linux, for example [HeadsetControl](https://github.com/Sapd/HeadsetControl). I might want to try and get some of this implemented in there. diff --git a/nova-chatmix.service b/nova-chatmix.service new file mode 100644 index 0000000..5a980ce --- /dev/null +++ b/nova-chatmix.service @@ -0,0 +1,12 @@ +[Unit] +Description=This will enable ChatMix for the Steelseries Arctis Nova Pro Wireless +After=pipewire.service pipewire-pulse.service +Wants=network-online.target + +[Service] +Restart=no +Type=simple +ExecStart=%h/.local/bin/nova.py + +[Install] +WantedBy=default.target diff --git a/nova.py b/nova.py old mode 100644 new mode 100755 index 286d57e..707a39d --- a/nova.py +++ b/nova.py @@ -1,8 +1,10 @@ #!/usr/bin/python3 -# Licensed under the EUPL +# Licensed under the 0BSD + +from subprocess import Popen +from signal import signal, SIGINT, SIGTERM -import subprocess from usb.core import find, USBTimeoutError @@ -50,6 +52,18 @@ class NovaProWireless: PW_GAME_SINK = "NovaGame" PW_CHAT_SINK = "NovaChat" + # PipeWire virtual sink processes + PW_LOOPBACK_GAME_PROCESS = None + PW_LOOPBACK_CHAT_PROCESS = None + + # Keeps track of enabled features for when close() is called + CHATMIX_CONTROLS_ENABLED = False + SONAR_ICON_ENABLED = False + CHATMIX_ENABLED = False + + # Stops processes when program exits + CLOSE = False + # Selects correct device, and makes sure we can control it def __init__(self): self.dev = find(idVendor=self.VID, idProduct=self.PID) @@ -62,18 +76,21 @@ class NovaProWireless: def _create_msgdata(self, data: tuple[int]) -> bytes: return bytes(data).ljust(self.MSGLEN, b"0") - # Enables chatmix - def enable_chatmix(self): + # Enables/Disables chatmix controls + def set_chatmix_controls(self, state: bool): self.dev.write( self.ENDPOINT_TX, - self._create_msgdata((self.TX, self.OPT_CHATMIX_ENABLE, 1)), + self._create_msgdata((self.TX, self.OPT_CHATMIX_ENABLE, int(state))), ) + self.CHATMIX_CONTROLS_ENABLED = state - # Enables Sonar Icon - def enable_sonar_icon(self): + # Enables/Disables Sonar Icon + def set_sonar_icon(self, state: bool): self.dev.write( - self.ENDPOINT_TX, self._create_msgdata((self.TX, self.OPT_SONAR_ICON, 1)) + self.ENDPOINT_TX, + self._create_msgdata((self.TX, self.OPT_SONAR_ICON, int(state))), ) + self.SONAR_ICON_ENABLED = state # Sets Volume def set_volume(self, attenuation: int): @@ -90,7 +107,7 @@ class NovaProWireless: ) # Create virtual pipewire loopback sinks, and redirect them to the real headset sink - def _enable_virtual_sinks(self): + def _start_virtual_sinks(self): cmd = [ "pw-loopback", "-P", @@ -98,15 +115,16 @@ class NovaProWireless: "--capture-props=media.class=Audio/Sink", "-n", ] - subprocess.Popen(cmd + [self.PW_GAME_SINK]) - subprocess.Popen(cmd + [self.PW_CHAT_SINK]) + self.PW_LOOPBACK_GAME_PROCESS = Popen(cmd + [self.PW_GAME_SINK]) + self.PW_LOOPBACK_CHAT_PROCESS = Popen(cmd + [self.PW_CHAT_SINK]) # ChatMix implementation # Continuously read from base station and ignore everything but ChatMix messages (OPT_CHATMIX) # The .read method times out and returns an error. This error is catched and basically ignored. Timeout can be configured, but not turned off (I think). def chatmix(self): - self._enable_virtual_sinks() - while True: + self._start_virtual_sinks() + self.CHATMIX_ENABLED = True + while not self.CLOSE: try: msg = self.dev.read(self.ENDPOINT_RX, self.MSGLEN) if msg[1] != self.OPT_CHATMIX: @@ -120,15 +138,15 @@ class NovaProWireless: cmd = ["pactl", "set-sink-volume"] # Actually change volume. Everytime you turn the dial, both volumes are set to the correct level - subprocess.Popen(cmd + [f"input.{self.PW_GAME_SINK}", f"{gamevol}%"]) - subprocess.Popen(cmd + [f"input.{self.PW_CHAT_SINK}", f"{chatvol}%"]) + Popen(cmd + [f"input.{self.PW_GAME_SINK}", f"{gamevol}%"]) + Popen(cmd + [f"input.{self.PW_CHAT_SINK}", f"{chatvol}%"]) # Ignore timeout. except USBTimeoutError: continue # Prints output from base station. `debug` argument enables raw output. def print_output(self, debug: bool = False): - while True: + while not self.CLOSE: try: msg = self.dev.read(self.ENDPOINT_RX, self.MSGLEN) if debug: @@ -147,10 +165,26 @@ class NovaProWireless: except USBTimeoutError: continue + # Terminates processes and disables features + def close(self, signum, frame): + self.CLOSE = True + if self.CHATMIX_CONTROLS_ENABLED: + self.set_chatmix_controls(False) + if self.SONAR_ICON_ENABLED: + print("test") + self.set_sonar_icon(False) + if self.CHATMIX_ENABLED: + self.PW_LOOPBACK_GAME_PROCESS.terminate() + self.PW_LOOPBACK_CHAT_PROCESS.terminate() + # When run directly, just start the ChatMix implementation. (And activate the icon, just for fun) if __name__ == "__main__": nova = NovaProWireless() - nova.enable_sonar_icon() - nova.enable_chatmix() + nova.set_sonar_icon(True) + nova.set_chatmix_controls(True) + + signal(SIGINT, nova.close) + signal(SIGTERM, nova.close) + nova.chatmix()