Skip to content

A SaaS Solution for Neutrino Event Reconstruction using the Skymap Scanner

License

Notifications You must be signed in to change notification settings

WIPACrepo/SkyDriver

Repository files navigation

GitHub release (latest by date including pre-releases) Lines of code GitHub issues GitHub pull requests

SkyDriver v1

A SaaS Solution for Neutrino Event Reconstruction using the Skymap Scanner

Overview

SkyDriver automates the entire scanning of an event: starting all servers and workers, transferring all needed data, and finally, all tear-down. SkyDriver also includes a database for storing scan requests, progress reports, and results. The computational engine for a scan is the Skymap Scanner. The main interface is a REST server with several routes and methods.

One of many workflows may be:

  1. Request a scan (POST @ /scan)
  2. Monitor the scanning status (GET @ /scan/SCAN_ID/status)
  3. Check for progress updates (GET @ /scan/SCAN_ID/manifest)
  4. Check for partial results (GET @ /scan/SCAN_ID/result)
  5. Get a final result (GET @ /scan/SCAN_ID/result)
  6. Make plots

Another workflow:

  1. Find a scan id for a particular run and event (GET @ /scans/find)
  2. Get the scan's manifest and result (GET @ /scan/SCAN_ID)

 

Getting Started

Users interface with SkyDriver via REST calls, so first, you will need to get a connection. This example uses wipac-rest-tools:

from rest_tools.client import RestClient, SavedDeviceGrantAuth

def get_rest_client() -> RestClient:
    """Get REST client for talking to SkyDriver.

    This will present a QR code in the terminal for initial validation.
    """

    # NOTE: If your script will not be interactive (like a cron job),
    # then you need to first run your script manually to validate using
    # the QR code in the terminal.

    return SavedDeviceGrantAuth(
        "https://skydriver.icecube.aq",
        token_url="https://keycloak.icecube.wisc.edu/auth/realms/IceCube",
        filename="device-refresh-token",
        client_id="skydriver-external",
        retries=0,
    )

rc = get_rest_client()

Now, you can make all the REST calls needed:

rc.request_seq(method, path, args_dict)

Two Quick Examples

To request a new scan (see POST @ /scan):

manifest = rc.request_seq("POST", "/scan", {"docker_tag": ...})
print(json.dumps(manifest))

To see your scan's status (see GET @ /scan/SCAN_ID/status):

status = rc.request_seq("GET", f"/scan/{scan_id}/status")
print(json.dumps(status))

Refer to the REST API section for comprehensive documentation detailing the available interactions with SkyDriver.

Also, see Using a Scan Result Outside of SkyDriver.

 

REST API

Documentation for the public-facing routes and method

 

/scan - POST


Launch a new scan of an event

Arguments

Argument Type Required/Default Description
"docker_tag" str [REQUIRED] the docker tag of the Skymap Scanner image (must be in CVMFS). Ex: v3.1.4, v3.5, v3, latest, eqscan-6207146 (branch-based tag)
"cluster" dict or list [REQUIRED] the worker cluster(s) to use along with the number of workers for each: Example: {"sub-2": 1234}. NOTE: To request a schedd more than once, provide a list of 2-lists instead (Ex: [ ["sub-2", 56], ["sub-2", 1234] ])
"reco_algo" bool [REQUIRED] which reco algorithm to use (see Skymap Scanner)
"event_i3live_json" dict or str [REQUIRED] Realtime's JSON event format
"nsides" dict [REQUIRED] the nside progression to use (see Skymap Scanner)
"real_or_simulated_event" str [REQUIRED] whether this event is real or simulated. Ex: real, simulated
"max_pixel_reco_time" int [REQUIRED] the max amount of time (seconds) each pixel's reco should take (accurate values will evict pixels from slow workers thereby re-delivering to faster workers -- slow workers are unavoidable due to non-deterministic errors)
"max_worker_runtime" int default: 4*60*60 the max amount of time (second) each client worker can work for (larger values are needed as the event size increases AND the workforce size decreases)
"skyscan_mq_client_timeout_wait_for_first_message" int default: image's default value how long a client can wait for its first message (pixel) before giving up and exiting
"scanner_server_memory" str default: 1024M how much memory for the scanner server to request
"worker_memory" str default: 8G how much memory per client worker to request
"worker_disk" str default: 1G how much disk per client worker to request
"debug_mode" str or list default: None what debug mode(s) to use: "client-logs" collects the scanner clients' stderr/stdout including icetray logs (scans are limited in # of workers)
"predictive_scanning_threshold" float default: 1.0 the predictive scanning threshold [0.1, 1.0] (see Skymap Scanner)
"priority" int default: 0 the relative priority of this scan -- higher values indicate higher priority. NOTE: Values >= 10 are reserved for Realtime alert scans (these scan requests are not throttled). Also, see HTCondor jobs
"classifiers" dict[str, str | bool | float | int] default: {} a user-defined collection of labels, attributes, etc. -- this is constrained in size and is intended for user-defined metadata only
"manifest_projection" list default: all fields but these which Manifest fields to include in the response (include * to include all fields)

