-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot_rider_server.py
89 lines (73 loc) · 3.07 KB
/
bot_rider_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import urllib.parse
import http.server
import socketserver
import json
import traceback
import sys
import serial
import re
import os
import getopt
# Default TCP port for the HTTP server; override with "-p <port>".
PORT = 8000

# Command-line options:
#   -p <port>    listen on <port> instead of the default
#   --cvd <dir>  change the working directory to <dir>; static files are
#                served by SimpleHTTPRequestHandler relative to the
#                current working directory
opts, args = getopt.getopt(sys.argv[1:], "p:", ["cvd="])
for o, a in opts:
    if o == "-p":
        PORT = int(a)
    elif o == "--cvd":
        os.chdir(a)

# Not referenced anywhere else in the visible file; kept so that any external
# user of this module-level name keeps working.
last_request = None

# Serial link used by the request handler. The 0.1 s read timeout keeps a
# silent device from blocking an HTTP request indefinitely.
ser = serial.Serial('/dev/ttyAMA0', baudrate=115200, timeout=0.1)

# Matches the status line read back after writing "q" to the serial port and
# captures its eight signed integer fields, in order:
# MANUAL, L, F, R, dir, speed, left_mapped, right_mapped.
# Raw strings are required here: "\d" in a normal string literal is an
# invalid escape sequence and triggers a warning on current Python versions.
status_pattern = re.compile(
    r"MANUAL: ([-+]?\d+), L ([-+]?\d+), F ([-+]?\d+), R ([-+]?\d+), "
    r"dir ([-+]?\d+), speed ([-+]?\d+), left_mapped ([-+]?\d+), "
    r"right_mapped ([-+]?\d+)"
)
class RequestHandler(http.server.SimpleHTTPRequestHandler):
    """Serve static files plus a small JSON API backed by the serial port.

    Endpoints handled here (any other path is delegated to
    ``SimpleHTTPRequestHandler`` and served as a static file):

    * ``GET /status[?speed=N][&dir=N]`` -- optionally writes a speed command
      (``s<int>s``) and/or a direction command (``d<int>d``) to the serial
      port, then writes ``q``, reads one line back and returns it, parsed,
      as JSON.
    * ``GET /toggle_manual`` -- writes ``a`` to the serial port (presumably
      toggling manual mode on the controller -- confirm against the
      firmware) and returns ``{}``.
    """

    # Keys of the parsed status table, in the order of the capture groups of
    # the module-level ``status_pattern``.
    _STATUS_FIELDS = (
        "manual",
        "dist_l",
        "dist_f",
        "dist_r",
        "dir",
        "speed",
        "left_mapped",
        "right_mapped",
    )

    def do_GET(self):
        """Dispatch a GET request to the JSON API or the static file handler."""
        print("Request!")
        print(self.path)
        parsed_path = urllib.parse.urlparse(self.path)
        # Both API routes are matched on the parsed path so that a query
        # string (e.g. a cache-buster) does not make the route fall through.
        if parsed_path.path == "/status":
            self._handle_status(urllib.parse.parse_qs(parsed_path.query))
        elif parsed_path.path == "/toggle_manual":
            ser.write(b"a")
            self._send_json({})
        else:
            http.server.SimpleHTTPRequestHandler.do_GET(self)

    @staticmethod
    def _int_param(queries, name):
        """Return query parameter *name* truncated to an int.

        ``queries`` is a mapping as returned by ``urllib.parse.parse_qs``;
        only the first value of the parameter is used. Returns ``None`` when
        the parameter is absent. Raises ``ValueError`` when the value is not
        a finite number.
        """
        if name not in queries:
            return None
        raw = queries[name][0]
        try:
            # float() first so that values such as "12.7" are accepted.
            return int(float(raw))
        except (ValueError, OverflowError) as err:
            # int(float("inf")) raises OverflowError; fold it into ValueError
            # so callers have a single error type to handle.
            raise ValueError("invalid value for %r: %r" % (name, raw)) from err

    def _handle_status(self, queries):
        """Apply optional speed/dir commands, then report the device status."""
        # Validate everything before touching the serial port so a bad
        # parameter neither sends a partial command nor kills the handler.
        try:
            commands = [
                (name, marker, self._int_param(queries, name))
                for name, marker in (("speed", "s"), ("dir", "d"))
            ]
        except ValueError as err:
            self._send_json({"error": str(err)}, status=400)
            return

        response = {}
        for name, marker, value in commands:
            if value is None:
                continue
            print("%s: %s" % (name, queries[name][0]))
            ser.write(("%s%d%s" % (marker, value, marker)).encode("UTF-8"))
            # Echo back the raw query values (a list of strings).
            response[name] = queries[name]

        # Drop any stale input so the next line read is the reply to "q".
        ser.reset_input_buffer()
        ser.write(b"q")
        # readline() returns bytes; decode instead of str(), which would
        # embed the b'...' repr in the reported status.
        line = ser.readline().decode("utf-8", errors="replace")
        print("Status: %s" % (line,))
        response["status"] = self._parse_status(line)
        self._send_json(response)

    def _parse_status(self, line):
        """Build the status table for one status line read from serial.

        The table always contains ``raw_status``; the integer fields listed
        in ``_STATUS_FIELDS`` are added only when *line* matches
        ``status_pattern`` (e.g. not after a read timeout).
        """
        status_table = {"raw_status": line}
        match = status_pattern.search(line)
        if match is None:
            print("Status line did not match the expected format: %r" % (line,),
                  file=sys.stderr)
        else:
            for field, text in zip(self._STATUS_FIELDS, match.groups()):
                status_table[field] = int(text)
        return status_table

    def _send_json(self, payload, status=200):
        """Send *payload* as a JSON response with the given HTTP status."""
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        # NOTE(review): the allowed origin is hard-coded to port 8000 even
        # when "-p" selects a different port -- confirm this is intended.
        self.send_header('Access-Control-Allow-Origin', 'http://127.0.0.1:8000')
        self.end_headers()
        self.wfile.write(json.dumps(payload).encode("UTF-8"))
# Allow the port to be rebound immediately after a restart. This is set on
# the TCPServer class itself, so it must happen before the server is created.
socketserver.TCPServer.allow_reuse_address = True
httpd = socketserver.TCPServer(("", PORT), RequestHandler)
print("serving at port", PORT)
try:
    # Blocks until the process is interrupted.
    httpd.serve_forever()
finally:
    # Release the listening socket even when serve_forever() exits through
    # an exception such as KeyboardInterrupt (Ctrl-C).
    httpd.server_close()