Skip to content

Commit

Permalink
Excluded into separate tab
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsokol committed Sep 18, 2020
1 parent fa22526 commit ca04936
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 83 deletions.
20 changes: 0 additions & 20 deletions src/akka/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ pub fn get_deadletters(url: &String, window: u64) -> Result<(DeadLettersSnapshot
get_deadletters_async(url, window)
}

pub fn get_akka_cluster_status(url: &String) -> Result<Vec<ClusterMember>, String> {
get_akka_cluster_status_async(url)
}

#[tokio::main]
async fn get_deadletters_async(url: &String, window: u64) -> Result<(DeadLettersSnapshot, DeadLettersWindow), String> {
let url = format!("{}?window={}", url, window);
Expand Down Expand Up @@ -82,19 +78,3 @@ async fn get_actor_system_status_async(url: &String, timeout: u64) -> Result<Act
return Err(format!("Request to get actor count failed with status {}", response.status()))
}
}

#[tokio::main]
async fn get_akka_cluster_status_async(url: &String) -> Result<Vec<ClusterMember>, String> {
let response = reqwest::get(url).await.map_err(|e| e.to_string())?;
if response.status().is_success() {
let response_body: HashMap<String, Value> = response.json().await.map_err(|e| e.to_string())?;
if let Some(w) = response_body.get("members") {
let members: Vec<ClusterMember> = serde_json::from_value(w.to_owned()).unwrap();
Ok(members)
} else {
Err(format!("Request to get cluster status failed while deserializing, response body is: {:?}", response_body))
}
} else {
Err(format!("Request to get cluster status failed with status {}", response.status()))
}
}
18 changes: 0 additions & 18 deletions src/akka/model.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
extern crate chrono;

use chrono::prelude::*;
use std::fmt;
use serde::Deserialize;

#[derive(Clone)]
pub struct AkkaSettings {
pub tree_address: String,
pub status_address: String,
pub dead_letters_address: String,
pub cluster_status_address: Option<String>,
pub tree_timeout: u64,
pub status_timeout: u64,
pub dead_letters_window: u64,
Expand Down Expand Up @@ -99,16 +97,6 @@ pub struct ActorSystemStatus {
pub start_time: u64,
}

#[derive(Deserialize, Debug)]
pub struct ClusterMember {
pub node: String,
#[serde(rename(deserialize = "nodeUid"))]
pub node_uid: String,
pub status: String,
pub roles: Vec<String>
}


impl DeadLettersWindow {
pub fn max(&self) -> u32 {
vec![self.dead_letters.count, self.unhandled.count, self.dropped.count].iter().max().map(|x| x.to_owned()).unwrap_or(0)
Expand Down Expand Up @@ -176,9 +164,3 @@ impl DeadLettersUIMessage {
NaiveDateTime::from_timestamp((self.timestamp / 1000) as i64, 0)
}
}

impl fmt::Display for ClusterMember {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "id: {}, status: {}", self.node_uid, self.status)
}
}
17 changes: 17 additions & 0 deletions src/akka_cluster/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use reqwest;
use crate::akka_cluster::model::ClusterStatus;

pub fn get_akka_cluster_status(url: &String) -> Result<ClusterStatus, String> {
get_akka_cluster_status_async(url)
}

#[tokio::main]
async fn get_akka_cluster_status_async(url: &String) -> Result<ClusterStatus, String> {
let response = reqwest::get(url).await.map_err(|e| e.to_string())?;
if response.status().is_success() {
let cluster_status: ClusterStatus = response.json().await.map_err(|e| e.to_string())?;
Ok(cluster_status)
} else {
Err(format!("Request to get cluster status failed with status {}", response.status()))
}
}
2 changes: 2 additions & 0 deletions src/akka_cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod model;
pub mod client;
32 changes: 32 additions & 0 deletions src/akka_cluster/model.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::fmt;
use serde::Deserialize;

#[derive(Clone)]
pub struct AkkaClusterSettings {
pub cluster_status_address: String,
}

#[derive(Deserialize, Debug)]
pub struct ClusterStatus {
#[serde(rename(deserialize = "selfNode"))]
pub self_node: String,
pub members: Vec<ClusterMember>,
pub unreachable: Vec<String>,
pub leader: String,
pub oldest: String,
}

#[derive(Deserialize, Debug)]
pub struct ClusterMember {
pub node: String,
#[serde(rename(deserialize = "nodeUid"))]
pub node_uid: String,
pub status: String,
pub roles: Vec<String>,
}

impl fmt::Display for ClusterMember {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "==> node: {}\n uid: {}\n status: {}\n roles: {}", self.node, self.node_uid, self.status, self.roles.join(", "))
}
}
55 changes: 43 additions & 12 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::iter::Iterator;

use tui::widgets::ListState;