SkyDriver Effects

  • Creates and starts a new Skymap Scanner instance spread across many client workers
  • The new scanner will send updates routinely and when the scan completes (see GET (manifest) and GET (result))

Returns

dict - Manifest

 

/scan/SCAN_ID/manifest - GET


Retrieve the manifest of a scan

Arguments

Argument Type Required/Default Description
"include_deleted" bool default: False Not normally needed -- True prevents a 404 error if the scan was deleted (aborted)

SkyDriver Effects

None

Returns

dict - Manifest

 

/scan/SCAN_ID/result - GET


Retrieve the result of a scan

Arguments

Argument Type Required/Default Description
"include_deleted" bool default: False Not normally needed -- True prevents a 404 error if the scan was deleted (aborted)

SkyDriver Effects

None

Returns

dict - Result

 

/scan/SCAN_ID - GET


Retrieve the manifest and result of a scan

Arguments

Argument Type Required/Default Description
"include_deleted" bool default: False Not normally needed -- True prevents a 404 error if the scan was deleted (aborted)

SkyDriver Effects

None

Returns

{
    "manifest": Manifest dict,
    "result": Result dict,
}

 

/scan/SCAN_ID - DELETE


Abort a scan and/or mark scan (manifest and result) as "deleted"

Arguments

Argument Type Required/Default Description
"delete_completed_scan" bool default: False whether to mark a completed scan as "deleted" -- this is not needed for aborting an ongoing scan
"manifest_projection" list default: all fields but these which Manifest fields to include in the response (include * to include all fields)

SkyDriver Effects

  • The Skymap Scanner instance is stopped and removed
  • The scan's manifest and result are marked as "deleted" in the database

Returns

{
    "manifest": Manifest dict,
    "result": Result dict,
}

 

/scans/find - POST


Retrieve scan manifests corresponding to a specific search query

Arguments

