Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ requests==2.32.3
# via sphinx
roman-numerals-py==3.1.0 ; python_full_version >= '3.11'
# via sphinx
ruff==0.11.4
ruff==0.11.5
scipy==1.15.2
# via
# pyriodicity
Expand Down Expand Up @@ -116,11 +116,11 @@ tomli==2.2.1 ; python_full_version <= '3.11'
# numpydoc
# pytest
# sphinx
typing-extensions==4.13.1
typing-extensions==4.13.2
# via
# beautifulsoup4
# pydata-sphinx-theme
tzdata==2025.2
# via pandas
urllib3==2.3.0
urllib3==2.4.0
# via requests
Empty file.
196 changes: 196 additions & 0 deletions pyriodicity/_internal/online_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from typing import Literal, Optional, Union

import numpy as np
from numpy.typing import ArrayLike, NDArray
from scipy.signal import detrend, get_window, ShortTimeFFT


class OnlineHelper:
"""
Online helper class for efficient signal processing using sliding windows.

Implements an efficient sliding window approach for computing real-time FFT
and ACF of a signal stream. Uses twiddle factors and incremental updates
to minimize computational complexity.

Parameters
----------
window_size : int
Size of the sliding window for signal processing.
buffer_size : int, optional, default = 2 * window_size
Size of the samples buffer. Must be at least equal to window_size.
window_func : float or str or tuple, optional, default = 'boxcar'
Window function to apply. See ``scipy.signal.get_window`` for accepted formats
of the ``window`` parameter.
detrend_func : {'constant', 'linear'}, optional, default = 'linear'
The kind of detrending to apply. If None, no detrending is applied.

Attributes
----------
window : NDArray
The window function array.
window_buffer : NDArray
Circular buffer storing the last ``window_size`` windowed signal samples.
buffer : NDArray
Circular buffer storing the last ``buffer_size`` samples.
rfft : NDArray
Current FFT spectrum of the windowed signal.

See Also
--------
scipy.signal.get_window
Get a window function with the specified parameters.

Raises
------
ValueError
If history_size is less than window_size.
"""

def __init__(
self,
window_size: int,
buffer_size: Optional[int] = None,
window_func: Union[str, float, tuple] = "boxcar",
detrend_func: Optional[Literal["constant", "linear"]] = "linear",
):
# Initialize size attributes
self.window_size = window_size
self.buffer_size = 2 * window_size if buffer_size is None else buffer_size
if self.buffer_size < window_size:
raise ValueError(
f"History size ({self.buffer_size}) must be at least "
f"equal to window size ({window_size})"
)

# Initialize the buffers
self.window_buffer = np.zeros(window_size)
self.buffer = np.zeros(self.buffer_size)
self._buffer_index = 0

# Initialize the detrend function attribute
self._detrend_func = detrend_func

# Get the window
self.window = get_window(window=window_func, Nx=window_size)

