Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add akka cluster status panel #42

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion src/akka/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ pub struct ActorSystemStatus {
pub start_time: u64,
}


impl DeadLettersWindow {
pub fn max(&self) -> u32 {
vec![self.dead_letters.count, self.unhandled.count, self.dropped.count].iter().max().map(|x| x.to_owned()).unwrap_or(0)
Expand Down
17 changes: 17 additions & 0 deletions src/akka_cluster/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use reqwest;
use crate::akka_cluster::model::ClusterStatus;

pub fn get_akka_cluster_status(url: &String) -> Result<ClusterStatus, String> {
get_akka_cluster_status_async(url)
}

#[tokio::main]
async fn get_akka_cluster_status_async(url: &String) -> Result<ClusterStatus, String> {
let response = reqwest::get(url).await.map_err(|e| e.to_string())?;
if response.status().is_success() {
let cluster_status: ClusterStatus = response.json().await.map_err(|e| e.to_string())?;
Ok(cluster_status)
} else {
Err(format!("Request to get cluster status failed with status {}", response.status()))
}
}
2 changes: 2 additions & 0 deletions src/akka_cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod model;
pub mod client;
32 changes: 32 additions & 0 deletions src/akka_cluster/model.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::fmt;
use serde::Deserialize;

#[derive(Clone)]
pub struct AkkaClusterSettings {
pub cluster_status_address: String,
}

#[derive(Deserialize, Debug)]
pub struct ClusterStatus {
#[serde(rename(deserialize = "selfNode"))]
pub self_node: String,
pub members: Vec<ClusterMember>,
pub unreachable: Vec<String>,
pub leader: String,
pub oldest: String,
}

#[derive(Deserialize, Debug)]
pub struct ClusterMember {
pub node: String,
#[serde(rename(deserialize = "nodeUid"))]
pub node_uid: String,
pub status: String,
pub roles: Vec<String>,
}

impl fmt::Display for ClusterMember {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "==> node: {}\n uid: {}\n status: {}\n roles: {}", self.node, self.node_uid, self.status, self.roles.join(", "))
}
}
47 changes: 42 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::akka::model::{ActorTreeNode, AkkaSettings, DeadLettersSnapshot, DeadL
use crate::jmx::model::{HikariMetrics, JMXConnectionSettings, SlickConfig, SlickMetrics};
use crate::widgets::tree;
use crate::zio::model::{Fiber, FiberCount, FiberStatus};
use crate::akka_cluster::model::{AkkaClusterSettings, ClusterStatus};

pub struct UIFiber {
pub label: String,
Expand All @@ -18,6 +19,7 @@ pub enum AppTabKind {
ZMX,
Slick,
Akka,
AkkaCluster,
}

#[derive(Clone)]
Expand Down Expand Up @@ -286,6 +288,28 @@ impl AkkaTab {
}
}

pub struct AkkaClusterTab {
pub cluster_status: ClusterStatus,
}

impl AkkaClusterTab {
pub fn new() -> AkkaClusterTab {
AkkaClusterTab {
cluster_status: ClusterStatus {
self_node: "none".to_owned(),
members: vec![],
unreachable: vec![],
leader: "none".to_owned(),
oldest: "none".to_owned(),
}
}
}

pub fn update_cluster_status(&mut self, cs: ClusterStatus) {
self.cluster_status = cs
}
}

pub struct StatefulList<T> {
pub state: ListState,
pub items: Vec<T>,
Expand Down Expand Up @@ -342,14 +366,16 @@ pub struct App<'a> {
pub zmx: Option<ZMXTab>,
pub slick: Option<SlickTab>,
pub akka: Option<AkkaTab>,
pub akka_cluster: Option<AkkaClusterTab>,
}

impl<'a> App<'a> {
pub fn new(
title: &'a str,
zio_zmx_addr: Option<String>,
jmx: Option<JMXConnectionSettings>,
akka: Option<AkkaSettings>) -> App<'a> {
akka: Option<AkkaSettings>,
akka_cluster: Option<AkkaClusterSettings>) -> App<'a> {
let mut tabs: Vec<Tab<AppTabKind>> = vec![];

if let Some(_) = zio_zmx_addr {
Expand All @@ -364,6 +390,10 @@ impl<'a> App<'a> {
tabs.push(Tab { kind: AppTabKind::Akka, title: "Akka".to_owned() })
}

if let Some(_) = akka_cluster {
tabs.push(Tab { kind: AppTabKind::AkkaCluster, title: "Cluster".to_owned() })
}

App {
title,
should_quit: false,
Expand All @@ -372,22 +402,25 @@ impl<'a> App<'a> {
zmx: zio_zmx_addr.map(|_| ZMXTab::new()),
slick: jmx.map(|_| SlickTab::new()),
akka: akka.map(|_| AkkaTab::new()),
akka_cluster: akka_cluster.map(|_| AkkaClusterTab::new()),
}
}

pub fn on_up(&mut self) {
match self.tabs.current().kind {
AppTabKind::ZMX => self.zmx.as_mut().unwrap().select_prev_fiber(),
AppTabKind::Slick => {}
AppTabKind::Akka => self.akka.as_mut().unwrap().dead_letters_log.previous()
AppTabKind::Akka => self.akka.as_mut().unwrap().dead_letters_log.previous(),
AppTabKind::AkkaCluster => {}
}
}

pub fn on_down(&mut self) {
match self.tabs.current().kind {
AppTabKind::ZMX => self.zmx.as_mut().unwrap().select_next_fiber(),
AppTabKind::Slick => {}
AppTabKind::Akka => self.akka.as_mut().unwrap().dead_letters_log.next()
AppTabKind::Akka => self.akka.as_mut().unwrap().dead_letters_log.next(),
AppTabKind::AkkaCluster => {}
}
}

Expand All @@ -407,7 +440,8 @@ impl<'a> App<'a> {
let akka = self.akka.as_mut().unwrap();
akka.dead_letters_tabs.next();
akka.reload_dead_letters_log();
}
},
AppTabKind::AkkaCluster => {}
}
}