Argument Type Required/Default Description
"filter" dict [REQUIRED] a MongoDB-syntax filter for Manifest
"include_deleted" bool default: False whether to include deleted scans (overwritten by filter's is_deleted)
"manifest_projection" list default: all fields but these which Manifest fields to include in the response (include * to include all fields)
Example

One simple "filter" may be:

{
    "filter": {
       "event_metadata.run_id": 123456789,
       "event_metadata.event_id": 987654321,
       "event_metadata.is_real_event": True,
    }
}

See https://www.mongodb.com/docs/manual/tutorial/query-documents/ for more complex queries.

SkyDriver Effects

None

Returns

{
    "manifests": list[Manifest dict],
}

 

/scans/backlog - GET


Retrieve entire backlog list

Arguments

None

SkyDriver Effects

None

Returns

{
    "entries": [
        {
            "scan_id": str,
            "timestamp": float,
            "pending_timestamp": float
        },
        ...
    ]
}

 

/scan/SCAN_ID/status - GET


Retrieve the status of a scan

Arguments

Argument Type Required/Default Description
"include_pod_statuses" bool False whether to include the k8s pod statuses for the clientmanager & central server -- expends additional resources

SkyDriver Effects

None

Returns

{
    "scan_state": str,  # a short human-readable code
    "is_deleted": bool,
    "scan_complete": bool,  # workforce is done
    "pods": {  # field is included only if `include_pod_statuses == True`
        "pod_status": dict,  # a large k8s status object
        "pod_status_message": str,  # a human-readable message explaining the pod status retrieval
    }
    "clusters": list,  # same as Manifest's clusters field
}
Scan State Codes

There are several codes for scan_state:

  • Successful state
    • SCAN_FINISHED_SUCCESSFULLY
  • Non-finished scan states (in reverse order of occurrence)
    • IN_PROGRESS__PARTIAL_RESULT_GENERATED
    • IN_PROGRESS__WAITING_ON_FIRST_PIXEL_RECO
    • PENDING__WAITING_ON_CLUSTER_STARTUP or PENDING__WAITING_ON_SCANNER_SERVER_STARTUP
    • PENDING__PRESTARTUP
  • The above non-finished states have equivalents in the case that the scan failed and/or aborted
    • STOPPED__PARTIAL_RESULT_GENERATED
    • STOPPED__WAITING_ON_FIRST_PIXEL_RECO
    • STOPPED__WAITING_ON_CLUSTER_STARTUP or STOPPED__WAITING_ON_SCANNER_SERVER_STARTUP
    • STOPPED__PRESTARTUP
    • NOTE: a failed scan my not have an above code automatically, and may need a DELETE request to get the code. Until then, it will retain an non-finished state code.

 

/scan/SCAN_ID/logs - GET


Retrieve the logs of a scan's pod: central server & client starter(s)

Arguments

None

SkyDriver Effects

None

Returns

{
    "pod_container_logs": str | list[ dict[str,str] ],  # list
    "pod_container_logs_message": str,  # a human-readable message explaining the log retrieval
}

 

Return Types


Manifest

A dictionary containing non-physics metadata on a scan

Pseudo-code:

{
    scan_id: str,

    timestamp: float,
    is_deleted: bool,

    event_i3live_json_dict: dict,
    scanner_server_args: str,

    priority: int,

    classifiers: dict[str, str | bool | float | int]

    event_i3live_json_dict__hash: str,  # a deterministic hash of the event json

    ewms_task: {
        tms_args: list[str],
        env_vars: dict[str, dict[str, Any]],
        clusters: [  # 2 types: condor & k8s -- different 'location' sub-fields
            {
                orchestrator: 'condor',
                location: {
                    collector: str,
                    schedd: str,
                },
                cluster_id: int,
                n_workers: int,
                starter_info: dict,
                statuses: {
                    'Completed': {  # condor job status
                        'FatalError': int,  # pilot status value -> # of jobs
                        'Done': int,  # pilot status value -> # of jobs
                        ...
                    },
                    'Running': {
                        'Tasking': int,
                        ...
                    }
                    ...
                },
                top_task_errors: dict[str, int],  # error message -> # of jobs
            },
            ...
            {
                orchestrator: 'k8s',
                location: {
                    host: str,
                    namespace: str,
                },
                cluster_id: int,
                n_workers: int,
                starter_info: dict,
            },
            ...
        ],
        # signifies scanner is done (server and worker cluster(s))
        complete: bool,
    },

    # found/created during first few seconds of scanning
    event_metadata: {
        run_id: int,
        event_id: int,
        event_type: str,
        mjd: float,
        is_real_event: bool,  # as opposed to simulation
    },
    scan_metadata: dict | None,

    # updated during scanning, multiple times (initially will be 'None')
    progress: {
        summary: str,
        epilogue: str,
        tallies: dict,
        processing_stats: {
            start: dict,
            runtime: dict,
            rate: dict,
            end: str,
            finished: bool,
            predictions: dict,
        },
        predictive_scanning_threshold: float,
        last_updated: str,
    },

    # timestamp of any update to manifest -- also see `progress.last_updated`
    last_updated: float,
}
Manifest Fields Excluded by Default in Response

Some routes/methods respond with the scan's manifest. This is a large dictionary, so by default, all but GET @ /scan/SCAN_ID/manifest exclude these fields:

  • event_i3live_json_dict

See https://github.com/search?q=repo%3AWIPACrepo%2FSkyDriver+DEFAULT_EXCLUDED_MANIFEST_FIELDS&type=code

Result

A dictionary containing the scan result

Pseudo-code:

{
    scan_id: str,

    skyscan_result: dict,  # serialized version of 'skyreader.SkyScanResult'
    is_final: bool,  # is this result the final result?
}

 

Using a Scan Result Outside of SkyDriver

Making Plots with a Scan's Result (using the scan_id)

See skyreader's plot_skydriver_scan_result.py

Creating a SkyScanResult Instance from a Scan's Result (using the scan_id)

Also, see skyreader's plot_skydriver_scan_result.py