Skip to content

Commit

Permalink
re-modularize
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardoaraujor committed Oct 21, 2023
1 parent d4d9e29 commit 1f027b5
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 81 deletions.
71 changes: 0 additions & 71 deletions node/src/block_template.rs
Expand Up @@ -13,77 +13,6 @@ const BLOCK_TEMPLATE_RULES: [GetBlockTemplateRules; 4] = [
const BACKOFF_BASE: u64 = 2;
const MAX_RPC_FAILURES: u32 = 20;

#[derive(Debug)]
pub enum BlockTemplateError {
Rpc(bitcoincore_rpc::Error),
Zmq(bitcoincore_zmq::Error),
}

async fn zmq_setup(
bitcoin: String,
zmq_port: u16,
) -> Result<Receiver<Result<bitcoincore_zmq::Message, bitcoincore_zmq::Error>>, BlockTemplateError>
{
let zmq_url = format!("tcp://{}:{}", bitcoin, zmq_port);

match bitcoincore_zmq::subscribe_single(&zmq_url).await {
Ok(zmq) => Ok(zmq),
Err(err) => Err(BlockTemplateError::Zmq(err)),
}
}

fn rpc_setup(
bitcoin: String,
rpc_port: u16,
rpc_user: String,
rpc_pass: String,
) -> Result<bitcoincore_rpc::Client, BlockTemplateError> {
let rpc_url = format!("{}:{}", bitcoin, rpc_port);
match bitcoincore_rpc::Client::new(
&rpc_url,
bitcoincore_rpc::Auth::UserPass(rpc_user, rpc_pass),
) {
Ok(client) => Ok(client),
Err(err) => Err(BlockTemplateError::Rpc(err)),
}
}

pub async fn listener(
bitcoin: String,
rpc_port: u16,
rpc_user: String,
rpc_pass: String,
zmq_port: u16,
block_template_tx: Sender<GetBlockTemplateResult>,
) -> Result<(), BlockTemplateError> {
let rpc: bitcoincore_rpc::Client = rpc_setup(bitcoin.clone(), rpc_port, rpc_user, rpc_pass)?;
let mut zmq: Receiver<Result<bitcoincore_zmq::Message, bitcoincore_zmq::Error>> =
zmq_setup(bitcoin, zmq_port).await?;

while let Some(msg) = zmq.recv().await {
match msg {
Ok(m) => {
match m {
bitcoincore_zmq::Message::HashBlock(_, _) => {
log::info!(
"Received a new `hashblock` notification via ZeroMQ. \
Calling `getblocktemplate` RPC now..."
);
fetcher(&rpc, block_template_tx.clone()).await;
}
bitcoincore_zmq::Message::HashTx(_, _) => todo!(),
bitcoincore_zmq::Message::Block(_, _) => todo!(),
bitcoincore_zmq::Message::Tx(_, _) => todo!(),
bitcoincore_zmq::Message::Sequence(_, _) => todo!(),
};
}
Err(err) => return Err(BlockTemplateError::Zmq(err)),
}
}

Ok(())
}

pub async fn fetcher(
rpc: &bitcoincore_rpc::Client,
block_template_tx: Sender<GetBlockTemplateResult>,
Expand Down
16 changes: 10 additions & 6 deletions node/src/main.rs
Expand Up @@ -3,12 +3,15 @@ use std::error::Error;
use std::net::ToSocketAddrs;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

mod block_template;
mod cli;
mod connection;
mod protocol;
mod rpc;
mod zmq;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand All @@ -20,15 +23,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
let datadir = args.datadir;
log::info!("Using braid data directory: {}", datadir.display());

let (block_template_tx, block_template_rx) = mpsc::channel(1);
tokio::spawn(block_template::listener(
args.bitcoin,
let rpc = rpc::setup(
args.bitcoin.clone(),
args.rpcport,
args.rpcuser,
args.rpcpass,
args.zmqport,
block_template_tx,
));
)?;
let zmq = zmq::setup(args.bitcoin, args.zmqport).await?;

let (block_template_tx, block_template_rx) = mpsc::channel(1);
tokio::spawn(zmq::listener(zmq, rpc, block_template_tx));
tokio::spawn(block_template::consumer(block_template_rx));

if let Some(addnode) = args.addnode {
Expand Down
12 changes: 12 additions & 0 deletions node/src/rpc.rs
@@ -0,0 +1,12 @@
pub fn setup(
bitcoin: String,
rpc_port: u16,
rpc_user: String,
rpc_pass: String,
) -> Result<bitcoincore_rpc::Client, bitcoincore_rpc::Error> {
let rpc_url = format!("{}:{}", bitcoin, rpc_port);
bitcoincore_rpc::Client::new(
&rpc_url,
bitcoincore_rpc::Auth::UserPass(rpc_user, rpc_pass),
)
}
18 changes: 14 additions & 4 deletions node/src/zmq.rs
@@ -1,13 +1,23 @@
use crate::block_template;
use tokio::sync::mpsc::{Receiver, Sender};

pub async fn setup(
bitcoin: String,
zmq_port: u16,
) -> Result<
Receiver<Result<bitcoincore_zmq::Message, bitcoincore_zmq::Error>>,
bitcoincore_zmq::Error,
> {
let zmq_url = format!("tcp://{}:{}", bitcoin, zmq_port);

bitcoincore_zmq::subscribe_single(&zmq_url).await
}

pub async fn listener(
zmq_url: String,
mut zmq: Receiver<Result<bitcoincore_zmq::Message, bitcoincore_zmq::Error>>,
rpc: bitcoincore_rpc::Client,
block_template_tx: Sender<bitcoincore_rpc_json::GetBlockTemplateResult>,
) -> Result<(), bitcoincore_zmq::Error> {
let mut zmq = bitcoincore_zmq::subscribe_single(&zmq_url).await?;

while let Some(msg) = zmq.recv().await {
match msg {
Ok(m) => {
Expand All @@ -30,4 +40,4 @@ pub async fn listener(
}

Ok(())
}
}

0 comments on commit 1f027b5

Please sign in to comment.