This repository has been archived by the owner on Nov 19, 2023. It is now read-only.
/
utilities.py
70 lines (49 loc) · 2.5 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import argparse
from kortex_api.TCPTransport import TCPTransport
from kortex_api.UDPTransport import UDPTransport
from kortex_api.RouterClient import RouterClient, RouterClientSendOptions
from kortex_api.SessionManager import SessionManager
from kortex_api.autogen.messages import Session_pb2
class DeviceConnection:
    """Context manager that connects a transport to a device and opens a session.

    Typical use::

        with DeviceConnection.createTcpConnection(ip) as router:
            ...  # build service clients on top of ``router``

    Entering the ``with`` block connects the transport and, when a non-empty
    username was supplied, creates a session; the value bound by ``as`` is the
    ``RouterClient``. Leaving the block closes the session (if any) and
    disconnects the transport.
    """

    TCP_PORT = 10000
    UDP_PORT = 10001

    @staticmethod
    def createTcpConnection(ip="192.168.1.10", username="admin", password="admin"):
        """Return a DeviceConnection configured for the TCP port.

        The returned object is a context manager; entering it yields the
        RouterClient required to create services and send requests to the
        device or its sub-devices.
        """
        return DeviceConnection(ip, port=DeviceConnection.TCP_PORT, credentials=(username, password))

    @staticmethod
    def createUdpConnection(ip="192.168.1.10", username="admin", password="admin"):
        """Return a DeviceConnection configured for the UDP port.

        The returned object is a context manager; entering it yields a
        RouterClient that allows creating services and sending requests to a
        device or its sub-devices (described by the original author as the
        1 kHz channel).
        """
        return DeviceConnection(ip, port=DeviceConnection.UDP_PORT, credentials=(username, password))

    def __init__(self, ipAddress, port=TCP_PORT, credentials=("", "")):
        """Store the connection parameters and build the transport and router.

        Args:
            ipAddress: Address passed to the transport's ``connect``.
            port: Port to connect to. ``TCP_PORT`` selects ``TCPTransport``;
                any other value selects ``UDPTransport``.
            credentials: ``(username, password)`` pair. An empty username
                means no session is created on entry.
        """
        self.ipAddress = ipAddress
        self.port = port
        self.credentials = credentials

        # Only set while a session has been requested in __enter__.
        self.sessionManager = None

        # Setup API
        self.transport = TCPTransport() if port == DeviceConnection.TCP_PORT else UDPTransport()
        self.router = RouterClient(self.transport, RouterClient.basicErrorCallback)

    # Called when entering 'with' statement
    def __enter__(self):
        """Connect the transport, optionally create a session, return the router.

        If anything fails after the transport has connected, the transport is
        disconnected again before the exception propagates: Python does not
        call ``__exit__`` when ``__enter__`` raises, so cleanup must happen here.
        """
        self.transport.connect(self.ipAddress, self.port)

        try:
            if self.credentials[0] != "":
                session_info = Session_pb2.CreateSessionInfo()
                session_info.username = self.credentials[0]
                session_info.password = self.credentials[1]
                session_info.session_inactivity_timeout = 10000   # (milliseconds)
                session_info.connection_inactivity_timeout = 2000  # (milliseconds)

                self.sessionManager = SessionManager(self.router)
                print("Logging as", self.credentials[0], "on device", self.ipAddress)
                self.sessionManager.CreateSession(session_info)
        except BaseException:
            # No session was established; drop the manager and release the
            # transport so a failed login does not leave a connection open.
            self.sessionManager = None
            self.transport.disconnect()
            raise

        return self.router

    # Called when exiting 'with' statement
    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session (if one was created) and disconnect the transport.

        The transport is disconnected even if closing the session raises.
        Returns None, so exceptions raised inside the ``with`` body propagate.
        """
        try:
            if self.sessionManager is not None:
                router_options = RouterClientSendOptions()
                router_options.timeout_ms = 1000

                self.sessionManager.CloseSession(router_options)
        finally:
            self.transport.disconnect()