-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
121 lines (95 loc) · 3.55 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import asyncio
import logging
from fastapi import APIRouter, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import orjson
import uvicorn
from operationsgateway_api.src.config import Config
from operationsgateway_api.src.constants import LOG_CONFIG_LOCATION, ROUTE_MAPPINGS
import operationsgateway_api.src.experiments.runners as runners
from operationsgateway_api.src.mongo.connection import ConnectionInstance
from operationsgateway_api.src.routes import (
auth,
channels,
experiments,
images,
ingest_data,
records,
waveforms,
)
# Custom response class to deal with NaN values ingested into MongoDB. The reasoning
# behind this change is explained in
# https://github.com/ral-facilities/operationsgateway-api/pull/9
class ORJSONResponse(JSONResponse):
    """JSON response whose body is serialised with ``orjson``."""

    media_type = "application/json"

    def render(self, content) -> bytes:
        """Return ``content`` encoded as JSON bytes by ``orjson.dumps``."""
        encoded = orjson.dumps(content)
        return encoded
# Text passed to FastAPI as the `description` of the application
api_description = """
This API is the backend to OperationsGateway that allows users to:
- Ingest HDF files containing scalar, image and waveform data into a MongoDB instance
- Query MongoDB to get records containing data channels, using typical database filters
- Get waveform data and full-size images via specific endpoints
"""

# ORJSONResponse is set as the default response class for the whole application
app = FastAPI(
    default_response_class=ORJSONResponse,
    description=api_description,
    title="OperationsGateway API",
)

# CORS settings: every origin, method and header is allowed, with credentials enabled
# NOTE(review): wildcard origins combined with credentials is very permissive -
# confirm this is intended for production deployments
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
def setup_logger():
    """Apply the logging configuration stored at ``LOG_CONFIG_LOCATION``.

    The ``zeep`` logger is raised to ``INFO`` first, then the configuration file is
    loaded with ``logging.config.fileConfig``.
    """
    # `logging.config` is a submodule that a plain `import logging` does not load, so
    # import it explicitly instead of relying on another package having imported it
    import logging.config

    # Disable excessive debug logging from zeep when contacting the Scheduler via WSDL
    logging.getLogger("zeep").setLevel(logging.INFO)
    logging.config.fileConfig(LOG_CONFIG_LOCATION)
# Configure logging at import time, before the module-level log calls below
setup_logger()
# No name is passed, so this is the root logger; it is used throughout this module
log = logging.getLogger()
log.info("Logging now setup")
@app.on_event("startup")
async def get_experiments_on_startup():
    """Start the Scheduler background task at application startup, if enabled.

    When ``scheduler_background_task_enabled`` is set in the experiments config, the
    coroutine returned by ``runners.scheduler_runner.start_task()`` is scheduled as an
    asyncio task and stored on ``app.state.scheduler_task``. Otherwise only a log
    message is written.
    """
    if not Config.config.experiments.scheduler_background_task_enabled:
        log.info("Scheduler background task has not been enabled")
        return

    log.info(
        "Creating task for Scheduler system to be contacted for experiment details",
    )
    # The event loop only keeps weak references to tasks, so a task whose result is
    # discarded can be garbage collected before it finishes. Keep a strong reference
    # on the application state for the lifetime of the app.
    app.state.scheduler_task = asyncio.create_task(
        runners.scheduler_runner.start_task(),
    )
@app.on_event("startup")
async def startup_mongodb_client():
    """Instantiate ``ConnectionInstance`` when the application starts.

    NOTE(review): presumably this creates the ``db_connection`` that the shutdown
    handler closes - confirm against ``ConnectionInstance``.
    """
    ConnectionInstance()
@app.on_event("shutdown")
async def close_mongodb_client():
    """Close the MongoDB client held by ``ConnectionInstance`` on shutdown."""
    mongo_client = ConnectionInstance.db_connection.mongo_client
    mongo_client.close()
def add_router_to_app(api_router: APIRouter):
    """Include ``api_router`` in the app and record its routes in ``ROUTE_MAPPINGS``.

    Each route's endpoint function is mapped to a ``"<path> <METHOD>"`` string, eg.
    <function get_full_image at 0x7...9d0>, '/images/{record_id}/{channel_name} GET'
    so that the mappings can be used for authorisation.

    :param api_router: router whose routes are added to the app and to the mapping
    """
    # add this router to the FastAPI app
    app.include_router(api_router)

    for route in api_router.routes:
        # The methods collection is unordered (a set on FastAPI's APIRoute), so taking
        # "the first" element is arbitrary when a route has several methods. `min`
        # gives the same answer for single-method routes and a stable one otherwise.
        ROUTE_MAPPINGS[route.endpoint] = f"{route.path} {min(route.methods)}"
# Adding endpoints to FastAPI app; the tuple order is the order of registration
for _route_module in (
    images,
    ingest_data,
    records,
    waveforms,
    auth,
    channels,
    experiments,
):
    add_router_to_app(_route_module.router)

# Dump the endpoint-to-route mapping built above, one (function, route) pair per line
log.debug("ROUTE_MAPPINGS contents:")
for item in ROUTE_MAPPINGS.items():
    log.debug(item)
if __name__ == "__main__":
    # Run the API under uvicorn using the host, port and reload values from the config
    app_settings = Config.config.app
    uvicorn.run(
        "operationsgateway_api.src.main:app",
        log_config=LOG_CONFIG_LOCATION,
        reload=app_settings.reload,
        port=app_settings.port,
        host=app_settings.host,
    )