use crate::akka::model::{ActorTreeNode, AkkaSettings, DeadLettersSnapshot, DeadLettersWindow, DeadLettersUIMessage, ActorSystemStatus, ClusterMember};
use crate::akka::model::{ActorTreeNode, AkkaSettings, DeadLettersSnapshot, DeadLettersWindow, DeadLettersUIMessage, ActorSystemStatus};
use crate::jmx::model::{HikariMetrics, JMXConnectionSettings, SlickConfig, SlickMetrics};
use crate::widgets::tree;
use crate::zio::model::{Fiber, FiberCount, FiberStatus};
use crate::akka_cluster::model::{AkkaClusterSettings, ClusterStatus};

pub struct UIFiber {
pub label: String,
Expand All @@ -18,6 +19,7 @@ pub enum AppTabKind {
ZMX,
Slick,
Akka,
AkkaCluster,
}

#[derive(Clone)]
Expand Down Expand Up @@ -201,7 +203,6 @@ pub struct AkkaTab {
pub actors: StatefulList<String>,
pub actor_counts: VecDeque<u64>,
pub system_status: ActorSystemStatus,
pub cluster_status: Option<Vec<ClusterMember>>,
pub dead_letters_messages: DeadLettersSnapshot,
pub dead_letters_windows: VecDeque<DeadLettersWindow>,
pub dead_letters_tabs: TabsState<DeadLettersTabKind>,
Expand All @@ -212,7 +213,7 @@ impl AkkaTab {
pub const MAX_ACTOR_COUNT_MEASURES: usize = 25;
pub const MAX_DEAD_LETTERS_WINDOW_MEASURES: usize = 100;

pub fn new(is_cluster_enabled: bool) -> AkkaTab {
pub fn new() -> AkkaTab {
AkkaTab {
actors: StatefulList::with_items(vec![]),
actor_counts: VecDeque::new(),
Expand All @@ -236,7 +237,6 @@ impl AkkaTab {
uptime: 0,
start_time: 0,
},
cluster_status: if is_cluster_enabled { Some(vec![]) } else { None },
}
}

Expand Down Expand Up @@ -286,9 +286,27 @@ impl AkkaTab {

self.dead_letters_log = StatefulList::with_items(ui_messages)
}
}

pub struct AkkaClusterTab {
pub cluster_status: ClusterStatus,
}

impl AkkaClusterTab {
pub fn new() -> AkkaClusterTab {
AkkaClusterTab {
cluster_status: ClusterStatus {
self_node: "none".to_owned(),
members: vec![],
unreachable: vec![],
leader: "none".to_owned(),
oldest: "none".to_owned(),
}
}
}

pub fn update_cluster_status(&mut self, v: Vec<ClusterMember>) {
self.cluster_status = Some(v)
pub fn update_cluster_status(&mut self, cs: ClusterStatus) {
self.cluster_status = cs
}
}

Expand Down Expand Up @@ -348,14 +366,16 @@ pub struct App<'a> {
pub zmx: Option<ZMXTab>,
pub slick: Option<SlickTab>,
pub akka: Option<AkkaTab>,
pub akka_cluster: Option<AkkaClusterTab>,
}

impl<'a> App<'a> {
pub fn new(
title: &'a str,
zio_zmx_addr: Option<String>,
jmx: Option<JMXConnectionSettings>,
akka: Option<AkkaSettings>) -> App<'a> {
akka: Option<AkkaSettings>,
akka_cluster: Option<AkkaClusterSettings>) -> App<'a> {
let mut tabs: Vec<Tab<AppTabKind>> = vec![];

if let Some(_) = zio_zmx_addr {
Expand All @@ -370,30 +390,37 @@ impl<'a> App<'a> {
tabs.push(Tab { kind: AppTabKind::Akka, title: "Akka".to_owned() })
}

if let Some(_) = akka_cluster {
tabs.push(Tab { kind: AppTabKind::AkkaCluster, title: "Cluster".to_owned() })
}

App {
title,
should_quit: false,
exit_reason: None,
tabs: TabsState::new(tabs),
zmx: zio_zmx_addr.map(|_| ZMXTab::new()),
slick: jmx.map(|_| SlickTab::new()),
akka: akka.map(|akka| AkkaTab::new(akka.cluster_status_address.is_some())),
akka: akka.map(|_| AkkaTab::new()),
akka_cluster: akka_cluster.map(|_| AkkaClusterTab::new()),
}
}

pub fn on_up(&mut self) {
match self.tabs.current().kind {
AppTabKind::ZMX => self.zmx.as_mut().unwrap().select_prev_fiber(),
AppTabKind::Slick => {}
AppTabKind::Akka => self.akka.as_mut().unwrap().dead_letters_log.previous()
AppTabKind::Akka => self.akka.as_mut().unwrap().dead_letters_log.previous(),
AppTabKind::AkkaCluster => {}
}
}

pub fn on_down(&mut self) {
match self.tabs.current().kind {
AppTabKind::ZMX => self.zmx.as_mut().unwrap().select_next_fiber(),
AppTabKind::Slick => {}
AppTabKind::Akka => self.akka.as_mut().unwrap().dead_letters_log.next()
AppTabKind::Akka => self.akka.as_mut().unwrap().dead_letters_log.next(),
AppTabKind::AkkaCluster => {}
}
}

Expand All @@ -413,7 +440,8 @@ impl<'a> App<'a> {
let akka = self.akka.as_mut().unwrap();
akka.dead_letters_tabs.next();
akka.reload_dead_letters_log();
}
},
AppTabKind::AkkaCluster => {}
}
}

