Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add encoded keyword for the HTTPFileSystem #1021

Merged
merged 6 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 25 additions & 9 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import aiohttp
import requests
import yarl

from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import _DEFAULT_CALLBACK
Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(
loop=None,
client_kwargs=None,
get_client=get_client,
encoded=False,
**storage_options,
):
"""
Expand Down Expand Up @@ -90,6 +92,7 @@ def __init__(
self.cache_options = cache_options
self.client_kwargs = client_kwargs or {}
self.get_client = get_client
self.encoded = encoded
self.kwargs = storage_options
self._session = None

Expand All @@ -104,6 +107,9 @@ def __init__(
request_options.pop("skip_instance_cache", None)
self.kwargs = request_options

def encode_url(self, url):
    """Wrap ``url`` in a ``yarl.URL`` using this filesystem's ``encoded`` flag.

    Parameters
    ----------
    url : str
        Address to convert.

    Returns
    -------
    yarl.URL
        Constructed with ``encoded=self.encoded``, i.e. the value supplied
        to the constructor's ``encoded`` keyword.
    """
    # The flag is forwarded to yarl unchanged; yarl decides whether the
    # string still needs quoting based on it.
    already_encoded = self.encoded
    return yarl.URL(url, encoded=already_encoded)

@staticmethod
def close_session(loop, session):
if loop is not None and loop.is_running():
Expand Down Expand Up @@ -143,7 +149,7 @@ async def _ls_real(self, url, detail=True, **kwargs):
kw.update(kwargs)
logger.debug(url)
session = await self.set_session()
async with session.get(url, **self.kwargs) as r:
async with session.get(self.encode_url(url), **self.kwargs) as r:
self._raise_not_found_for_status(r, url)
text = await r.text()
if self.simple_links:
Expand Down Expand Up @@ -217,7 +223,7 @@ async def _cat_file(self, url, start=None, end=None, **kwargs):
headers["Range"] = await self._process_limits(url, start, end)
kw["headers"] = headers
session = await self.set_session()
async with session.get(url, **kw) as r:
async with session.get(self.encode_url(url), **kw) as r:
out = await r.read()
self._raise_not_found_for_status(r, url)
return out
Expand All @@ -229,7 +235,7 @@ async def _get_file(
kw.update(kwargs)
logger.debug(rpath)
session = await self.set_session()
async with session.get(rpath, **self.kwargs) as r:
async with session.get(self.encode_url(rpath), **self.kwargs) as r:
try:
size = int(r.headers["content-length"])
except (ValueError, KeyError):
Expand Down Expand Up @@ -297,7 +303,7 @@ async def _exists(self, path, **kwargs):
try:
logger.debug(path)
session = await self.set_session()
r = await session.get(path, **kw)
r = await session.get(self.encode_url(path), **kw)
async with r:
return r.status < 400
except (requests.HTTPError, aiohttp.ClientError):
Expand Down Expand Up @@ -342,7 +348,7 @@ def _open(
if block_size and size:
return HTTPFile(
self,
path,
self.encode_url(path),
session=session,
block_size=block_size,
mode=mode,
Expand All @@ -354,7 +360,12 @@ def _open(
)
else:
return HTTPStreamFile(
self, path, mode=mode, loop=self.loop, session=session, **kw
self,
self.encode_url(path),
mode=mode,
loop=self.loop,
session=session,
**kw,
)

async def open_async(self, path, mode="rb", size=None, **kwargs):
Expand All @@ -365,7 +376,12 @@ async def open_async(self, path, mode="rb", size=None, **kwargs):
except FileNotFoundError:
pass
return AsyncStreamFile(
self, path, loop=self.loop, session=session, size=size, **kwargs
self,
self.encode_url(path),
loop=self.loop,
session=session,
size=size,
**kwargs,
)

def ukey(self, url):
Expand All @@ -389,7 +405,7 @@ async def _info(self, url, **kwargs):
try:
info.update(
await _file_info(
url,
self.encode_url(url),
size_policy=policy,
session=session,
**self.kwargs,
Expand Down Expand Up @@ -604,7 +620,7 @@ async def async_fetch_range(self, start, end):
kwargs = self.kwargs.copy()
headers = kwargs.pop("headers", {}).copy()
headers["Range"] = "bytes=%i-%i" % (start, end - 1)
logger.debug(self.url + " : " + headers["Range"])
logger.debug(str(self.url) + " : " + headers["Range"])
r = await self.session.get(self.url, headers=headers, **kwargs)
async with r:
if r.status == 416:
Expand Down
14 changes: 14 additions & 0 deletions fsspec/implementations/tests/test_http.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import io
import json
import os
import sys
import time

import aiohttp
import pytest

import fsspec.asyn
Expand Down Expand Up @@ -438,3 +440,15 @@ async def test_async_file(server):
out2 = await f.read()
assert data == out1 + out2
await fs._session.close()


def test_encoded(server):
    """Exercise the ``encoded`` keyword of the HTTP filesystem.

    Every request carries a ``give_path`` header, which makes the test
    server answer with a JSON document holding the path it received.
    """
    # encoded=True: a pre-quoted URL must reach the server unchanged ...
    fs = fsspec.filesystem("http", encoded=True)
    reply = fs.cat(
        server + "/Hello%3A%20G%C3%BCnter",
        headers={"give_path": "true"},
    )
    seen = json.loads(reply)
    assert seen["path"] == "/Hello%3A%20G%C3%BCnter"

    # ... while an unquoted URL is expected to fail with a client error.
    with pytest.raises(aiohttp.client_exceptions.ClientError):
        fs.cat(server + "/Hello: Günter", headers={"give_path": "true"})

    # encoded=False: the unquoted URL is accepted and arrives quoted.
    fs = fsspec.filesystem("http", encoded=False)
    reply = fs.cat(server + "/Hello: Günter", headers={"give_path": "true"})
    seen = json.loads(reply)
    assert seen["path"] == "/Hello:%20G%C3%BCnter"
3 changes: 3 additions & 0 deletions fsspec/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import json
import os
import threading
from collections import ChainMap
Expand Down Expand Up @@ -53,6 +54,8 @@ def _respond(self, code=200, headers=None, data=b""):
def do_GET(self):
file_path = self.path.rstrip("/")
file_data = self.files.get(file_path)
if "give_path" in self.headers:
return self._respond(200, data=json.dumps({"path": self.path}).encode())
if file_data is None:
return self._respond(404)
if "Range" in self.headers:
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ ignore =

[isort]
known_first_party=fsspec
known_third_party=aiohttp,dask,distributed,fuse,libarchive,numpy,panel,paramiko,pygit2,pytest,requests,s3fs,setuptools,smbclient
known_third_party=aiohttp,dask,distributed,fuse,libarchive,numpy,panel,paramiko,pygit2,pytest,requests,s3fs,setuptools,smbclient,yarl
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
Expand Down