206 lines
5.9 KiB
Python
206 lines
5.9 KiB
Python
import cython
|
|
from cython.cimports.av.audio.format import get_audio_format
|
|
from cython.cimports.av.audio.layout import get_audio_layout
|
|
from cython.cimports.av.audio.plane import AudioPlane
|
|
from cython.cimports.av.error import err_check
|
|
from cython.cimports.av.utils import check_ndarray
|
|
|
|
# Unique marker object. ``alloc_audio_frame`` passes it as the ``format``
# argument of ``AudioFrame``; ``AudioFrame.__cinit__`` compares against it by
# identity and returns early, skipping format/layout construction and ``_init``.
_cinit_bypass_sentinel = object()
|
|
|
|
|
|
@cython.cfunc
def alloc_audio_frame() -> AudioFrame:
    # Passing the sentinel as ``format`` makes ``AudioFrame.__cinit__`` return
    # before it builds a format/layout or calls ``_init``, so the caller gets
    # a frame object with no sample buffer set up by this module.
    bare_frame: AudioFrame = AudioFrame(_cinit_bypass_sentinel)
    return bare_frame
|
|
|
|
|
|
# Maps a sample-format name to the NumPy dtype string used by
# ``AudioFrame.from_ndarray`` / ``AudioFrame.to_ndarray``. Every base format
# and its ``p``-suffixed variant share the same dtype, so the table is built
# from the base names only (insertion order: base name, then ``<base>p``).
format_dtypes = {
    f"{base_name}{suffix}": dtype_code
    for base_name, dtype_code in (
        ("dbl", "f8"),
        ("flt", "f4"),
        ("s16", "i2"),
        ("s32", "i4"),
        ("u8", "u1"),
    )
    for suffix in ("", "p")
}
|
|
|
|
|
|
@cython.cclass
class AudioFrame(Frame):
    """A frame of audio.

    :param format: Sample format; forwarded to ``AudioFormat``
        (default ``"s16"``).
    :param layout: Channel layout; forwarded to ``AudioLayout``
        (default ``"stereo"``).
    :param int samples: Number of samples per channel. A sample buffer is
        only allocated when this is non-zero and the layout reports a
        non-zero channel count.
    :param int align: Alignment value handed to
        ``av_samples_get_buffer_size`` and ``avcodec_fill_audio_frame``.
    """

    def __cinit__(self, format="s16", layout="stereo", samples=0, align=1):
        # ``alloc_audio_frame`` passes the module-level sentinel as ``format``
        # to get a frame object without any of the set-up below.
        if format is _cinit_bypass_sentinel:
            return

        cy_format: AudioFormat = AudioFormat(format)
        cy_layout: AudioLayout = AudioLayout(layout)
        self._init(cy_format.sample_fmt, cy_layout.layout, samples, align)

    @cython.cfunc
    def _init(
        self,
        format: lib.AVSampleFormat,
        layout: lib.AVChannelLayout,
        nb_samples: cython.uint,
        align: cython.uint,
    ):
        # Record the audio parameters on the underlying frame struct, then
        # (when there is something to store) allocate a sample buffer and
        # point the frame's data pointers at it.
        self.ptr.nb_samples = nb_samples
        self.ptr.format = format
        self.ptr.ch_layout = layout

        # Sometimes this is called twice. Oh well.
        self._init_user_attributes()

        nb_channels = self.layout.nb_channels
        if nb_channels == 0 or not nb_samples:
            # No channels or no samples: leave the frame without a buffer.
            return

        # Cleanup the old buffer.
        lib.av_freep(cython.address(self._buffer))

        # Get a new one. ``err_check`` turns a negative libav return value
        # into an exception before anything is allocated.
        self._buffer_size = err_check(
            lib.av_samples_get_buffer_size(
                cython.NULL, nb_channels, nb_samples, format, align
            )
        )
        self._buffer = cython.cast(
            cython.pointer[uint8_t], lib.av_malloc(self._buffer_size)
        )
        if not self._buffer:
            raise MemoryError("cannot allocate AudioFrame buffer")

        # Connect the data pointers to the buffer.
        err_check(
            lib.avcodec_fill_audio_frame(
                self.ptr,
                nb_channels,
                cython.cast(lib.AVSampleFormat, self.ptr.format),
                self._buffer,
                self._buffer_size,
                align,
            )
        )

    def __dealloc__(self):
        # Release the sample buffer owned by this object (if any).
        lib.av_freep(cython.address(self._buffer))

    @cython.cfunc
    def _init_user_attributes(self):
        # Refresh the Python-level ``layout`` / ``format`` wrappers from the
        # values currently stored on the underlying frame struct.
        self.layout = get_audio_layout(self.ptr.ch_layout)
        self.format = get_audio_format(cython.cast(lib.AVSampleFormat, self.ptr.format))

    def __repr__(self):
        return (
            f"<av.{self.__class__.__name__} pts={self.pts}, {self.samples} "
            f"samples at {self.rate}Hz, {self.layout.name}, {self.format.name} at 0x{id(self):x}>"
        )

    @staticmethod
    def from_ndarray(array, format="s16", layout="stereo"):
        """
        Construct a frame from a numpy array.

        The array must be two-dimensional (it is validated with
        ``check_ndarray`` against the dtype listed for the format in
        ``format_dtypes``):

        * planar formats: ``array.shape[0]`` must equal the number of
          channels and ``array.shape[1]`` is the number of samples;
        * packed formats: ``array.shape[0]`` must be ``1`` and
          ``array.shape[1]`` must be a whole multiple of the number of
          channels (samples are interleaved).

        :param array: The sample data.
        :param format: An ``AudioFormat`` or a value accepted by its
            constructor.
        :param layout: An ``AudioLayout`` or a value accepted by its
            constructor.
        :raises ValueError: If the format has no entry in ``format_dtypes``
            or the array shape does not match the format and layout.
        :rtype: AudioFrame
        """
        import numpy as np

        py_format = format if isinstance(format, AudioFormat) else AudioFormat(format)
        py_layout = layout if isinstance(layout, AudioLayout) else AudioLayout(layout)
        format_name = py_format.name

        # map avcodec type to numpy type
        try:
            dtype = np.dtype(format_dtypes[format_name])
        except KeyError:
            # The KeyError is an implementation detail of the lookup table.
            raise ValueError(
                f"Conversion from numpy array with format `{format_name}` is not yet supported"
            ) from None

        # check input format
        nb_channels = py_layout.nb_channels
        check_ndarray(array, dtype, 2)
        if py_format.is_planar:
            if array.shape[0] != nb_channels:
                raise ValueError(
                    f"Expected planar `array.shape[0]` to equal `{nb_channels}` but got `{array.shape[0]}`"
                )
            samples = array.shape[1]
        else:
            if array.shape[0] != 1:
                raise ValueError(
                    f"Expected packed `array.shape[0]` to equal `1` but got `{array.shape[0]}`"
                )
            width = array.shape[1]
            # Interleaved data must hold a whole number of sample groups;
            # flooring here would size the frame for fewer values than the
            # array actually contains.
            if width % nb_channels != 0:
                raise ValueError(
                    f"Expected packed `array.shape[1]` to be a multiple of `{nb_channels}` but got `{width}`"
                )
            samples = width // nb_channels

        frame = AudioFrame(format=py_format, layout=py_layout, samples=samples)
        for i, plane in enumerate(frame.planes):
            plane.update(array[i, :])
        return frame

    @property
    def planes(self):
        """
        A tuple of :class:`~av.audio.plane.AudioPlane`.

        The count is the number of leading non-NULL data pointers, capped at
        the number of planes the frame's format and layout imply (one for
        packed formats, one per channel for planar formats).

        :type: tuple
        """
        # Packed audio has a single data pointer; planar audio has one per
        # channel. The pointer array is not NULL-terminated, so an unbounded
        # scan can run past its end once every slot is in use.
        max_planes: cython.int = (
            self.layout.nb_channels if self.format.is_planar else 1
        )

        plane_count: cython.int = 0
        while plane_count < max_planes and self.ptr.extended_data[plane_count]:
            plane_count += 1

        return tuple([AudioPlane(self, i) for i in range(plane_count)])

    @property
    def samples(self):
        """
        Number of audio samples (per channel).

        :type: int
        """
        return self.ptr.nb_samples

    @property
    def sample_rate(self):
        """
        Sample rate of the audio data, in samples per second.

        :type: int
        """
        return self.ptr.sample_rate

    @sample_rate.setter
    def sample_rate(self, value):
        self.ptr.sample_rate = value

    @property
    def rate(self):
        """Another name for :attr:`sample_rate`."""
        return self.ptr.sample_rate

    @rate.setter
    def rate(self, value):
        self.ptr.sample_rate = value

    def to_ndarray(self):
        """Get a numpy array of this frame.

        The result stacks one row per plane: for planar formats each row
        holds :attr:`samples` values; for packed formats the single row holds
        ``samples * nb_channels`` interleaved values.

        .. note:: Numpy must be installed.

        :raises ValueError: If the frame's format has no entry in
            ``format_dtypes``.
        """
        import numpy as np

        try:
            dtype = np.dtype(format_dtypes[self.format.name])
        except KeyError:
            # The KeyError is an implementation detail of the lookup table.
            raise ValueError(
                f"Conversion from {self.format.name!r} format to numpy array is not supported."
            ) from None

        # Number of values to read from each plane.
        if self.format.is_planar:
            count = self.samples
        else:
            count = self.samples * self.layout.nb_channels

        return np.vstack(
            [np.frombuffer(x, dtype=dtype, count=count) for x in self.planes]
        )
|