forked from libpnet/libpnet
/
fanout.rs
112 lines (99 loc) · 3.77 KB
/
fanout.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright (c) 2018 Berkus Decker <berkus+github@metta.systems>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/// This example shows simple packet_fanout processing under linux.
/// PACKET_FANOUT in linux allows to offload packet processing to multiple threads.
/// See [man 7 packet](http://man7.org/linux/man-pages/man7/packet.7.html) for more details.
extern crate pnet;
extern crate pnet_datalink;
use std::io::{self, Write};
use std::process;
#[cfg(not(target_os = "linux"))]
fn main() {
writeln!(io::stderr(), "fanout is only supported on Linux").unwrap();
process::exit(1);
}
#[cfg(target_os = "linux")]
fn main() {
use pnet::datalink::Channel::Ethernet;
use pnet::datalink::{self, Config, FanoutOption, FanoutType, NetworkInterface};
use std::env;
use std::thread;
let iface_name = match env::args().nth(1) {
Some(n) => n,
None => {
writeln!(io::stderr(), "USAGE: fanout <NETWORK INTERFACE> [hash|*round-robin*|cpu|rollover|rnd|qm|cbpf|ebpf] [group-id:123]").unwrap();
process::exit(1);
}
};
let interface_names_match = |iface: &NetworkInterface| iface.name == iface_name;
// Find the network interface with the provided name
let interfaces = datalink::linux::interfaces();
let interface = interfaces
.into_iter()
.filter(interface_names_match)
.next()
.unwrap();
let fanout_type = match env::args().nth(2) {
Some(n) => match n.to_lowercase().as_str() {
"hash" => FanoutType::HASH,
"round-robin" => FanoutType::LB,
"cpu" => FanoutType::CPU,
"rollover" => FanoutType::ROLLOVER,
"rnd" => FanoutType::RND,
"qm" => FanoutType::QM,
"cbpf" => FanoutType::CBPF,
"ebpf" => FanoutType::EBPF,
_ => panic!("Unsupported fanout type, use one of hash, round-robin, cpu, rollover, rnd, qm, cbpf or ebpf")
},
None => FanoutType::LB,
};
let group_id = match env::args().nth(3) {
Some(n) => n.parse::<u16>().unwrap(),
None => 123,
};
let mut config: Config = Default::default();
config.linux_fanout = Some(FanoutOption {
group_id: group_id,
fanout_type: fanout_type,
defrag: true,
rollover: false,
});
let mut threads = vec![];
for x in 0..3 {
let itf = interface.clone();
let thread = thread::Builder::new()
.name(format!("thread{}", x))
.spawn(move || {
// Create a channel to receive on
let (_, mut rx) = match datalink::channel(&itf, config) {
Ok(Ethernet(tx, rx)) => (tx, rx),
Ok(_) => panic!("packetdump: unhandled channel type"),
Err(e) => panic!("packetdump: unable to create channel: {}", e),
};
let handle = thread::current();
loop {
match rx.next() {
Ok(_packet) => {
writeln!(
io::stdout(),
"Received packet on thread {:?}",
handle.name()
)
.unwrap();
}
Err(e) => panic!("packetdump: unable to receive packet: {}", e),
}
}
})
.unwrap();
threads.push(thread);
}
for t in threads {
t.join().unwrap();
}
}