forked from bytebeamio/rumqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
console.rs
120 lines (110 loc) · 4.43 KB
/
console.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use crate::link::local::{Link, LinkRx};
use crate::router::{Event, MetricsRequest};
use crate::{ConnectionId, ConsoleSettings};
use flume::Sender;
use rouille::{router, try_or_400};
use std::sync::Arc;
use tracing::info;
/// State shared by the console HTTP handlers: the console settings plus the
/// link used to exchange metrics requests and replies with the router.
#[derive(Debug)]
pub struct ConsoleLink {
/// Console settings; `listen` is the address the HTTP server binds to,
/// `filter_handle` (when present) is used to reload the tracing filter,
/// and the whole value is served as JSON on `GET /config`.
config: ConsoleSettings,
/// Connection id of the console's link (taken from the link created in
/// `new`); sent alongside every event pushed to the router.
connection_id: ConnectionId,
/// Channel on which `(ConnectionId, Event)` messages are sent to the router.
router_tx: Sender<(ConnectionId, Event)>,
/// Receiving half of the console's link; `metrics()` is called on it after
/// each metrics request has been sent.
link_rx: LinkRx,
}
impl ConsoleLink {
    /// Creates the console link by registering a link named `"console"` through
    /// `Link::new`, keeping its receiving half and connection id.
    ///
    /// The corresponding Router has to be running for this call to complete.
    ///
    /// # Panics
    ///
    /// Panics if `Link::new` returns an error.
    pub fn new(config: ConsoleSettings, router_tx: Sender<(ConnectionId, Event)>) -> ConsoleLink {
        // The link gets its own sender clone; the original sender is stored on
        // the struct so HTTP handlers can push metrics requests later.
        let (link_tx, link_rx, _ack) =
            Link::new(None, "console", router_tx.clone(), true, None, true).unwrap();

        Self {
            connection_id: link_tx.connection_id,
            config,
            router_tx,
            link_rx,
        }
    }
}
#[tracing::instrument]
pub fn start(console: Arc<ConsoleLink>) {
let address = console.config.listen.clone();
rouille::start_server(address, move |request| {
router!(request,
(GET) (/) => {
rouille::Response::redirect_302("/config")
},
(GET) (/config) => {
rouille::Response::json(&console.config.clone())
},
(GET) (/router) => {
let event = Event::Metrics(MetricsRequest::Router);
let message = (console.connection_id, event);
if console.router_tx.send(message).is_err() {
return rouille::Response::empty_404()
}
let v = console.link_rx.metrics();
rouille::Response::json(&v)
},
(GET) (/device/{id: String}) => {
let event = Event::Metrics(MetricsRequest::Connection(id));
let message = (console.connection_id, event);
if console.router_tx.send(message).is_err() {
return rouille::Response::empty_404()
}
let v = console.link_rx.metrics();
rouille::Response::json(&v)
},
(GET) (/subscriptions) => {
let event = Event::Metrics(MetricsRequest::Subscriptions);
let message = (console.connection_id, event);
if console.router_tx.send(message).is_err() {
return rouille::Response::empty_404()
}
let v = console.link_rx.metrics();
rouille::Response::json(&v)
},
(GET) (/subscription/{filter: String}) => {
let filter = filter.replace('.', "/");
let event = Event::Metrics(MetricsRequest::Subscription(filter));
let message = (console.connection_id, event);
if console.router_tx.send(message).is_err() {
return rouille::Response::empty_404()
}
let v = console.link_rx.metrics();
rouille::Response::json(&v)
},
(GET) (/waiters/{filter: String}) => {
let filter = filter.replace('.', "/");
let event = Event::Metrics(MetricsRequest::Waiters(filter));
let message = (console.connection_id, event);
if console.router_tx.send(message).is_err() {
return rouille::Response::empty_404()
}
let v = console.link_rx.metrics();
rouille::Response::json(&v)
},
(GET) (/readyqueue) => {
let event = Event::Metrics(MetricsRequest::ReadyQueue);
let message = (console.connection_id, event);
if console.router_tx.send(message).is_err() {
return rouille::Response::empty_404()
}
let v = console.link_rx.metrics();
rouille::Response::json(&v)
},
(POST) (/logs) => {
info!("Reloading tracing filter");
let data = try_or_400!(rouille::input::plain_text_body(request));
if let Some(handle) = &console.config.filter_handle {
if handle.reload(&data).is_err() {
return rouille::Response::empty_400();
}
return rouille::Response::text(data);
}
rouille::Response::empty_404()
},
_ => rouille::Response::empty_404()
)
});
}