-
-
Notifications
You must be signed in to change notification settings - Fork 9.5k
/
entropy.pyx
155 lines (130 loc) · 4.75 KB
/
entropy.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
cimport numpy as np
import numpy as np
from libc.stdint cimport uint32_t, uint64_t
__all__ = ['random_entropy', 'seed_by_array']
np.import_array()
# C declaration taken from the splitmix64 header: accepts a pointer to a
# 64-bit state word and returns a uint64_t.  It is declared ``nogil``, so it
# may be called without holding the GIL.
# NOTE(review): since it receives a pointer, it presumably advances ``*state``
# in place on every call -- confirm against the header.
cdef extern from "src/splitmix64/splitmix64.h":
cdef uint64_t splitmix64_next(uint64_t *state) nogil
# C declarations taken from the entropy header.  Both functions receive a
# destination buffer and a byte count and return a bint; random_entropy in
# this file treats a false return value as a failed read.
# entropy_getbytes is used for source='system' and entropy_fallback_getbytes
# for source='fallback'.
cdef extern from "src/entropy/entropy.h":
cdef bint entropy_getbytes(void* dest, size_t size)
cdef bint entropy_fallback_getbytes(void *dest, size_t size)
cdef Py_ssize_t compute_numel(size):
    """
    Return the number of elements described by ``size``.

    A tuple is interpreted as a shape and the product of its entries is
    returned (an empty tuple gives 1).  Anything else is converted directly
    to ``Py_ssize_t`` and returned as the element count.
    """
    cdef Py_ssize_t total = 1

    # Non-tuple sizes are taken as the element count itself.
    if not isinstance(size, tuple):
        total = size
        return total

    # Tuple sizes: multiply the extents together.
    for extent in size:
        total *= extent
    return total
def seed_by_array(object seed, Py_ssize_t n):
    """
    Transform a seed (scalar or 1-d array) into an initial state.

    Parameters
    ----------
    seed : int or array_like of ints
        Seed value(s).  Each value must be an integer between 0 and
        2**64 - 1.  Objects exposing ``squeeze`` are squeezed first; a scalar
        seed is promoted to a length-1 array.
    n : int
        Number of 64-bit unsigned integers required.

    Returns
    -------
    initial_state : ndarray, 1d, uint64
        Array of length ``n``.  An empty array is returned when ``n`` is 0.

    Raises
    ------
    TypeError
        If the seed is not integer valued (e.g. floating point arrays or
        scalars that do not equal their integer conversion).
    ValueError
        If a seed value lies outside [0, 2**64 - 1], if an array-valued seed
        is not 1-dimensional, or if ``n`` is negative.

    Notes
    -----
    Uses splitmix64 to perform the transformation.  The mixing loop runs
    ``max(n, len(seed))`` times: every iteration XORs the next seed word (if
    any remain) into the running state and stores one splitmix64 output,
    wrapping around the output buffer when the seed is longer than ``n``.
    """
    cdef uint64_t seed_copy = 0
    cdef uint64_t[::1] seed_array
    cdef uint64_t[::1] initial_state
    cdef Py_ssize_t seed_size, iter_bound
    # Same width as iter_bound so very large requests cannot overflow a C int.
    cdef Py_ssize_t i, loc = 0

    if hasattr(seed, 'squeeze'):
        seed = seed.squeeze()
    arr = np.asarray(seed)
    if arr.shape == ():
        err_msg = 'Scalar seeds must be integers between 0 and 2**64 - 1'
        if not np.isreal(arr):
            raise TypeError(err_msg)
        int_seed = int(seed)
        if int_seed != seed:
            raise TypeError(err_msg)
        if int_seed < 0 or int_seed > 2**64 - 1:
            raise ValueError(err_msg)
        seed_array = np.array([int_seed], dtype=np.uint64)
    elif issubclass(arr.dtype.type, np.inexact):
        raise TypeError('seed array must be integers')
    else:
        err_msg = "Seed values must be integers between 0 and 2**64 - 1"
        # Validate on Python objects so range checks happen before any
        # wrap-around caused by the cast to uint64.  The builtin ``object``
        # is used because the ``np.object`` alias no longer exists in NumPy.
        obj = arr.astype(object)
        if obj.ndim != 1:
            raise ValueError('Array-valued seeds must be 1-dimensional')
        if not np.isreal(obj).all():
            raise TypeError(err_msg)
        if ((obj > int(2**64 - 1)) | (obj < 0)).any():
            raise ValueError(err_msg)
        try:
            obj_int = obj.astype(np.uint64, casting='unsafe')
        except ValueError:
            raise ValueError(err_msg)
        # A lossy cast means at least one value was not an integer.
        if not (obj == obj_int).all():
            raise TypeError(err_msg)
        seed_array = obj_int

    if n < 0:
        raise ValueError('n must be non-negative')
    if n == 0:
        # Nothing to fill; without this guard ``loc`` would never wrap and
        # the loop below would index into a zero-length buffer.
        return np.empty(0, dtype=np.uint64)

    seed_size = seed_array.shape[0]
    iter_bound = n if n > seed_size else seed_size

    initial_state = <np.ndarray>np.empty(n, dtype=np.uint64)
    for i in range(iter_bound):
        if i < seed_size:
            seed_copy ^= seed_array[i]
        initial_state[loc] = splitmix64_next(&seed_copy)
        loc += 1
        if loc == n:
            loc = 0

    return np.array(initial_state)
