-
Notifications
You must be signed in to change notification settings - Fork 57
/
fftw.py
123 lines (109 loc) · 6.43 KB
/
fftw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import numpy as np
from numba import guvectorize
from pyfftw import FFTW
def dft(buf_in, buf_out):
"""
Perform discrete Fourier transforms using the FFTW library. FFTW optimizes
the FFT algorithm based on the size of the arrays, with SIMD parallelized
commands. This optimization requires initialization, so this is a factory
function that returns a Numba gufunc that performs the FFT. FFTW works on
fixed memory buffers, so you must tell it what memory to use ahead of time.
When using this with ProcessingChain, to ensure the correct buffers are used,
call ProcessingChain.get_variable('var_name') to give it the internal memory
buffer directly---with build_dsp, you can just give it the name, and it will
automatically happen. The possible dtypes for the input/outputs are:
- float32/float (size n) -> complex64 (size n/2+1)
- float64/double (size n) -> complex128 (size n/2+1)
- float128/longdouble (size n) -> complex256/clongdouble (size n/2+1)
- complex64 (size n) -> complex64 (size n )
- complex128 (size n) -> complex128 (size n )
- complex256/clongdouble (size n) -> complex256/clongdouble (size n )
"""
try:
dft_fun = FFTW(buf_in, buf_out, axes=(-1,), direction='FFTW_FORWARD')
except ValueError:
raise ValueError("""Incompatible array types/shapes. Allowed:
- float32/float (size n) -> complex64 (size n/2+1)
- float64/double (size n) -> complex128 (size n/2+1)
- float128/longdouble (size n) -> complex256/clongdouble (size n/2+1)
- complex64 (size n) -> complex64 (size n)
- complex128 (size n) -> complex128 (size n)
- complex256/clongdouble (size n) -> complex256/clongdouble (size n)""")
typesig = 'void(' + str(buf_in.dtype) + '[:, :], ' + str(buf_out.dtype) + '[:, :])'
sizesig = '(m, n)->(m, n)' if buf_in.shape == buf_out.shape else '(m, n),(m, l)'
@guvectorize([typesig], sizesig, forceobj=True)
def dft(wf_in, dft_out):
dft_fun(wf_in, dft_out)
return dft
def inv_dft(buf_in, buf_out):
"""
Perform inverse discrete Fourier transforms using the FFTW library. FFTW
optimizes the FFT algorithm based on the size of the arrays, with SIMD parallelized
commands. This optimization requires initialization, so this is a factory
function that returns a Numba gufunc that performs the FFT. FFTW works on
fixed memory buffers, so you must tell it what memory to use ahead of time.
When using this with ProcessingChain, to ensure the correct buffers are used,
call ProcessingChain.get_variable('var_name') to give it the internal memory
buffer directly---with build_dsp, you can just give it the name, and it will
automatically happen. The possible dtypes for the input/outputs are:
- complex64 (size n/2+1) -> float32/float (size n)
- complex128 (size n/2+1) -> float64/double (size n)
- complex256/clongdouble (size n/2+1) -> float128/longdouble (size n)
- complex64 (size n ) -> complex64 (size n)
- complex128 (size n ) -> complex128 (size n)
- complex256/clongdouble (size n ) -> complex256/clongdouble (size n)
"""
try:
idft_fun = FFTW(buf_in, buf_out, axes=(-1,), direction='FFTW_BACKWARD')
except ValueError:
raise ValueError("""Incompatible array types/shapes. Allowed:
- complex64 (size n/2+1) -> float32/float (size n)
- complex128 (size n/2+1) -> float64/double (size n)
- complex256/clongdouble (size n/2+1) -> float128/longdouble (size n)
- complex64 (size n) -> complex64 (size n)
- complex128 (size n) -> complex128 (size n)
- complex256/clongdouble (size n) -> complex256/clongdouble (size n)""")
typesig = 'void(' + str(buf_in.dtype) + '[:, :], ' + str(buf_out.dtype) + '[:, :])'
sizesig = '(m, n)->(m, n)' if buf_in.shape == buf_out.shape else '(m, n),(m, l)'
@guvectorize([typesig], sizesig, forceobj=True)
def inv_dft(wf_in, dft_out):
idft_fun(wf_in, dft_out)
return inv_dft
def psd(buf_in, buf_out):
"""
Perform discrete Fourier transforms using the FFTW library, and use it to get
the power spectral density. FFTW optimizes the FFT algorithm based on the
size of the arrays, with SIMD parallelized commands. This optimization
requires initialization, so this is a factory function that returns a Numba gufunc
that performs the FFT. FFTW works on fixed memory buffers, so you must
tell it what memory to use ahead of time. When using this with ProcessingChain,
to ensure the correct buffers are used, call ProcessingChain.get_variable('var_name')
to give it the internal memory buffer directly---with build_dsp, you can just
give it the name, and it will automatically happen. The possible dtypes for the
input/outputs are:
- complex64 (size n) -> float32/float (size n )
- complex128 (size n) -> float64/double (size n )
- complex256/clongdouble (size n) -> float128/longdouble (size n )
- float32/float (size n) -> float32/float (size n/2+1)
- float64/double (size n) -> float64/double (size n/2+1)
- float128/longdouble (size n) -> float128/longdouble (size n/2+1)
"""
# build intermediate array for the dft, which will be abs'd to get the PSD
buf_dft = np.ndarray(buf_out.shape, np.dtype('complex' + str(buf_out.dtype.itemsize * 16)))
try:
dft_fun = FFTW(buf_in, buf_dft, axes=(-1,), direction='FFTW_FORWARD')
except ValueError:
raise ValueError("""Incompatible array types/shapes. Allowed:
- complex64 (size n) -> float32/float (size n)
- complex128 (size n) -> float64/double (size n)
- complex256/clongdouble (size n) -> float128/longdouble (size n)
- float32/float (size n) -> float32/float (size n/2+1)
- float64/double (size n) -> float64/double (size n/2+1)
- float128/longdouble (size n) -> float128/longdouble (size n/2+1)""")
typesig = 'void(' + str(buf_in.dtype) + '[:, :], ' + str(buf_out.dtype) + '[:, :])'
sizesig = '(m, n)->(m, n)' if buf_in.shape == buf_out.shape else '(m, n),(m, l)'
@guvectorize([typesig], sizesig, forceobj=True)
def psd(wf_in, psd_out):
dft_fun(wf_in, buf_dft)
np.abs(buf_dft, psd_out)
return psd