Allow retrieval of binary data from WSClient output stream #1471

Closed
FabianNiehaus opened this issue May 17, 2021 · 25 comments · Fixed by #2194 · May be fixed by #1805
Labels
kind/feature Categorizes issue or PR as related to a new feature.

Comments

@FabianNiehaus

What is the feature and why do you need it:

The API client currently does not offer any way to copy files from Pods / containers (similar to kubectl cp).
We have created a workaround by opening a stream to the Pod, creating a tar archive of the files to copy, and outputting the data to stdout. Once retrieved, the data is then extracted from the archive.

However, this does not work for binary files due to how the WSClient handles incoming data.

kubernetes\stream\ws_client.py:161-178 (kubernetes v17.17.0)

    def update(self, timeout=0):
        """Update channel buffers with at most one complete frame of input."""
        if not self.is_open():
            return
        if not self.sock.connected:
            self._connected = False
            return
        r, _, _ = select.select(
            (self.sock.sock, ), (), (), timeout)
        if r:
            op_code, frame = self.sock.recv_data_frame(True)
            if op_code == ABNF.OPCODE_CLOSE:
                self._connected = False
                return
            elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
                data = frame.data
                if six.PY3:
                    data = data.decode("utf-8", "replace")

The last line always decodes the frame as UTF-8 and replaces every byte sequence that cannot be decoded with the U+FFFD replacement character.
In the case of binary PCAP files, this results in corrupted data.
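A minimal, standard-library-only sketch of that corruption. The payload is an assumption chosen for illustration (the first 8 bytes of a little-endian PCAP header), not data from the original report; it runs the same `decode("utf-8", "replace")` call that `update()` applies and shows that re-encoding cannot restore the bytes.

```python
# Hypothetical payload: the first 8 bytes of a little-endian PCAP file
# (magic number 0xa1b2c3d4 stored as d4 c3 b2 a1, followed by version 2.4).
payload = bytes([0xD4, 0xC3, 0xB2, 0xA1, 0x02, 0x00, 0x04, 0x00])

# What WSClient.update() does with every frame on Python 3.
text = payload.decode("utf-8", "replace")

# Re-encoding cannot recover the original bytes: invalid sequences were
# replaced with U+FFFD, which encodes to three bytes (ef bf bd).
roundtrip = text.encode("utf-8")

print(repr(text))
print(roundtrip == payload)  # False
```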

Describe the solution you'd like to see:
I think that changing the signature of update to give the user the following options might resolve the issue:

  1. Choose a different error handler than replace. According to the docs, strict and ignore are available as well. I tested ignore, and for my test data, decoding and then re-encoding produced an identical bytes object.
  2. Allow the user to skip conversion altogether. This would mean adding a flag that skips the following lines:
    if six.PY3:
        data = data.decode("utf-8", "replace")
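A short comparison of the two options, using the same hypothetical PCAP bytes as a stand-in for binary data (the helper `try_roundtrip` is made up for this sketch). Per the Python `codecs` docs, `strict` raises `UnicodeDecodeError` and `ignore` silently drops undecodable bytes, so an `ignore` round trip is only lossless when the data happens to be valid UTF-8; skipping the decode is the only option that always preserves the bytes.

```python
def try_roundtrip(payload: bytes, errors: str):
    """Decode with the given error handler, then re-encode as UTF-8.

    Returns None when decoding raises (the "strict" handler)."""
    try:
        return payload.decode("utf-8", errors).encode("utf-8")
    except UnicodeDecodeError:
        return None


# Hypothetical binary data: a little-endian PCAP header.
payload = bytes([0xD4, 0xC3, 0xB2, 0xA1, 0x02, 0x00, 0x04, 0x00])

results = {name: try_roundtrip(payload, name)
           for name in ("strict", "ignore", "replace")}

# Option 2 (skip decoding): the payload is simply kept as bytes.
skipped = payload

for name, value in results.items():
    print(name, value == payload)
print("skip decoding", skipped == payload)
```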

If needed, I can implement the agreed on solution myself and open a pull request.

@FabianNiehaus FabianNiehaus added the kind/feature Categorizes issue or PR as related to a new feature. label May 17, 2021
@roycaihw
Member

It's a reasonable request. We'd like to review it if you could send a PR

/assign @FabianNiehaus

@roycaihw
Member

cc @yliaog

@FabianNiehaus
Author

After some trial and error, I realized that wsclient.update() runs regardless of which channel is queried. This means that while running wsclient.peek_stderr(), wsclient.update() may retrieve data for the stdout channel. Therefore, the channels that should be retrieved as bytes need to be set at a wider scope, to ensure consistency and avoid bloated method signatures.

I will try to add an instance variable raw_channels = () to the WSClient class, and adjust the decoding logic to skip decoding for channels in that list.
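A sketch of the per-frame logic that idea implies, outside the real class. `split_frame` is a hypothetical helper, not part of the client; it relies on the frame layout visible in `update()` (first byte = channel number, rest = payload) and skips decoding only for channels listed in `raw_channels`.

```python
STDOUT_CHANNEL = 1
STDERR_CHANNEL = 2


def split_frame(frame_data: bytes, raw_channels=()):
    """Hypothetical helper: split one websocket frame into (channel, payload).

    Payloads on channels listed in raw_channels stay bytes; all other
    payloads are decoded like the existing client does."""
    if len(frame_data) < 2:
        return None  # no payload after the channel byte
    channel = frame_data[0]  # indexing bytes yields an int on Python 3
    payload = frame_data[1:]
    if channel not in raw_channels:
        payload = payload.decode("utf-8", "replace")
    return channel, payload


print(split_frame(b"\x01\xff\x00", raw_channels=(STDOUT_CHANNEL,)))
print(split_frame(b"\x02tar: error"))
```

Because `update()` consumes frames for every channel, the `raw_channels` setting has to live on the client instance rather than being passed to individual `peek_*` / `read_*` calls.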

@FabianNiehaus
Author

FabianNiehaus commented May 26, 2021

It turns out that during wsclient.update(), the (usually UTF-8 decoded) data is passed on to StringIO.write(). This method cannot handle bytes objects and raises an exception.

Code:

            elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
                data = frame.data
                if len(data) > 1:
                    channel = data[0]
                    if six.PY3 and channel not in self.raw_channels:
                        data = data.decode("utf-8", "replace")
                    data = data[1:]
                    if data:
                        if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]:
                            # keeping all messages in the order they received
                            # for non-blocking call.
                            self._all.write(data)  # !!! Data passed to StringIO object !!!
                        if channel not in self._channels:
                            self._channels[channel] = data
                        else:
                            self._channels[channel] += data

Error:

Traceback (most recent call last):
  [...]
  File "/home/fabian/kubernetes-client/python/kubernetes/stream/ws_client.py", line 139, in read_stderr
    return self.read_channel(STDERR_CHANNEL, timeout=timeout)
  File "/home/fabian/kubernetes-client/python/kubernetes/stream/ws_client.py", line 81, in read_channel
    ret = self.peek_channel(channel, timeout)
  File "/home/fabian/kubernetes-client/python/kubernetes/stream/ws_client.py", line 73, in peek_channel
    self.update(timeout=timeout)
  File "/home/fabian/kubernetes-client/python/kubernetes/stream/ws_client.py", line 192, in update
    self._all.write(data)
TypeError: string argument expected, got 'bytes'

I will try to avoid this by converting the byte sequence into a string without decoding, so that it can be converted back later.
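The error is easy to reproduce with the standard `io` module alone. As a hedged alternative to converting the data (not what the thread settled on), a raw channel could be given an `io.BytesIO` buffer, which accepts bytes unchanged:

```python
import io

# Writing bytes into a text buffer reproduces the error from the traceback.
error_message = None
text_buffer = io.StringIO()
try:
    text_buffer.write(b"\x1f\x8b")
except TypeError as exc:
    error_message = str(exc)

# A binary buffer accepts the same bytes unchanged.
byte_buffer = io.BytesIO()
byte_buffer.write(b"\x1f\x8b")

print(error_message)
print(byte_buffer.getvalue())
```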

@hatharom

+1 for the issue
Without this copying is totally useless.

@FabianNiehaus
Author

FabianNiehaus commented Aug 16, 2021

Alright, I finally got around to working on this again.

As stated before, the use of StringIO in the client bars me from storing the raw bytes.

There were two issues to solve: Which data type to use for conversion and under which conditions to convert.

Data type selection:

  • Converting bytes to string and reversing: Python lets me convert raw bytes to a string that keeps the byte codes (e.g. \x01). However, I was not able to find any way to convert this string object back into a bytes object without encoding.
  • Not converting at all: While this is easily doable, StringIO absolutely requires an str object.
  • Converting bytes to hex and reversing: This worked flawlessly with built-in tools. By adding the common prefix 0x, hexadecimal output can also be detected programmatically. Finally, this works with StringIO and thus allows consistent use of read_all while also being safe to output to console or handle as a string.
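A sketch of the hex round trip described in the last bullet, using only built-ins (`bytes.hex()` and `bytes.fromhex()`). For comparison it also shows a swapped-in technique that the thread did not use: Latin-1 maps byte value n to code point n, so it is a lossless bytes/str mapping that needs one character per byte instead of two.

```python
payload = bytes(range(256))  # every possible byte value

# Approach from the comment: hex text with a "0x" marker prefix.
as_hex = "0x" + payload.hex()
restored_from_hex = bytes.fromhex(as_hex[2:])

# Alternative (not used in the thread): Latin-1 is still an encode/decode
# step, but a lossless one, and the str can be written to a StringIO.
as_text = payload.decode("latin-1")
restored_from_latin1 = as_text.encode("latin-1")

print(len(as_hex), len(as_text))
print(restored_from_hex == payload, restored_from_latin1 == payload)
```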

Conditions for conversion:

  • Converting on decode failure: While converting automatically when decoding fails seems like a comfortable and transparent way to handle all scenarios, there is one issue. Large binary files may be broken into chunks for transmission. Some of these chunks may be decodable, while others may fail to decode. This leaves us with an inconsistent result.
  • Converting based on a user-defined channels list: While this requires the user to know beforehand what they want to copy, it is a decent solution to the problem stated above.
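The inconsistency from the first bullet can be sketched with a hypothetical `convert_on_failure` helper (made up for this example). One binary stream ends up with two representations, and even real text can fail per chunk when a frame boundary splits a multi-byte character; the standard `codecs` incremental decoder is shown as the usual remedy for the text case.

```python
import codecs


def convert_on_failure(chunk: bytes) -> str:
    """Hypothetical strategy: decode if possible, otherwise fall back to hex."""
    try:
        return chunk.decode("utf-8")
    except UnicodeDecodeError:
        return "0x" + chunk.hex()


# Two chunks of one binary file (a ZIP signature, then arbitrary bytes):
# the first happens to be valid UTF-8, the second is not.
binary_chunks = [b"PK\x03\x04", b"\xff\xfe\x00"]
mixed = [convert_on_failure(c) for c in binary_chunks]

# Genuine text split inside a multi-byte character ("é" is c3 a9 in UTF-8)
# fails per chunk although the full text decodes fine.
text_chunks = [b"caf\xc3", b"\xa9"]
per_chunk = [convert_on_failure(c) for c in text_chunks]

# An incremental decoder buffers the incomplete sequence across chunks.
decoder = codecs.getincrementaldecoder("utf-8")()
joined = "".join(decoder.decode(c) for c in text_chunks)
joined += decoder.decode(b"", final=True)

print(mixed, per_chunk, joined)
```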

For now, I have a working solution. I am however not quite satisfied with the results, as it is not really intuitive to use and requires some knowledge about the inner workings of the API client.

Code for WSClient:
    def __init__(self, configuration, url, headers, capture_all):
        """A websocket client with support for channels.

            Exec command uses different channels for different streams. for
        example, 0 is stdin, 1 is stdout and 2 is stderr. Some other API calls
        like port forwarding can forward different pods' streams to different
        channels.
        """
        self._connected = False
        self._channels = {}
        if capture_all:
            self._all = StringIO()
        else:
            self._all = _IgnoredIO()
        self.sock = create_websocket(configuration, url, headers)
        self._connected = True

        # channels to be dumped to hex rather than utf-8 decoded
        # to be set during runtime / after client creation
        self.hexdump_channels = []

    def update(self, timeout=0):
        """Update channel buffers with at most one complete frame of input."""
        if not self.is_open():
            return
        if not self.sock.connected:
            self._connected = False
            return
        r, _, _ = select.select(
            (self.sock.sock, ), (), (), timeout)
        if r:
            op_code, frame = self.sock.recv_data_frame(True)
            if op_code == ABNF.OPCODE_CLOSE:
                self._connected = False
                return
            elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
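                raw = frame.data
                if len(raw) > 1:
                    # the first byte of every frame is the channel number
                    channel = raw[0] if six.PY3 else ord(raw[0])
                    if channel in self.hexdump_channels:
                        # retrieve raw data from stream as hex; work on the
                        # undecoded bytes so binary payloads are never dropped
                        data = '0x' + raw[1:].hex()
                    else:
                        data = raw[1:]
                        if six.PY3:
                            data = data.decode("utf-8", "ignore")
                    if data: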
                        if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]:
                            # keeping all messages in the order they received
                            # for non-blocking call.
                            self._all.write(data)
                        if channel not in self._channels:
                            self._channels[channel] = data
                        else:
                            self._channels[channel] += data
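With this approach each frame gets its own `0x` prefix, and `+=` concatenates frames in the channel buffer, so a multi-frame transfer reads back as `0x…0x…`. The offline sketch below (made-up PNG-signature frames, no cluster needed) shows that stripping only the leading `0x` makes `bytes.fromhex()` fail, while removing every `0x` is safe because `x` never occurs in hex digits.

```python
STDOUT_CHANNEL = 1

# Two stdout frames as they arrive on the websocket: channel byte + payload.
frames = [bytes([STDOUT_CHANNEL]) + b"\x89PNG",
          bytes([STDOUT_CHANNEL]) + b"\r\n\x1a\n"]

buffer = ""
for frame in frames:
    # Same expression as the hexdump branch of update(), then "+=" as in
    # self._channels[channel] += data.
    buffer += "0x" + frame[1:].hex()

# Stripping only the first two characters leaves an "x" behind.
try:
    bytes.fromhex(buffer[2:])
    naive_ok = True
except ValueError:
    naive_ok = False

# Removing every "0x" prefix recovers the original bytes.
raw = bytes.fromhex(buffer.replace("0x", ""))
print(buffer, naive_ok, raw)
```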

An alternative approach:
The most elegant solution would of course be to add a copy method to the client, similar to the functionality offered by kubectl.

@FabianNiehaus
Author

I adjusted my code for copying to be more generic. However, I don't know where to add it to the project. CoreV1Api seems to be auto-generated, so this handwritten code should probably go somewhere else.

Code:
import os
import shutil
import tarfile
import tempfile
import threading
from pathlib import Path

from kubernetes.client import CoreV1Api
from kubernetes.stream import stream


class MyClass:

    def copy_from_pod(self, name, namespace, source, destination, **kwargs):
        """copy_from_pod  # noqa: E501

        copy file from a pod  # noqa: E501
        This method runs synchronously by default. To run it
        asynchronously, please pass async_req=True
        >>> thread = api.copy_from_pod(name, namespace, source, destination, async_req=True)
        >>> thread.join()

        :param async_req bool: execute request asynchronously
        :param str name: name of the Pod (required)
        :param str namespace: object name and auth scope, such as for teams and projects (required)
        :param str source: File path to retrieve from the pod.
        :param str destination: Destination file name on the local system.

        :return: str
                 If the method is called asynchronously,
                 returns the request thread (a threading.Thread).
        """

        def cp():
            # -c    create a new archive
            # -m    don't extract file modified time
            # -f    use archive file or device ARCHIVE ("-" = stdout)
            exec_command = ["tar", "cmf", "-", source]
            with tempfile.TemporaryFile(mode="w+b") as tar_buffer:
                wsclient = stream(
                    CoreV1Api().connect_get_namespaced_pod_exec,
                    name=name,
                    namespace=namespace,
                    command=exec_command,
                    stdout=True,
                    stderr=True,
                    _preload_content=False,
                )

                # requires the patched WSClient with hexdump_channels
                wsclient.hexdump_channels = [1]
                out = ''
                while wsclient.is_open():
                    wsclient.update(timeout=1)
                    if wsclient.peek_stdout():
                        out += wsclient.read_stdout()
                    if wsclient.peek_stderr():
                        err: str = wsclient.read_stderr()
                        raise RuntimeError(err)
                wsclient.close()

                # update() prefixes every frame with "0x", so strip all of them
                tar_buffer.write(bytes.fromhex(out.replace("0x", "")))
                tar_buffer.seek(0)

                destination_dir = Path(destination).parent
                if not destination_dir.exists():
                    os.makedirs(destination_dir)

                with tarfile.open(fileobj=tar_buffer, mode="r:") as tar:
                    for member in tar.getmembers():
                        if not member.isfile():
                            continue  # skip directories, links and devices
                        # extractfile() is tarfile's public API for member content
                        with tar.extractfile(member) as src, \
                                open(destination, "wb") as dst:
                            shutil.copyfileobj(src, dst)

        errors = []

        def run():
            # exceptions raised inside a thread never reach join(); keep them
            try:
                cp()
            except Exception as exc:
                errors.append(exc)

        request_thread = threading.Thread(target=run)
        request_thread.start()

        if kwargs.get("async_req") is True:
            return request_thread
        else:
            request_thread.join()
            if errors:
                raise errors[0]
            if Path(destination).is_absolute():
                return destination
            else:
                return str(Path(os.getcwd(), destination).absolute())
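The extraction half of the helper can be checked offline with the standard `tarfile` module. In this sketch, `build_tar` stands in for `tar cf -` running in the pod and `first_file_content` mirrors the "first regular file" loop; both names are hypothetical. It confirms that binary content survives the archive round trip byte for byte.

```python
import io
import tarfile


def build_tar(name: str, content: bytes) -> bytes:
    """Create an uncompressed tar archive in memory (stand-in for `tar cf -`)."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as tar:
        info = tarfile.TarInfo(name=name)
        info.size = len(content)
        tar.addfile(info, io.BytesIO(content))
    return buffer.getvalue()


def first_file_content(archive: bytes) -> bytes:
    """Return the content of the first regular file in an archive."""
    with tarfile.open(fileobj=io.BytesIO(archive), mode="r:") as tar:
        for member in tar.getmembers():
            if member.isfile():
                with tar.extractfile(member) as src:
                    return src.read()
    raise FileNotFoundError("archive contains no regular file")


binary = bytes(range(256))
archive = build_tar("tmp/capture.pcap", binary)
print(first_file_content(archive) == binary)
```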

@roycaihw Do you have a suggestion?

Also, am I correct that this code would need to be compatible with Python 2.7, even though it has reached end of life?

@k8s-triage-robot

The Kubernetes project currently lacks enough contributors to adequately respond to all issues and PRs.

This bot triages issues and PRs according to the following rules:

  • After 90d of inactivity, lifecycle/stale is applied
  • After 30d of inactivity since lifecycle/stale was applied, lifecycle/rotten is applied
  • After 30d of inactivity since lifecycle/rotten was applied, the issue is closed

You can:

  • Mark this issue or PR as fresh with /remove-lifecycle stale
  • Mark this issue or PR as rotten with /lifecycle rotten
  • Close this issue or PR with /close
  • Offer to help out with Issue Triage

Please send feedback to sig-contributor-experience at kubernetes/community.

/lifecycle stale

@k8s-ci-robot k8s-ci-robot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Nov 14, 2021
@FabianNiehaus
Author

I adjusted my code for copying to be more generic. However, I don't know where to add it to the project. CoreV1Api seems to be auto-generated, so this handwritten code should probably go somewhere else.

@roycaihw Do you have a suggestion?

Also, am I correct that this code would need to be compatible with Python 2.7, even though it has reached end of life?

@roycaihw Any inputs on this?

/remove-lifecycle stale

@k8s-ci-robot k8s-ci-robot removed the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Nov 15, 2021
@k8s-triage-robot

/lifecycle stale

@k8s-ci-robot k8s-ci-robot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Feb 13, 2022
@k8s-triage-robot

The Kubernetes project currently lacks enough active contributors to adequately respond to all issues and PRs.

This bot triages issues and PRs according to the following rules:

  • After 90d of inactivity, lifecycle/stale is applied
  • After 30d of inactivity since lifecycle/stale was applied, lifecycle/rotten is applied
  • After 30d of inactivity since lifecycle/rotten was applied, the issue is closed

You can:

  • Mark this issue or PR as fresh with /remove-lifecycle rotten
  • Close this issue or PR with /close
  • Offer to help out with Issue Triage

Please send feedback to sig-contributor-experience at kubernetes/community.

/lifecycle rotten

@k8s-ci-robot k8s-ci-robot added lifecycle/rotten Denotes an issue or PR that has aged beyond stale and will be auto-closed. and removed lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. labels Mar 15, 2022
@motey

motey commented Mar 25, 2022

/remove-lifecycle rotten

@k8s-ci-robot k8s-ci-robot removed the lifecycle/rotten Denotes an issue or PR that has aged beyond stale and will be auto-closed. label Mar 25, 2022
@k8s-triage-robot

/lifecycle stale

@k8s-ci-robot k8s-ci-robot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Jun 23, 2022
@k8s-triage-robot

/lifecycle rotten

@k8s-ci-robot k8s-ci-robot added lifecycle/rotten Denotes an issue or PR that has aged beyond stale and will be auto-closed. and removed lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. labels Jul 23, 2022
@motey

motey commented Jul 23, 2022

/remove-lifecycle rotten
there is still a pending PR

@k8s-ci-robot k8s-ci-robot removed the lifecycle/rotten Denotes an issue or PR that has aged beyond stale and will be auto-closed. label Jul 23, 2022
@motey

motey commented Sep 28, 2022

@roycaihw can you give some hints here #1471 (comment)

There is a solution; it just lacks guidance on how to integrate it.
This feature would enable so much more stuff.

@k8s-triage-robot

/lifecycle stale

@k8s-ci-robot k8s-ci-robot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Dec 27, 2022
@motey

motey commented Dec 27, 2022

These bots suck
/remove-lifecycle stale

@k8s-ci-robot k8s-ci-robot removed the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Dec 27, 2022
@k8s-triage-robot

The Kubernetes project currently lacks enough contributors to adequately respond to all issues.

This bot triages un-triaged issues according to the following rules:

  • After 90d of inactivity, lifecycle/stale is applied
  • After 30d of inactivity since lifecycle/stale was applied, lifecycle/rotten is applied
  • After 30d of inactivity since lifecycle/rotten was applied, the issue is closed

You can:

  • Mark this issue as fresh with /remove-lifecycle stale
  • Close this issue with /close
  • Offer to help out with Issue Triage

Please send feedback to sig-contributor-experience at kubernetes/community.

/lifecycle stale

@k8s-ci-robot k8s-ci-robot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Mar 27, 2023
@motey

motey commented Mar 27, 2023

/remove-lifecycle stale

@k8s-ci-robot k8s-ci-robot removed the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Mar 27, 2023
@k8s-triage-robot

/lifecycle stale

@k8s-ci-robot k8s-ci-robot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Jun 25, 2023
@motey

motey commented Jun 27, 2023

/remove-lifecycle stale

@k8s-ci-robot k8s-ci-robot removed the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Jun 27, 2023
@k8s-triage-robot

/lifecycle stale

@k8s-ci-robot k8s-ci-robot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Jan 23, 2024
@jonmeredith

/remove-lifecycle stale

@k8s-ci-robot k8s-ci-robot removed the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Feb 5, 2024
@meln5674
Contributor

It looks like all attempts at this were abandoned. I have commit eadfcaa, which takes the approach of adding a backward-compatible option to use binary data for all channels. It passes the existing tests, as well as new ones that work with binary data. I am going to try it "in the wild" and open a PR if I don't find any egregious issues.
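To illustrate the idea of one backward-compatible switch for all channels, here is a sketch under assumed names: `ChannelBuffers`, `binary`, `feed` and `read` are hypothetical and are not the API of the commit mentioned above. With the default `binary=False`, payloads are decoded as the existing client does; with `binary=True`, they are stored as untouched bytes.

```python
class ChannelBuffers:
    """Hypothetical per-channel buffers with a single binary switch."""

    def __init__(self, binary: bool = False):
        self.binary = binary
        self._channels = {}

    def _empty(self):
        # Type of an empty buffer depends on the mode: bytes or str.
        return b"" if self.binary else ""

    def feed(self, frame_data: bytes) -> None:
        """Append one frame (channel byte + payload) to its channel buffer."""
        if len(frame_data) < 2:
            return
        channel, payload = frame_data[0], frame_data[1:]
        if not self.binary:
            payload = payload.decode("utf-8", "replace")  # existing behaviour
        self._channels[channel] = self._channels.get(channel, self._empty()) + payload

    def read(self, channel: int):
        """Return and clear everything buffered for a channel."""
        return self._channels.pop(channel, self._empty())


raw = ChannelBuffers(binary=True)
raw.feed(b"\x01\xff\x00")
raw.feed(b"\x01\x80")
print(raw.read(1))
```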

8 participants