def random_entropy(size=None, source='system'):
    """
    random_entropy(size=None, source='system')

    Read entropy from the system cryptographic provider

    Parameters
    ----------
    size : int or tuple of ints, optional
        Output shape.  If the given shape is, e.g., ``(m, n, k)``, then
        ``m * n * k`` samples are drawn.  Default is None, in which case a
        single value is returned.  A shape with zero elements returns an
        empty array without reading any entropy.
    source : str {'system', 'fallback'}
        Source of entropy.  'system' uses system cryptographic pool.
        'fallback' uses a hash of the time and process id.

    Returns
    -------
    entropy : scalar or array
        Entropy bits in 32-bit unsigned integers. A scalar is returned if size
        is `None`.

    Raises
    ------
    ValueError
        If ``source`` is not 'system' or 'fallback'.
    RuntimeError
        If the underlying read reports failure.

    Notes
    -----
    On Unix-like machines, reads from ``/dev/urandom``. On Windows machines
    reads from the RSA algorithm provided by the cryptographic service
    provider.

    This function reads from the system entropy pool and so samples are
    not reproducible.  In particular, it does *NOT* make use of a
    BitGenerator, and so ``seed`` and setting ``state`` have no
    effect.
    """
    cdef bint success = True
    cdef Py_ssize_t n = 0
    cdef uint32_t random = 0
    cdef uint32_t [:] randoms

    if source not in ('system', 'fallback'):
        raise ValueError('Unknown value in source.')

    if size is None:
        # Scalar request: read exactly one 32-bit word.
        if source == 'system':
            success = entropy_getbytes(<void *>&random, sizeof(uint32_t))
        else:
            success = entropy_fallback_getbytes(<void *>&random,
                                                sizeof(uint32_t))
        if not success:
            raise RuntimeError('Unable to read from system cryptographic provider')
        return random

    n = compute_numel(size)
    out = np.zeros(n, dtype=np.uint32)
    # Only touch the buffer when it has at least one element; taking
    # ``&randoms[0]`` on an empty buffer is out of bounds.
    if n > 0:
        randoms = out
        if source == 'system':
            success = entropy_getbytes(<void *>(&randoms[0]),
                                       sizeof(uint32_t) * n)
        else:
            success = entropy_fallback_getbytes(<void *>(&randoms[0]),
                                                sizeof(uint32_t) * n)
        if not success:
            raise RuntimeError('Unable to read from system cryptographic provider')
    return out.reshape(size)