Expand All @@ -419,7 +453,8 @@ impl<'a> App<'a> {
let akka = self.akka.as_mut().unwrap();
akka.dead_letters_tabs.previous();
akka.reload_dead_letters_log();
}
},
AppTabKind::AkkaCluster => {}
}
}

Expand All @@ -442,6 +477,7 @@ impl<'a> App<'a> {
AppTabKind::ZMX => self.zmx.as_mut().unwrap().scroll_up(),
AppTabKind::Slick => {}
AppTabKind::Akka => self.akka.as_mut().unwrap().select_prev_actor(),
AppTabKind::AkkaCluster => {}
}
}

Expand All @@ -450,6 +486,7 @@ impl<'a> App<'a> {
AppTabKind::ZMX => self.zmx.as_mut().unwrap().scroll_down(),
AppTabKind::Slick => {}
AppTabKind::Akka => self.akka.as_mut().unwrap().select_next_actor(),
AppTabKind::AkkaCluster => {}
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion src/fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use jmx::MBeanClient;

use crate::akka;
use crate::akka_cluster;
use crate::akka::model::{ActorTreeNode, AkkaSettings, DeadLettersSnapshot, DeadLettersWindow, ActorSystemStatus};
use crate::jmx::client::JMXClient;
use crate::jmx::model::{HikariMetrics, JMXConnectionSettings, SlickConfig, SlickMetrics};
use crate::zio::model::Fiber;
use crate::zio::zmx::{NetworkZMXClient, ZMXClient};
use crate::akka_cluster::model::{ClusterStatus, AkkaClusterSettings};

pub enum FetcherRequest {
FiberDump,
Expand All @@ -16,6 +18,7 @@ pub enum FetcherRequest {
ActorTree,
ActorSystemStatus,
DeadLetters,
ClusterStatus,
}

pub enum FetcherResponse {
Expand All @@ -27,20 +30,23 @@ pub enum FetcherResponse {
ActorTree(Result<Vec<ActorTreeNode>, String>),
ActorSystemStatus(Result<ActorSystemStatus, String>),
DeadLetters(Result<(DeadLettersSnapshot, DeadLettersWindow), String>),
ClusterStatus(Result<ClusterStatus, String>),
FatalFailure(String),
}

pub struct Fetcher {
pub zmx_client: Option<Box<dyn ZMXClient>>,
pub jmx: Option<JMXClient>,
pub akka_settings: Option<AkkaSettings>,
pub akka_cluster_settings: Option<AkkaClusterSettings>,
}

impl Fetcher {
pub fn new(
zio_zmx_addr: Option<String>,
jmx: Option<JMXConnectionSettings>,
akka: Option<AkkaSettings>) -> Result<Fetcher, String> {
akka: Option<AkkaSettings>,
akka_cluster: Option<AkkaClusterSettings>) -> Result<Fetcher, String> {
let jmx_client: Option<JMXClient> = match jmx {
None => Ok(None),
Some(conn) => {
Expand All @@ -64,6 +70,7 @@ impl Fetcher {
}),
jmx: jmx_client,
akka_settings: akka,
akka_cluster_settings: akka_cluster,
})
}

Expand Down Expand Up @@ -102,6 +109,11 @@ impl Fetcher {
.map_err(|e| format!("Error loading akka actor system status: {}", e))
}

pub fn get_akka_cluster_status(&self) -> Result<ClusterStatus, String> {
let s = self.akka_cluster_settings.as_ref().unwrap();
akka_cluster::client::get_akka_cluster_status(&s.cluster_status_address)
}

pub fn get_dead_letters(&self) -> Result<(DeadLettersSnapshot, DeadLettersWindow), String> {
let s = self.akka_settings.as_ref().unwrap();
akka::client::get_deadletters(&s.dead_letters_address, s.dead_letters_window)
Expand Down
30 changes: 28 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod ui;
mod zio;
mod jmx;
mod akka;
mod akka_cluster;
mod app;
mod fetcher;
mod widgets;
Expand Down Expand Up @@ -30,6 +31,7 @@ use crate::fetcher::{Fetcher, FetcherRequest, FetcherResponse};

use crate::akka::model::AkkaSettings;
use crate::jmx::model::JMXConnectionSettings;
use crate::akka_cluster::model::AkkaClusterSettings;

enum Event<I> {
Input(I),
Expand Down Expand Up @@ -80,6 +82,9 @@ struct Cli {
/// Time window for akka dead-letters metrics
#[structopt(long = "dead-letters-window", default_value = "5000")]
dead_letters_window: u64,
/// Address of http endpoint to get akka cluster status
#[structopt(long = "akka-cluster-status")]
akka_cluster_status: Option<String>,
}

impl Cli {
Expand Down Expand Up @@ -108,6 +113,13 @@ impl Cli {
_ => None
}
}

fn akka_cluster_settings(&self) -> Option<AkkaClusterSettings> {
match &self.akka_cluster_status {
Some(cluster_address) => Some(AkkaClusterSettings{ cluster_status_address: cluster_address.to_owned() }),
_ => None
}
}
}

fn main() -> Result<(), failure::Error> {
Expand All @@ -116,7 +128,7 @@ fn main() -> Result<(), failure::Error> {
// disable jmx crate logging
env::set_var("J4RS_CONSOLE_LOG_LEVEL", "disabled");

if cli.zio_zmx.is_none() && cli.jmx_settings().is_none() && cli.akka_settings().is_none() {
if cli.zio_zmx.is_none() && cli.jmx_settings().is_none() && cli.akka_settings().is_none() && cli.akka_cluster_settings().is_none() {
let mut clap = Cli::clap();
println!("Nothing to monitor. Please check the following help message.\n");
clap.print_long_help().expect("Failed printing help message");
Expand All @@ -141,6 +153,7 @@ fn main() -> Result<(), failure::Error> {
cli.zio_zmx.clone(),
cli.jmx_settings(),
cli.akka_settings(),
cli.akka_cluster_settings(),
);

terminal.clear()?;
Expand All @@ -157,7 +170,8 @@ fn main() -> Result<(), failure::Error> {

match Fetcher::new(cli.zio_zmx.clone(),
cli.jmx_settings(),
cli.akka_settings()) {
cli.akka_settings(),
cli.akka_cluster_settings()) {
Err(e) => {
eprintln!("Responding with failure {}", e);
loop {
Expand All @@ -184,6 +198,8 @@ fn main() -> Result<(), failure::Error> {
respond(FetcherResponse::ActorSystemStatus(fetcher.get_actor_system_status())),
FetcherRequest::DeadLetters =>
respond(FetcherResponse::DeadLetters(fetcher.get_dead_letters())),
FetcherRequest::ClusterStatus =>
respond(FetcherResponse::ClusterStatus(fetcher.get_akka_cluster_status())),
}
}
}
Expand Down Expand Up @@ -240,6 +256,7 @@ fn main() -> Result<(), failure::Error> {
AppTabKind::ZMX => txf.send(FetcherRequest::FiberDump)?,
AppTabKind::Slick => {}
AppTabKind::Akka => txf.send(FetcherRequest::ActorTree)?,
AppTabKind::AkkaCluster => {}
}
}
_ => {}
Expand Down Expand Up @@ -295,6 +312,11 @@ fn main() -> Result<(), failure::Error> {
Err(e) => app.quit(Some(e)),
Ok(x) => app.akka.as_mut().unwrap().append_dead_letters(x.0, x.1)
},
FetcherResponse::ClusterStatus(d) =>
match d {
Err(e) => app.quit(Some(e)),
Ok(x) => app.akka_cluster.as_mut().unwrap().update_cluster_status(x)
}
}

Event::Tick => {
Expand All @@ -316,6 +338,10 @@ fn main() -> Result<(), failure::Error> {
txf.send(FetcherRequest::ActorSystemStatus)?;
txf.send(FetcherRequest::DeadLetters)?;
}

if app.akka_cluster.is_some() {
txf.send(FetcherRequest::ClusterStatus)?;
}
}
}
if app.should_quit {
Expand Down