Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve logging test and increase coverage on server.py #1743

Merged
merged 24 commits into from Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.cfg
Expand Up @@ -82,7 +82,7 @@ plugins =

[coverage:report]
precision = 2
fail_under = 97.92
fail_under = 98.30
show_missing = true
skip_covered = true
exclude_lines =
Expand Down
31 changes: 31 additions & 0 deletions tests/middleware/test_logging.py
@@ -1,5 +1,7 @@
import contextlib
import logging
import socket
import sys

import httpx
import pytest
Expand Down Expand Up @@ -155,6 +157,35 @@ async def test_default_logging(use_colors, caplog, logging_config):
assert "Shutting down" in messages.pop(0)


@pytest.mark.anyio
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
async def test_running_log_using_uds(caplog, short_socket_name): # pragma: py-win32
config = Config(app=app, uds=short_socket_name)
with caplog_for_logger(caplog, "uvicorn.access"):
async with run_server(config):
...

messages = [record.message for record in caplog.records if "uvicorn" in record.name]
assert (
f"Uvicorn running on unix socket {short_socket_name} (Press CTRL+C to quit)"
in messages
)


@pytest.mark.anyio
@pytest.mark.skipif(sys.platform == "win32", reason="require unix-like system")
async def test_running_log_using_fd(caplog): # pragma: py-win32
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
fd = sock.fileno()
config = Config(app=app, fd=fd)
with caplog_for_logger(caplog, "uvicorn.access"):
async with run_server(config):
...
sockname = sock.getsockname()
messages = [record.message for record in caplog.records if "uvicorn" in record.name]
assert f"Uvicorn running on socket {sockname} (Press CTRL+C to quit)" in messages


@pytest.mark.anyio
async def test_unknown_status_code(caplog):
async def app(scope, receive, send):
Expand Down
14 changes: 8 additions & 6 deletions uvicorn/server.py
Expand Up @@ -102,7 +102,9 @@ async def startup(self, sockets: list = None) -> None:
# Explicitly passed a list of open sockets.
# We use this when the server is run from a Gunicorn worker.

def _share_socket(sock: socket.SocketType) -> socket.SocketType:
def _share_socket(
sock: socket.SocketType,
) -> socket.SocketType: # pragma py-linux pragma: py-darwin
# Windows requires the socket be explicitly shared across
# multiple workers (processes).
from socket import fromshare # type: ignore
Expand All @@ -113,14 +115,14 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType:
self.servers = []
for sock in sockets:
if config.workers > 1 and platform.system() == "Windows":
sock = _share_socket(sock)
sock = _share_socket(sock) # pragma py-linux pragma: py-darwin
server = await loop.create_server(
create_protocol, sock=sock, ssl=config.ssl, backlog=config.backlog
)
self.servers.append(server)
listeners = sockets

elif config.fd is not None:
elif config.fd is not None: # pragma: py-win32
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add # pragma: py-win32 because we use socket.AF_UNIX in all instances.

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you mean in the line after... Got it. Yep!

# Use an existing socket, from a file descriptor.
sock = socket.fromfd(config.fd, socket.AF_UNIX, socket.SOCK_STREAM)
server = await loop.create_server(
Expand All @@ -130,7 +132,7 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType:
listeners = server.sockets
self.servers = [server]

elif config.uds is not None:
elif config.uds is not None: # pragma: py-win32
# Create a socket using UNIX domain socket.
uds_perms = 0o666
if os.path.exists(config.uds):
Expand Down Expand Up @@ -174,14 +176,14 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType:
def _log_started_message(self, listeners: Sequence[socket.SocketType]) -> None:
config = self.config

if config.fd is not None:
if config.fd is not None: # pragma: py-win32
sock = listeners[0]
logger.info(
"Uvicorn running on socket %s (Press CTRL+C to quit)",
sock.getsockname(),
)

elif config.uds is not None:
elif config.uds is not None: # pragma: py-win32
logger.info(
"Uvicorn running on unix socket %s (Press CTRL+C to quit)", config.uds
)
Expand Down