Problem with Concurrency Request: High Response time #5846

Closed · 9 tasks done
Pazzeo opened this issue Jan 6, 2023 · 11 comments
Labels
question (Question or problem), question-migrate

Comments

@Pazzeo

Pazzeo commented Jan 6, 2023

First Check

  • I added a very descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the FastAPI documentation, with the integrated search.
  • I already searched in Google "How to X in FastAPI" and didn't find any information.
  • I already read and followed all the tutorials in the docs and didn't find an answer.
  • I already checked if it is not related to FastAPI but to Pydantic.
  • I already checked if it is not related to FastAPI but to Swagger UI.
  • I already checked if it is not related to FastAPI but to ReDoc.

Commit to Help

  • I commit to help with one of those options 👆

Example Code

main.py:

import os
import sys
import json
import time
from lib.UserID import UserID

from model.ModelUserToken import ModelUserToken

from fastapi import FastAPI, Depends, Body, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel
from prometheus_fastapi_instrumentator import Instrumentator
import resource
import uuid
import loguru


#Load the key of the tenant
with open(os.path.join(sys.path[0], "/app/data/key.json"), "r") as f:
    data = json.load(f)

kid_crm = data['CRM']['credentialId']
signature_base64_crm = data['CRM']['signingKey']
key_base64_crm = data['CRM']['encryptionKey']

app = FastAPI()

logger = loguru.logger
logger.remove()
logger.add(sys.stdout, format="{time} - {level} - ({extra[request_id]}) {message} ", level="DEBUG")

origins = [
    '*'
    ]


app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=['GET','POST'],
    allow_headers=["*"]
)



@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    request_id = str(uuid.uuid4())
    with logger.contextualize(request_id=request_id):
        logger.info("Request started")
        start_time = time.time()
        response = await call_next(request)
        process_time = time.time() - start_time
        logger.info("Request ended in " + str(process_time))
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time"] = str(format(process_time))
        return response


class Response(BaseModel):
    token: str



result = {}




@app.get("/", summary="Test the availability of the service")
async def read_main():
    return {"msg": "I'm available!"}



@app.get("/token/UserID", tags=["UserID"], summary="Generate token", response_model=Response)
async def token(args: dict = Depends(ModelUserToken)):
    token = UserID(kid_crm, signature_base64_crm)
    result['token'] = await token.Create(args)
    return result


@app.on_event("startup")
async def startup():
    resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536))
    Instrumentator(excluded_handlers=["/metrics", "/docs", "/openapi.json"]).instrument(app).expose(app, include_in_schema=False)

Description

Hello,

I'm facing an issue when testing my FastAPI service with simulated concurrent requests (for example, 1000 req/s).
The issue observed is a high response time, around 3 s, when normally the service answers in under 0.025 s.
The service is quite simple: it should generate an AWS token. So I suspect that I haven't defined some functions in a proper way.
The service is deployed as a Docker image; I'm using tiangolo's uvicorn-gunicorn-fastapi image with Python 3.9.
The file posted above is main.py, which sits inside the app directory.
The app directory has the following structure:
the directory model/ contains the file ModelUserToken.py:

from typing import Optional

from fastapi import Query


def ModelUserToken(
        userID: Optional[str] = Query(
            None,
            description="Public userID",
            min_length=3,
        ),
        exp: Optional[int] = Query(
            None,
            description="Expiration of the token. Default NONE (no expiration). Max value is 90 seconds",
            gt=0,
            le=90,  # `le` enforces the maximum; `lte` is not a Query constraint and was being ignored
        )):
    return {"exp": exp, "userID": userID}

the directory lib/ has the files:
UserID.py

import time
from .tools import encryption, remove_none_values
from fastapi.concurrency import run_in_threadpool

class UserID(object):
    def __init__(self, kid, signature_base64):
        
        self.payload = {}
        self.kid = kid
        self.signature_base64 = signature_base64

    async def Create(self, input):

        # Check if the token has an expiration time
        if input["exp"] is None:
            self.payload = {
                "typ": "UserID",
                "ver": "1.0"
            }
        else:
            current_seconds = round(time.time())
            exp_token = current_seconds + input["exp"]

            self.payload = {
                "ver": "1.0",
                "typ": "UserID",
                "exp": exp_token
            }
        
        #remove the value exp
        del input['exp']
        
        clean_input = await run_in_threadpool(remove_none_values, input)
        #Update the self.payload if the clean_input exists
        if clean_input is not None:
            self.payload.update(clean_input)

        token = await run_in_threadpool(encryption, self.payload, self.signature_base64, self.kid)

        return token

the file tools.py

import jwt
import base64

def encryption(payload, signature, kid):
    #Generate the encrypted token
    encrypt = jwt.encode(
                payload,
                base64.b64decode(signature), 
                algorithm="HS256",
                headers={"kid": kid}
              )
    return encrypt


def remove_none_values(input):
    """
    Given a dictionary, dict, remove None values
    If the dictionary includes nested dictionaries, investigate and remove None values there too.
    The same is done if the dictionary includes nested lists
    """
    cleaned_dict = {}
    for key, value in input.items():
       # print(type(value),key)
        if isinstance(value, dict):
            nested_dict = remove_none_values(value)
            if len(nested_dict.keys()) > 0:
                cleaned_dict[key] = nested_dict
        elif isinstance(value,list):
            NewList=[]
            for dict_values in value:
                result = {key: value for key, value in dict_values.items() if value is not None}
                NewList.append(result)
            cleaned_dict[key] =  NewList
        elif value is not None:
            cleaned_dict[key] = value
    return cleaned_dict

Then I'm using this script to test the performance with 1000 req/s:

import random
import asyncio
import httpx
import json

async def request():
    async with httpx.AsyncClient() as client:
        r = await client.get("http://10.20.21.21:9080/token/UserID")
        request_id = r.headers.get("x-request-id")
        status_code = r.status_code
        print(f"Received response {status_code} for request_id={request_id}")

async def run():
    await asyncio.gather(*[request() for _ in range(1000)])

if __name__ == "__main__":
    asyncio.run(run())

During the execution, I'm observing that all requests have a response time between 1 s and 2.5 s.
Could you help me figure out where the problem could be?

Thanks in advance,
PAz

Operating System

Linux

Operating System Details

Docker Image uvicorn-gunicorn-fastapi with gunicorn 20.1.0 and 8 workers

FastAPI Version

0.88.0

Python Version

3.9.16

Additional Context

No response

@Pazzeo added the question (Question or problem) label Jan 6, 2023
@jgould22

jgould22 commented Jan 6, 2023

A few questions:

  • 1000 req/s can be a lot or a little depending on the hardware you are running.
  • Are you running your benchmark on the same hardware as your server?
  • You are using loguru, which I don't know much about, but make sure it doesn't block the event loop (I would remove it entirely to see if that is the issue; logging can slow things down a lot if not done correctly).

@Pazzeo
Author

Pazzeo commented Jan 6, 2023

Hi @jgould22

The test script is executed from another server; the FastAPI service runs on a dedicated server.
loguru was introduced afterwards, to debug a little of what was happening. I don't think it introduces any blocking point; the issue was present before as well.

Thanks
Paz

@mouloud11

mouloud11 commented Jan 7, 2023

Hi @Pazzeo

Did you check around run_in_threadpool? Opening a thread is slow, and too many threads can lead to a crash.
You should look into that.

@iudeen
Contributor

iudeen commented Jan 7, 2023

Can you try the same with a sync route and see if the same issue exists? I usually face such issues when I mix sync and async functions. (A sketch of what that could look like follows below.)
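
As an illustration of that suggestion (my sketch, not code from this thread): declare the route with a plain def so FastAPI runs it in its threadpool. create_sync here is a hypothetical synchronous variant of UserID.Create.

# Hypothetical sync variant of the route, for comparison only.
# FastAPI executes plain `def` endpoints in its threadpool automatically.
@app.get("/token/UserID-sync", response_model=Response)
def token_sync(args: dict = Depends(ModelUserToken)):
    token = UserID(kid_crm, signature_base64_crm)
    # create_sync is an assumed synchronous version of Create
    # (same logic, calling encryption() directly, no run_in_threadpool).
    return {"token": token.create_sync(args)}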

@jgould22

jgould22 commented Jan 7, 2023

Did you check around run_in_threadpool? Opening a thread is slow, and too many threads can lead to a crash.

Also very likely relevant: the default anyio capacity limiter is 40, IIRC.

Here is the call: https://github.com/encode/starlette/blob/master/starlette/concurrency.py#L35 and the docs (https://anyio.readthedocs.io/en/stable/api.html#anyio.to_thread.run_sync) indicate that if the limiter argument is None, the default is used.

@iudeen
Contributor

iudeen commented Jan 7, 2023

Yes, agreed with @jgould22.

You can modify the limit. Refer to this comment.
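
For reference, a minimal sketch of raising that limit (an assumption about how you might apply it here, value illustrative): Starlette's run_in_threadpool falls back to anyio's default capacity limiter, which can be resized at startup.

import anyio

@app.on_event("startup")
async def raise_threadpool_limit():
    # The default anyio limiter caps run_in_threadpool at 40 concurrent
    # threads; raising total_tokens allows more threads in flight.
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 100  # illustrative value, tune for your workload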

@Pazzeo
Author

Pazzeo commented Jan 8, 2023

Hi @iudeen , @jgould22

First, thanks a lot for the help. Over the last few days I have tried what you proposed, but I don't observe much improvement.
Currently, I'm reaching the best performance without using run_in_threadpool and removing the await on the call to token.Create.
I have just kept the async on the route function. With this setup, I always see response times between 1 and 1.5 seconds.
I'm not able to understand where the bottleneck in this code is.
Gunicorn is running with 8 workers, and during my tests I've monitored the processes and the workload.
It seems that the job is properly distributed between the workers: CPU consumption is evenly spread across the PIDs.

I don't know how I can improve my code to get better response times with concurrent requests.
Do you have any suggestions?

Paz

@Kludex
Sponsor Collaborator

Kludex commented Jan 8, 2023

  1. There's no reason to use run_in_threadpool in the snippet in the description.
  2. There's a bit of a performance drop from the middleware, since it's not pure ASGI middleware, but that's not the relevant issue here (a pure-ASGI sketch follows below).

The problem with your benchmark is on the client side. Use a single AsyncClient and pass it to the task that is going to run multiple times.
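
For what it's worth, here is a minimal sketch (my illustration, not code from this thread) of the same request-ID/timing logic written as pure ASGI middleware, which avoids the per-request overhead of the @app.middleware("http") wrapper:

import time
import uuid

class TimingMiddleware:
    """Pure ASGI middleware: adds X-Request-ID and X-Process-Time headers."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        request_id = str(uuid.uuid4())
        start_time = time.time()

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                # ASGI response headers are (bytes, bytes) pairs.
                message["headers"].append((b"x-request-id", request_id.encode()))
                message["headers"].append(
                    (b"x-process-time", str(time.time() - start_time).encode())
                )
            await send(message)

        await self.app(scope, receive, send_wrapper)

app.add_middleware(TimingMiddleware)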

@Pazzeo
Author

Pazzeo commented Jan 12, 2023

Hi @Kludex

Indeed, I have removed the run_in_threadpool and the service is working better.
About the client side, how do you simulate the 1000 req/s? I thought that the implementation was correct.

Paz

@luebke-dev

luebke-dev commented Jan 25, 2023

@Pazzeo

In your code you create 1000 instances of httpx.AsyncClient, but you should create one and let it make all 1000 requests, like this:

import asyncio
import httpx

async def request(client: httpx.AsyncClient):
    r = await client.get("http://10.20.21.21:9080/token/UserID")
    request_id = r.headers.get("x-request-id")
    status_code = r.status_code
    print(f"Received response {status_code} for request_id={request_id}")

async def run():
    # One shared client: a single connection pool reused by all 1000 requests.
    async with httpx.AsyncClient() as client:
        await asyncio.gather(*[request(client) for _ in range(1000)])

if __name__ == "__main__":
    asyncio.run(run())
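
The underlying reason: each httpx.AsyncClient has its own connection pool, so 1000 clients means 1000 cold TCP handshakes and no connection reuse, which inflates the measured latency. If you also want to bound client-side concurrency explicitly, httpx exposes pool limits (values below are illustrative and happen to match httpx's defaults):

# Optional: bound the shared client's connection pool explicitly.
limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
async with httpx.AsyncClient(limits=limits) as client:
    await asyncio.gather(*[request(client) for _ in range(1000)])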

@raphaelauv
Contributor

A full example of how to use and test a FastAPI endpoint with an httpx client: https://github.com/raphaelauv/fastAPI-httpx-example

Repository owner locked and limited conversation to collaborators Feb 24, 2023
@tiangolo converted this issue into discussion #6059 Feb 24, 2023

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →
