Add append and extend #1050

Merged: 26 commits, Dec 6, 2022
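For orientation, here is a minimal sketch of how the extend API introduced by this PR is meant to be called from user code. The project, token, and field path are placeholders, and the handler signature is inferred from the Series.extend and Namespace.extend changes below, so treat this as an illustration rather than documented usage:

```python
import time

import neptune.new as neptune  # the "new" client API this PR targets

# Placeholder project and token; replace with real credentials to actually run this.
run = neptune.init_run(project="my-workspace/my-project", api_token="...")

# extend() logs a whole collection of values in one call; steps and timestamps
# are optional, but when given they must match the number of values.
run["train/loss"].extend(
    [0.91, 0.63, 0.42],
    steps=[1, 2, 3],
    timestamps=[time.time()] * 3,
)

run.stop()
```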
21 changes: 20 additions & 1 deletion src/neptune/new/attributes/namespace.py
@@ -13,14 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
from collections.abc import MutableMapping
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Union,
)

@@ -63,6 +67,19 @@ def __len__(self) -> int:
def __iter__(self) -> Iterator[str]:
yield from self._attributes.__iter__()

def extend(
self,
value: Union[Any, Iterable[Any]],
steps: Optional[Collection[float]] = None,
timestamps: Optional[Collection[float]] = None,
wait: bool = False,
**kwargs,
) -> None:
if not isinstance(value, NamespaceVal):
value = NamespaceVal(value)
for k, v in value.value.items():
self._container[f"{self._str_path}/{k}"].extend(v, steps, timestamps, wait, **kwargs)

def to_dict(self) -> Dict[str, Any]:
result = {}
for key, value in self._attributes.items():
@@ -73,7 +90,9 @@ def to_dict(self) -> Dict[str, Any]:
return result

def assign(self, value: Union[NamespaceVal, dict, Mapping], wait: bool = False):
if not isinstance(value, NamespaceVal):
if isinstance(value, argparse.Namespace):
Contributor comment: This is a bugfix for assigning the argparse.Namespace type as a neptune Namespace when the attribute is already defined. Due to the changes I've made in Handler.assign, I had to fix it now.

Contributor reply: In fact, I had to change the Handler.assign implementation again, but the bugfix stayed.

value = NamespaceVal(vars(value))
elif not isinstance(value, NamespaceVal):
value = NamespaceVal(value)

for k, v in value.value.items():
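The namespace-level extend above simply fans the call out to the child handler of each key, and assign now coerces an argparse.Namespace into a plain dict first. A self-contained toy sketch of that fan-out and coercion pattern (stand-in classes, not the neptune implementation):

```python
import argparse
from typing import Any, Collection, Dict, Iterable


class ToyHandler:
    """Stands in for a per-field handler such as run["params/lr"]."""

    def __init__(self) -> None:
        self.values: list = []

    def extend(self, values: Iterable[Any], steps=None, timestamps=None) -> None:
        self.values.extend(values)

    def assign(self, value: Any) -> None:
        self.values = [value]


class ToyNamespace:
    """Mimics the fan-out in Namespace.extend and the coercion in Namespace.assign."""

    def __init__(self) -> None:
        self._children: Dict[str, ToyHandler] = {}

    def __getitem__(self, key: str) -> ToyHandler:
        return self._children.setdefault(key, ToyHandler())

    def extend(self, value: Dict[str, Collection[Any]], steps=None, timestamps=None) -> None:
        # One child series per key; steps/timestamps are passed through unchanged.
        for key, series in value.items():
            self[key].extend(series, steps, timestamps)

    def assign(self, value) -> None:
        # Same coercion as the bugfix above: argparse.Namespace becomes a plain dict first.
        if isinstance(value, argparse.Namespace):
            value = vars(value)
        for key, v in value.items():
            self[key].assign(v)


ns = ToyNamespace()
ns.extend({"loss": [0.9, 0.5], "acc": [0.1, 0.4]})
ns.assign(argparse.Namespace(lr=0.001, batch_size=32))
print(ns["loss"].values, ns["lr"].values)  # [0.9, 0.5] [0.001]
```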
22 changes: 9 additions & 13 deletions src/neptune/new/attributes/series/file_series.py
@@ -35,30 +35,26 @@
)
from neptune.new.internal.types.file_types import FileType
from neptune.new.internal.utils import base64_encode
from neptune.new.internal.utils.iteration import get_batches
from neptune.new.internal.utils.limits import image_size_exceeds_limit_for_logging
from neptune.new.types import File
from neptune.new.types.series.file_series import FileSeries as FileSeriesVal

Val = FileSeriesVal
Data = File
LogOperation = LogImages


class FileSeries(Series[Val, Data]):
def _get_log_operations_from_value(self, value: Val, step: Optional[float], timestamp: float) -> List[Operation]:
values = [
LogImages.ValueType(
ImageValue(
data=self._get_base64_image_content(val),
name=value.name,
description=value.description,
),
step=step,
ts=timestamp,
class FileSeries(Series[Val, Data, LogOperation], max_batch_size=1, operation_cls=LogOperation):
@classmethod
def _map_series_val(cls, value: Val) -> List[ImageValue]:
return [
ImageValue(
data=cls._get_base64_image_content(val),
name=value.name,
description=value.description,
)
for val in value.values
]
return [LogImages(self._path, chunk) for chunk in get_batches(values, batch_size=1)]

def _get_clear_operation(self) -> Operation:
return ClearImageLog(self._path)
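With this refactor a concrete series type only declares its batch size and operation class and, if needed, overrides _map_series_val; the shared base class builds and chunks the log operations. A toy illustration of that hook pattern (stand-in classes, not the neptune ones):

```python
from typing import Any, List


class ToySeries:
    """Base: maps raw values to per-value payloads, then chunks them into operations."""

    max_batch_size = 100  # overridden per subclass, like FileSeries' batch size of 1

    @classmethod
    def _map_series_val(cls, values: List[Any]) -> List[Any]:
        # Default: values are logged as-is (FloatSeries-style).
        return values

    def to_operations(self, values: List[Any]) -> List[List[Any]]:
        payloads = self._map_series_val(values)
        size = self.max_batch_size
        return [payloads[i : i + size] for i in range(0, len(payloads), size)]


class ToyImageSeries(ToySeries):
    """FileSeries-style subclass: heavy payloads, so one value per operation."""

    max_batch_size = 1

    @classmethod
    def _map_series_val(cls, values: List[Any]) -> List[Any]:
        return [f"<base64:{v}>" for v in values]


print(ToyImageSeries().to_operations(["img_a", "img_b"]))
# [['<base64:img_a>'], ['<base64:img_b>']]
```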
11 changes: 4 additions & 7 deletions src/neptune/new/attributes/series/float_series.py
@@ -15,7 +15,6 @@
#
from typing import (
Iterable,
List,
Optional,
Union,
)
@@ -30,15 +29,17 @@
Operation,
)
from neptune.new.internal.utils import verify_type
from neptune.new.internal.utils.iteration import get_batches
from neptune.new.internal.utils.logger import logger
from neptune.new.types.series.float_series import FloatSeries as FloatSeriesVal

Val = FloatSeriesVal
Data = Union[float, int]
LogOperation = LogFloats


class FloatSeries(Series[Val, Data], FetchableSeries[FloatSeriesValues]):
class FloatSeries(
Series[Val, Data, LogOperation], FetchableSeries[FloatSeriesValues], max_batch_size=100, operation_cls=LogOperation
):
def configure(
self,
min: Optional[Union[float, int]] = None,
@@ -52,10 +53,6 @@ def configure(
with self._container.lock():
self._enqueue_operation(ConfigFloatSeries(self._path, min, max, unit), wait)

def _get_log_operations_from_value(self, value: Val, step: Optional[float], timestamp: float) -> List[Operation]:
values = [LogFloats.ValueType(val, step=step, ts=timestamp) for val in value.values]
return [LogFloats(self._path, chunk) for chunk in get_batches(values, batch_size=100)]

def _get_clear_operation(self) -> Operation:
return ClearFloatLog(self._path)

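FloatSeries now passes max_batch_size and operation_cls as class keyword arguments, which the Series base class captures in __init_subclass__. A minimal standalone example of that Python mechanism, stripped of the Attribute and Generic machinery the real base class also carries:

```python
class Series:
    def __init_subclass__(cls, max_batch_size: int, operation_cls: type, **kwargs):
        super().__init_subclass__(**kwargs)
        # Class keyword arguments become plain class attributes on each subclass.
        cls.max_batch_size = max_batch_size
        cls.operation_cls = operation_cls


class LogFloats:  # stand-in for the real operation class
    pass


class FloatSeries(Series, max_batch_size=100, operation_cls=LogFloats):
    pass


print(FloatSeries.max_batch_size, FloatSeries.operation_cls.__name__)  # 100 LogFloats
```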
87 changes: 71 additions & 16 deletions src/neptune/new/attributes/series/series.py
@@ -15,7 +15,9 @@
#
import abc
import time
from itertools import cycle
from typing import (
Collection,
Generic,
Iterable,
List,
@@ -25,34 +27,60 @@
)

from neptune.new.attributes.attribute import Attribute
from neptune.new.internal.operation import Operation
from neptune.new.internal.operation import LogOperation
from neptune.new.internal.utils import (
is_collection,
verify_collection_type,
verify_type,
)
from neptune.new.internal.utils.iteration import get_batches
from neptune.new.types.series.series import Series as SeriesVal

Val = TypeVar("Val", bound=SeriesVal)
Data = TypeVar("Data")
ValTV = TypeVar("ValTV", bound=SeriesVal)
DataTV = TypeVar("DataTV")
LogOperationTV = TypeVar("LogOperationTV", bound=LogOperation)


class Series(Attribute, Generic[Val, Data]):
class Series(Attribute, Generic[ValTV, DataTV, LogOperationTV]):
def __init_subclass__(cls, max_batch_size: int, operation_cls: type(LogOperationTV)):
cls.max_batch_size = max_batch_size
cls.operation_cls = operation_cls

def clear(self, wait: bool = False) -> None:
self._clear_impl(wait)

@abc.abstractmethod
def _get_log_operations_from_value(self, value: Val, step: Optional[float], timestamp: float) -> List[Operation]:
pass
def _get_log_operations_from_value(
self, value: ValTV, *, steps: Union[None, Collection[float]], timestamps: Union[None, Collection[float]]
) -> List[LogOperationTV]:
if steps is None:
steps = cycle([None])
else:
assert len(value) == len(steps)
if timestamps is None:
timestamps = cycle([time.time()])
else:
assert len(value) == len(timestamps)

mapped_values = self._map_series_val(value)
values_with_step_and_ts = zip(mapped_values, steps, timestamps)
log_values = [self.operation_cls.ValueType(val, step=step, ts=ts) for val, step, ts in values_with_step_and_ts]
return [
self.operation_cls(self._path, chunk) for chunk in get_batches(log_values, batch_size=self.max_batch_size)
]

def _get_config_operation_from_value(self, value: Val) -> Optional[Operation]:
@classmethod
def _map_series_val(cls, value: ValTV) -> List[DataTV]:
return value.values

def _get_config_operation_from_value(self, value: ValTV) -> Optional[LogOperationTV]:
return None

@abc.abstractmethod
def _get_clear_operation(self) -> Operation:
def _get_clear_operation(self) -> LogOperationTV:
pass

@abc.abstractmethod
def _data_to_value(self, values: Iterable, **kwargs) -> Val:
def _data_to_value(self, values: Iterable, **kwargs) -> ValTV:
pass

@abc.abstractmethod
@@ -71,19 +99,19 @@ def assign(self, value, wait: bool = False) -> None:
self._enqueue_operation(clear_op, wait=wait)
else:
self._enqueue_operation(clear_op, wait=False)
ts = time.time()
ops = self._get_log_operations_from_value(value, None, ts)
ops = self._get_log_operations_from_value(value, steps=None, timestamps=None)
for op in ops:
self._enqueue_operation(op, wait=wait)

def log(
self,
value: Union[Data, Iterable[Data]],
value: Union[DataTV, Iterable[DataTV]],
step: Optional[float] = None,
timestamp: Optional[float] = None,
wait: bool = False,
**kwargs,
) -> None:
"""log is a deprecated method, this code should be removed in future"""
Contributor comment: Do we want some warning?

Contributor reply: No.

if is_collection(value):
if step is not None and len(value) > 1:
raise ValueError("Collection of values are not supported for explicitly defined 'step'.")
@@ -96,10 +124,37 @@
if timestamp is not None:
verify_type("timestamp", timestamp, (float, int))

if not timestamp:
timestamp = time.time()
steps = None if step is None else [step]
timestamps = None if timestamp is None else [timestamp] * len(value)

ops = self._get_log_operations_from_value(value, steps=steps, timestamps=timestamps)

ops = self._get_log_operations_from_value(value, step, timestamp)
with self._container.lock():
for op in ops:
self._enqueue_operation(op, wait)

def extend(
self,
values: Collection[DataTV],
steps: Optional[Collection[float]] = None,
timestamps: Optional[Collection[float]] = None,
wait: bool = False,
**kwargs,
) -> None:
value = self._data_to_value(values, **kwargs)

if steps is not None:
verify_collection_type("steps", steps, (float, int))
if len(steps) != len(values):
raise ValueError(f"Number of steps must be equal to number of values ({len(steps)} != {len(values)}")
if timestamps is not None:
verify_collection_type("timestamps", timestamps, (float, int))
if len(timestamps) != len(values):
raise ValueError(
f"Number of timestamps must be equal to number of values ({len(timestamps)} != {len(values)}"
)

ops = self._get_log_operations_from_value(value, steps=steps, timestamps=timestamps)

with self._container.lock():
for op in ops:
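The heart of the new _get_log_operations_from_value is aligning optional steps and timestamps with the values, cycling a single default when either is omitted, and then chunking the result into batches of max_batch_size. A self-contained sketch of that logic with a local stand-in for get_batches:

```python
import time
from itertools import cycle
from typing import Collection, Iterable, List, Optional, Tuple


def get_batches(values: List, batch_size: int) -> Iterable[List]:
    # Local stand-in for neptune's get_batches helper.
    for i in range(0, len(values), batch_size):
        yield values[i : i + batch_size]


def build_log_batches(
    values: List[float],
    steps: Optional[Collection[float]] = None,
    timestamps: Optional[Collection[float]] = None,
    max_batch_size: int = 100,
) -> List[List[Tuple[float, Optional[float], float]]]:
    # When steps/timestamps are omitted, every value shares a default
    # (no step, "now"); otherwise they must match the number of values.
    if steps is not None and len(steps) != len(values):
        raise ValueError("Number of steps must be equal to number of values")
    if timestamps is not None and len(timestamps) != len(values):
        raise ValueError("Number of timestamps must be equal to number of values")

    steps_iter = cycle([None]) if steps is None else steps
    ts_iter = cycle([time.time()]) if timestamps is None else timestamps

    entries = list(zip(values, steps_iter, ts_iter))  # zip stops at len(values)
    return list(get_batches(entries, batch_size=max_batch_size))


batches = build_log_batches([0.9, 0.5, 0.4], steps=[1, 2, 3], max_batch_size=2)
print([len(b) for b in batches])  # [2, 1]
```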
23 changes: 13 additions & 10 deletions src/neptune/new/attributes/series/string_series.py
@@ -15,9 +15,10 @@
#
from typing import (
TYPE_CHECKING,
Collection,
Iterable,
List,
Optional,
Union,
)

from neptune.new.attributes.series.fetchable_series import FetchableSeries
@@ -28,28 +29,30 @@
LogStrings,
Operation,
)
from neptune.new.internal.utils.iteration import get_batches
from neptune.new.internal.utils.logger import logger
from neptune.new.internal.utils.paths import path_to_str
from neptune.new.types.series.string_series import MAX_STRING_SERIES_VALUE_LENGTH
from neptune.new.types.series.string_series import StringSeries as StringSeriesVal

if TYPE_CHECKING:
from neptune.new.metadata_containers import MetadataContainer

Val = StringSeriesVal
Data = str
LogOperation = LogStrings

MAX_STRING_SERIES_VALUE_LENGTH = 1000


class StringSeries(Series[Val, Data], FetchableSeries[StringSeriesValues]):
class StringSeries(
Series[Val, Data, LogOperation], FetchableSeries[StringSeriesValues], max_batch_size=10, operation_cls=LogOperation
):
def __init__(self, container: "MetadataContainer", path: List[str]):
super().__init__(container, path)
self._value_truncation_occurred = False

def _get_log_operations_from_value(self, value: Val, step: Optional[float], timestamp: float) -> List[Operation]:
values = [v[:MAX_STRING_SERIES_VALUE_LENGTH] for v in value.values]
if not self._value_truncation_occurred and any([len(v) > MAX_STRING_SERIES_VALUE_LENGTH for v in value.values]):
def _get_log_operations_from_value(
self, value: Val, *, steps: Union[None, Collection[float]], timestamps: Union[None, Collection[float]]
) -> List[LogOperation]:
if not self._value_truncation_occurred and value.truncated:
# the first truncation
self._value_truncation_occurred = True
logger.warning(
@@ -60,15 +63,15 @@ def _get_log_operations_from_value(self, value: Val, step: Optional[float], time
MAX_STRING_SERIES_VALUE_LENGTH,
)

values = [LogStrings.ValueType(val, step=step, ts=timestamp) for val in values]
return [LogStrings(self._path, chunk) for chunk in get_batches(values, batch_size=10)]
return super()._get_log_operations_from_value(value, steps=steps, timestamps=timestamps)

def _get_clear_operation(self) -> Operation:
return ClearStringLog(self._path)

def _data_to_value(self, values: Iterable, **kwargs) -> Val:
if kwargs:
logger.warning("Warning: unexpected arguments (%s) in StringSeries", kwargs)

return StringSeriesVal(values)

def _is_value_type(self, value) -> bool:
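Truncation detection now lives on the value object (value.truncated) instead of re-scanning lengths in the attribute, and the warning still fires only on the first truncated call. A toy sketch of that warn-once pattern, assuming the value type truncates on construction and records that it did:

```python
import logging
from typing import Iterable, List

logger = logging.getLogger(__name__)
MAX_LEN = 1000  # mirrors MAX_STRING_SERIES_VALUE_LENGTH


class ToyStringSeriesVal:
    """Truncates on construction and remembers that it had to."""

    def __init__(self, values: Iterable[str]) -> None:
        values = list(values)
        self.truncated = any(len(v) > MAX_LEN for v in values)
        self.values: List[str] = [v[:MAX_LEN] for v in values]


class ToyStringSeries:
    def __init__(self) -> None:
        self._value_truncation_occurred = False

    def log(self, value: ToyStringSeriesVal) -> None:
        if not self._value_truncation_occurred and value.truncated:
            # Warn only once per attribute, no matter how many later calls truncate.
            self._value_truncation_occurred = True
            logger.warning("Your string series values were truncated to %d characters.", MAX_LEN)
        # ... build and enqueue log operations here ...


series = ToyStringSeries()
series.log(ToyStringSeriesVal(["x" * 2000]))  # warns once
series.log(ToyStringSeriesVal(["y" * 2000]))  # silent
```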
5 changes: 5 additions & 0 deletions src/neptune/new/exceptions.py
@@ -1354,3 +1354,8 @@ def __init__(self):
{correct}Need help?{end}-> https://docs.neptune.ai/getting_help
"""
super().__init__(message.format(**STYLES))


class NeptuneUserApiInputException(NeptuneException):
def __init__(self, message):
super().__init__(message)
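The diff only adds NeptuneUserApiInputException; where it gets raised is not shown here, so the sketch below is a hedged guess that it is intended for user-facing input validation such as the steps/values length checks above, not code from this PR:

```python
from typing import Collection, Optional


class NeptuneException(Exception):
    pass


class NeptuneUserApiInputException(NeptuneException):
    def __init__(self, message):
        super().__init__(message)


def validate_extend_input(values: Collection, steps: Optional[Collection] = None) -> None:
    # Hypothetical validation helper, mirroring the length checks in Series.extend.
    if steps is not None and len(steps) != len(values):
        raise NeptuneUserApiInputException(
            f"Number of steps must be equal to number of values ({len(steps)} != {len(values)})"
        )


try:
    validate_extend_input([0.1, 0.2], steps=[1])
except NeptuneUserApiInputException as e:
    print(e)  # Number of steps must be equal to number of values (1 != 2)
```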