-
Notifications
You must be signed in to change notification settings - Fork 384
/
agent.rs
100 lines (85 loc) · 3.14 KB
/
agent.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//! # UDP Jaeger Agent Client
use crate::exporter::thrift::{
agent::{self, TAgentSyncClient},
jaeger,
};
use crate::exporter::transport::{TBufferChannel, TNoopChannel};
use std::fmt;
use std::net::{ToSocketAddrs, UdpSocket};
use thrift::{
protocol::{TCompactInputProtocol, TCompactOutputProtocol},
transport::{ReadHalf, TIoChannel, WriteHalf},
};
struct BufferClient {
buffer: ReadHalf<TBufferChannel>,
client: agent::AgentSyncClient<
TCompactInputProtocol<TNoopChannel>,
TCompactOutputProtocol<WriteHalf<TBufferChannel>>,
>,
}
impl fmt::Debug for BufferClient {
/// Debug info
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BufferClient")
.field("buffer", &self.buffer)
.field("client", &"AgentSyncClient")
.finish()
}
}
/// `AgentAsyncClientUDP` implements an async version of the `TAgentSyncClient`
/// interface over UDP.
#[derive(Debug)]
pub(crate) struct AgentAsyncClientUDP {
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
conn: UdpSocket,
#[cfg(feature = "tokio")]
conn: tokio::net::UdpSocket,
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
conn: async_std::net::UdpSocket,
buffer_client: BufferClient,
}
impl AgentAsyncClientUDP {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(host_port: T) -> thrift::Result<Self> {
let (buffer, write) = TBufferChannel::with_capacity(512).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
TCompactOutputProtocol::new(write),
);
let conn = UdpSocket::bind("0.0.0.0:0")?;
conn.connect(host_port)?;
Ok(AgentAsyncClientUDP {
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
conn,
#[cfg(feature = "tokio")]
conn: tokio::net::UdpSocket::from_std(conn)?,
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
conn: async_std::net::UdpSocket::from(conn),
buffer_client: BufferClient { buffer, client },
})
}
/// Emit standard Jaeger batch
pub(crate) async fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> {
// Write payload to buffer
self.buffer_client.client.emit_batch(batch)?;
let payload = self.buffer_client.buffer.take_bytes();
// Write async to socket, reading from buffer
write_to_socket(self, payload).await?;
Ok(())
}
}
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
client.conn.send(&payload)?;
Ok(())
}
#[cfg(feature = "tokio")]
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
client.conn.send(&payload).await?;
Ok(())
}
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
client.conn.send(&payload).await?;
Ok(())
}