Expand All @@ -425,7 +453,8 @@ impl<'a> App<'a> {
let akka = self.akka.as_mut().unwrap();
akka.dead_letters_tabs.previous();
akka.reload_dead_letters_log();
}
},
AppTabKind::AkkaCluster => {}
}
}

Expand All @@ -448,6 +477,7 @@ impl<'a> App<'a> {
AppTabKind::ZMX => self.zmx.as_mut().unwrap().scroll_up(),
AppTabKind::Slick => {}
AppTabKind::Akka => self.akka.as_mut().unwrap().select_prev_actor(),
AppTabKind::AkkaCluster => {}
}
}

Expand All @@ -456,6 +486,7 @@ impl<'a> App<'a> {
AppTabKind::ZMX => self.zmx.as_mut().unwrap().scroll_down(),
AppTabKind::Slick => {}
AppTabKind::Akka => self.akka.as_mut().unwrap().select_next_actor(),
AppTabKind::AkkaCluster => {}
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions src/fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use jmx::MBeanClient;

use crate::akka;
use crate::akka::model::{ActorTreeNode, AkkaSettings, DeadLettersSnapshot, DeadLettersWindow, ActorSystemStatus, ClusterMember};
use crate::akka_cluster;
use crate::akka::model::{ActorTreeNode, AkkaSettings, DeadLettersSnapshot, DeadLettersWindow, ActorSystemStatus};
use crate::jmx::client::JMXClient;
use crate::jmx::model::{HikariMetrics, JMXConnectionSettings, SlickConfig, SlickMetrics};
use crate::zio::model::Fiber;
use crate::zio::zmx::{NetworkZMXClient, ZMXClient};
use crate::akka_cluster::model::{ClusterStatus, AkkaClusterSettings};

pub enum FetcherRequest {
FiberDump,
Expand All @@ -28,21 +30,23 @@ pub enum FetcherResponse {
ActorTree(Result<Vec<ActorTreeNode>, String>),
ActorSystemStatus(Result<ActorSystemStatus, String>),
DeadLetters(Result<(DeadLettersSnapshot, DeadLettersWindow), String>),
ClusterStatus(Result<Vec<ClusterMember>, String>),
ClusterStatus(Result<ClusterStatus, String>),
FatalFailure(String),
}

pub struct Fetcher {
pub zmx_client: Option<Box<dyn ZMXClient>>,
pub jmx: Option<JMXClient>,
pub akka_settings: Option<AkkaSettings>,
pub akka_cluster_settings: Option<AkkaClusterSettings>,
}

impl Fetcher {
pub fn new(
zio_zmx_addr: Option<String>,
jmx: Option<JMXConnectionSettings>,
akka: Option<AkkaSettings>) -> Result<Fetcher, String> {
akka: Option<AkkaSettings>,
akka_cluster: Option<AkkaClusterSettings>) -> Result<Fetcher, String> {
let jmx_client: Option<JMXClient> = match jmx {
None => Ok(None),
Some(conn) => {
Expand All @@ -66,6 +70,7 @@ impl Fetcher {
}),
jmx: jmx_client,
akka_settings: akka,
akka_cluster_settings: akka_cluster,
})
}

Expand Down Expand Up @@ -104,9 +109,9 @@ impl Fetcher {
.map_err(|e| format!("Error loading akka actor system status: {}", e))
}

pub fn get_akka_cluster_status(&self) -> Result<Vec<ClusterMember>, String> {
let s = self.akka_settings.as_ref().unwrap();
akka::client::get_akka_cluster_status(s.cluster_status_address.as_ref().unwrap())
pub fn get_akka_cluster_status(&self) -> Result<ClusterStatus, String> {
let s = self.akka_cluster_settings.as_ref().unwrap();
akka_cluster::client::get_akka_cluster_status(&s.cluster_status_address)
}

pub fn get_dead_letters(&self) -> Result<(DeadLettersSnapshot, DeadLettersWindow), String> {
Expand Down

0 comments on commit ca04936

Please sign in to comment.