# Compute the twiddle factors
self._twiddle = np.exp(
-2j * np.pi * np.arange(window_size // 2 + 1) / window_size
)

# Initialize the FFT spectrum and frequencies
self.rfft = np.fft.rfft(self.window_buffer)
self.freq = np.fft.rfftfreq(self.window_size)

# Initialize Short Time FFT
self._stft = ShortTimeFFT.from_window(
win_param=window_func, fs=1.0, nperseg=self.window_size, noverlap=0
)

def update(
self,
data: Union[np.floating, ArrayLike],
return_value: Literal["rfft", "acf"] = "rfft",
) -> NDArray:
"""
Update the signal buffer with new samples and compute transforms.

Processes new samples through the circular buffer, updating the FFT spectrum
efficiently using twiddle factors. Can return either the FFT or ACF of
the current window.

Parameters
----------
data : numpy.floating or array_like
New samples to process. Can be a single value or an array of values.
return_value : {'rfft', 'acf'}, optional, default = 'rfft'
The type of transform to return:
- 'rfft': Return the real FFT spectrum
- 'acf': Return the autocorrelation function

Returns
-------
NDArray
The requested transform (FFT spectrum or ACF) of the current window.

Raises
------
ValueError
If return_value is not one of {'rfft', 'acf'}.
"""

data_array = np.asarray(data).flat
n_samples = len(data_array)

for i in range(0, n_samples, self.window_size):
batch = data_array[i : i + self.window_size]
batch_size = len(batch)

# Update history buffer
self.buffer = np.roll(self.buffer, -batch_size)
self.buffer[-batch_size:] = batch

# Get old samples and update window buffer
old_samples = self.window_buffer[:batch_size].copy()
self.window_buffer = np.roll(self.window_buffer, -batch_size)
self.window_buffer[-batch_size:] = batch

# Detrend if needed
if self._detrend_func is not None:
detrended_buffer = detrend(
np.insert(self.window_buffer, 0, old_samples),
type=self._detrend_func,
)
old_samples = detrended_buffer[:batch_size]
batch = detrended_buffer[-batch_size:]

# Apply window
old_samples *= self.window[:batch_size]
self.window = np.roll(self.window, -batch_size)
batch *= self.window[-batch_size:]

# Update spectrum using twiddle factors for the batch
shift_indices = np.arange(batch_size)[:, None] * np.arange(
self.window_size // 2 + 1
)
shifts = np.exp(-2j * np.pi * shift_indices / self.window_size)
delta_spectrum = (batch - old_samples) @ shifts
self.rfft = self._twiddle**batch_size * (self.rfft + delta_spectrum)

# Update buffer index
self._buffer_index = (self._buffer_index + batch_size) % self.buffer_size

# Recompute spectrum when buffer is filled
if self._buffer_index == 0:
stft_buffer = (
self.buffer
if self._detrend_func is None
else detrend(self.buffer, type=self._detrend_func)
)
S = self._stft.stft(stft_buffer)
self.rfft = S[:, -1]

match return_value:
case "rfft":
return self.rfft
case "acf":
return self.get_acf()
case _:
raise ValueError(f"Unsupported return_value '{return_value}'")

Check warning on line 180 in pyriodicity/_internal/online_helper.py

View check run for this annotation

Codecov / codecov/patch

pyriodicity/_internal/online_helper.py#L179-L180

Added lines #L179 - L180 were not covered by tests

def get_acf(self) -> NDArray:
"""
Get the current autocorrelation function array.

Computes the ACF from the current FFT spectrum using the inverse FFT
and normalizes it by the zero-lag autocorrelation.

Returns
-------
NDArray
The normalized autocorrelation function array of the current window buffer.
"""

acf_arr = np.fft.irfft(self.rfft)
return np.zeros_like(acf_arr) if acf_arr[0] == 0 else acf_arr / acf_arr[0]
80 changes: 68 additions & 12 deletions pyriodicity/tools/_tools.py → pyriodicity/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,25 @@

@staticmethod
def to_1d_array(x: ArrayLike) -> NDArray:
"""
Convert input to a contiguous 1-dimensional numpy array.

Parameters
----------
x : array_like
Input array to be converted. Must be squeezable to 1-d.

Returns
-------
NDArray
A contiguous 1-dimensional numpy array of type double.

Raises
------
ValueError
If the input cannot be squeezed to 1-d.
"""

x = np.ascontiguousarray(np.squeeze(np.asarray(x)), dtype=np.double)
if x.ndim != 1:
raise ValueError("x must be a 1-dimensional array")
Expand All @@ -15,12 +34,51 @@ def to_1d_array(x: ArrayLike) -> NDArray:

@staticmethod
def apply_window(x: ArrayLike, window_func: Union[str, float, tuple]) -> NDArray:
"""
Apply a window function to the input array.

Parameters
----------
x : array_like
Input array. Must be squeezable to 1-d.
window_func : float, str, tuple
Window function to apply. See ``scipy.signal.get_window`` for accepted formats
of the ``window`` parameter.

Returns
-------
NDArray
Input array with the window function applied.

See Also
--------
scipy.signal.get_window
Get a window function.
"""

x = to_1d_array(x)
return x * get_window(window=window_func, Nx=len(x))


@staticmethod
def acf(x: ArrayLike) -> NDArray:
"""
Compute the autocorrelation function of a signal.

Uses FFT to compute the autocorrelation efficiently.

Parameters
----------
x : array_like
Input array. Must be squeezable to 1-d.

Returns
-------
NDArray
The normalized autocorrelation function of the input.
Length is equal to the input length.
"""

x = to_1d_array(x)
n = len(x)
fft = np.fft.fft(x, n=n * 2)
Expand Down Expand Up @@ -53,25 +111,23 @@ def power_threshold(
It determines the cutoff point in the sorted list of the maximum
power values from the periodograms of the permuted data.
Value must be between 0 and 100 inclusive.
window_func : float, tuple, array_like default = 'boxcar'
Window function to be applied to the time series. Check
``window`` parameter documentation for ``scipy.signal.get_window``
function for more information on the accepted formats of this
parameter.
window_func : float, str, tuple, optional, default = 'boxcar'
Window function to apply. See ``scipy.signal.get_window`` for accepted formats
of the ``window`` parameter.
detrend_func : {'constant', 'linear'}, optional, default = 'linear'
The kind of detrending to be applied on the signal. If None, no detrending
is applied.
The kind of detrending to apply. If None, no detrending is applied.

Returns
-------
numpy.floating
Power threshold of the target data.

See Also
--------
scipy.signal.periodogram
Estimate power spectral density using a periodogram.

Returns
-------
float
Power threshold of the target data.
"""

max_powers = []
while len(max_powers) < k:
_, pxx = periodogram(
Expand Down
9 changes: 7 additions & 2 deletions pyriodicity/online/online_acf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from numpy.typing import ArrayLike, NDArray
from scipy.signal import find_peaks

from pyriodicity.tools import OnlineHelper
from .._internal.online_helper import OnlineHelper


class OnlineACFPeriodicityDetector:
Expand All @@ -18,6 +18,8 @@ class OnlineACFPeriodicityDetector:
----------
window_size : int
Size of the sliding window for the ACF computation.
buffer_size : int, optional, default = 2 * window_size
Size of the samples buffer. Must be at least equal to window_size.
window_func : float, str, tuple, optional, default = 'boxcar'
Window function to apply. See ``scipy.signal.get_window`` for accepted formats
of the ``window`` parameter.
Expand All @@ -41,11 +43,14 @@ class OnlineACFPeriodicityDetector:
def __init__(
self,
window_size: int,
buffer_size: Optional[int] = None,
window_func: Union[float, str, tuple] = "boxcar",
detrend_func: Optional[Literal["constant", "linear"]] = "linear",
):
self.window_size = window_size
self.online_helper = OnlineHelper(window_size, window_func, detrend_func)
self.online_helper = OnlineHelper(
window_size, buffer_size, window_func, detrend_func
)

def detect(
self,
Expand Down
Loading