Skip to content

Commit

Permalink
Merge branch 'apache:master' into apache#1843
Browse files Browse the repository at this point in the history
  • Loading branch information
DaltonModlin committed Jun 15, 2022
2 parents b5d525e + 5683ee9 commit 582d397
Show file tree
Hide file tree
Showing 45 changed files with 708 additions and 680 deletions.
22 changes: 7 additions & 15 deletions .github/workflows/miri.sh
Expand Up @@ -6,21 +6,13 @@
# rustup default nightly


export MIRIFLAGS="-Zmiri-disable-isolation"
# stacked borrows checking uses too much memory to run successfully in github actions
# re-enable if the CI is migrated to something more powerful (https://github.com/apache/arrow-rs/issues/1833)
# see also https://github.com/rust-lang/miri/issues/1367
export MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-disable-stacked-borrows"
cargo miri setup
cargo clean

run_miri() {
    # Miri coverage is currently limited to the arrow crate; IO-related
    # suites and other tests Miri cannot support are skipped.
    local skip_args="--skip csv --skip ipc --skip json"
    cargo miri test -p arrow -- $skip_args
}

# If MIRI fails, automatically retry up to 5 times.
# Miri seems to be occasionally killed by the GitHub runner:
# https://github.com/apache/arrow-rs/issues/879
for i in $(seq 1 5); do
    echo "Starting Arrow MIRI run..."
    run_miri && break
done
echo "Starting Arrow MIRI run..."
cargo miri test -p arrow -- --skip csv --skip ipc --skip json
echo "Miri finished with exit code $?"
45 changes: 23 additions & 22 deletions arrow-flight/src/arrow.flight.protocol.rs
Expand Up @@ -229,7 +229,7 @@ pub mod flight_service_client {
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Default + Body<Data = Bytes> + Send + 'static,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
Expand All @@ -242,6 +242,7 @@ pub mod flight_service_client {
) -> FlightServiceClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
Expand Down Expand Up @@ -278,9 +279,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::HandshakeRequest>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::HandshakeResponse>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::HandshakeResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -307,9 +308,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoRequest<super::Criteria>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::FlightInfo>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::FlightInfo>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand Down Expand Up @@ -388,9 +389,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoRequest<super::Ticket>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::FlightData>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::FlightData>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -417,9 +418,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::FlightData>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::PutResult>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::PutResult>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -445,9 +446,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::FlightData>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::FlightData>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::FlightData>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -474,9 +475,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoRequest<super::Action>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::Result>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::Result>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -500,9 +501,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoRequest<super::Empty>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::ActionType>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::ActionType>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand Down
9 changes: 9 additions & 0 deletions arrow-pyarrow-integration-testing/src/lib.rs
Expand Up @@ -27,6 +27,7 @@ use arrow::array::{ArrayData, ArrayRef, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow::pyarrow::PyArrowConvert;
use arrow::record_batch::RecordBatch;

Expand Down Expand Up @@ -111,6 +112,13 @@ fn round_trip_record_batch(obj: RecordBatch) -> PyResult<RecordBatch> {
Ok(obj)
}

/// Round-trips an `ArrowArrayStreamReader` from Python into Rust and back
/// unchanged; the Python<->Rust conversion itself happens in the pyo3
/// argument-extraction and return glue generated by `#[pyfunction]`.
#[pyfunction]
fn round_trip_record_batch_reader(
    obj: ArrowArrayStreamReader,
) -> PyResult<ArrowArrayStreamReader> {
    Ok(obj)
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(double))?;
Expand All @@ -122,5 +130,6 @@ fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()>
m.add_wrapped(wrap_pyfunction!(round_trip_schema))?;
m.add_wrapped(wrap_pyfunction!(round_trip_array))?;
m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?;
m.add_wrapped(wrap_pyfunction!(round_trip_record_batch_reader))?;
Ok(())
}
16 changes: 16 additions & 0 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
Expand Up @@ -303,3 +303,19 @@ def test_dictionary_python():
assert a == b
del a
del b

def test_record_batch_reader():
    """
    Round-trip a RecordBatchReader through Rust: Python -> Rust -> Python.
    """
    schema = pa.schema([('ints', pa.list_(pa.int32()))], metadata={b'key1': b'value1'})
    batches = [
        pa.record_batch([[[1], [2, 42]]], schema),
        pa.record_batch([[None, [], [5, 6]]], schema),
    ]
    reader = pa.RecordBatchReader.from_batches(schema, batches)
    round_tripped = rust.round_trip_record_batch_reader(reader)

    assert round_tripped.schema == schema
    assert list(round_tripped) == batches
3 changes: 1 addition & 2 deletions arrow/Cargo.toml
Expand Up @@ -46,7 +46,7 @@ rand = { version = "0.8", optional = true }
num = "0.4"
half = "1.8"
csv_crate = { version = "1.1", optional = true, package="csv" }
regex = "1.3"
regex = "1.5.6"
lazy_static = "1.4"
packed_simd = { version = "0.3", optional = true, package = "packed_simd_2" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Expand All @@ -61,7 +61,6 @@ bitflags = "1.2.1"

[features]
default = ["csv", "ipc", "test_utils"]
avx512 = []
csv = ["csv_crate"]
ipc = ["flatbuffers"]
simd = ["packed_simd"]
Expand Down
14 changes: 14 additions & 0 deletions arrow/README.md
Expand Up @@ -100,3 +100,17 @@ cargo run --example read_csv
```

[arrow]: https://arrow.apache.org/


## Performance

Most of the compute kernels benefit a lot from being optimized for a specific CPU target.
This is especially so on x86-64 since without specifying a target the compiler can only assume support for SSE2 vector instructions.
Passing one of the following values via `-Ctarget-cpu=<value>` in `RUSTFLAGS` can therefore improve performance significantly:

- `native`: Target the exact features of the cpu that the build is running on.
This should give the best performance when building and running locally, but should be used carefully, for example, when building in a CI pipeline or when shipping pre-compiled software.
- `x86-64-v3`: Includes AVX2 support and is close to the Intel `haswell` architecture released in 2013; it should be supported by any recent Intel or AMD CPU.
- `x86-64-v4`: Includes AVX512 support available on Intel `skylake` server and `icelake`/`tigerlake`/`rocketlake` laptop and desktop processors.

These flags should be used in addition to the `simd` feature, since they will also affect the code generated by the simd library.
61 changes: 53 additions & 8 deletions arrow/benches/buffer_bit_ops.rs
Expand Up @@ -17,11 +17,14 @@

#[macro_use]
extern crate criterion;
use criterion::Criterion;

use criterion::{Criterion, Throughput};

extern crate arrow;

use arrow::buffer::{Buffer, MutableBuffer};
use arrow::buffer::{
buffer_bin_and, buffer_bin_or, buffer_unary_not, Buffer, MutableBuffer,
};

/// Helper function to create arrays
fn create_buffer(size: usize) -> Buffer {
Expand All @@ -42,17 +45,59 @@ fn bench_buffer_or(left: &Buffer, right: &Buffer) {
criterion::black_box((left | right).unwrap());
}

/// Benchmark driver: buffer-wide bitwise NOT via the `Not` operator,
/// kept live with `black_box` so the work is not optimized away.
fn bench_buffer_not(src: &Buffer) {
    criterion::black_box(!src);
}

/// Benchmark driver for `buffer_bin_and` applied at the given bit offsets.
fn bench_buffer_and_with_offsets(
    lhs: &Buffer,
    lhs_offset: usize,
    rhs: &Buffer,
    rhs_offset: usize,
    len: usize,
) {
    let result = buffer_bin_and(lhs, lhs_offset, rhs, rhs_offset, len);
    criterion::black_box(result);
}

/// Benchmark driver for `buffer_bin_or` applied at the given bit offsets.
fn bench_buffer_or_with_offsets(
    lhs: &Buffer,
    lhs_offset: usize,
    rhs: &Buffer,
    rhs_offset: usize,
    len: usize,
) {
    let result = buffer_bin_or(lhs, lhs_offset, rhs, rhs_offset, len);
    criterion::black_box(result);
}

/// Benchmark driver for `buffer_unary_not` applied at the given bit offset.
fn bench_buffer_not_with_offsets(src: &Buffer, offset: usize, len: usize) {
    let negated = buffer_unary_not(src, offset, len);
    criterion::black_box(negated);
}

fn bit_ops_benchmark(c: &mut Criterion) {
let left = create_buffer(512 * 10);
let right = create_buffer(512 * 10);

c.bench_function("buffer_bit_ops and", |b| {
b.iter(|| bench_buffer_and(&left, &right))
});
c.benchmark_group("buffer_binary_ops")
.throughput(Throughput::Bytes(3 * left.len() as u64))
.bench_function("and", |b| b.iter(|| bench_buffer_and(&left, &right)))
.bench_function("or", |b| b.iter(|| bench_buffer_or(&left, &right)))
.bench_function("and_with_offset", |b| {
b.iter(|| {
bench_buffer_and_with_offsets(&left, 1, &right, 2, left.len() * 8 - 5)
})
})
.bench_function("or_with_offset", |b| {
b.iter(|| {
bench_buffer_or_with_offsets(&left, 1, &right, 2, left.len() * 8 - 5)
})
});

c.bench_function("buffer_bit_ops or", |b| {
b.iter(|| bench_buffer_or(&left, &right))
});
c.benchmark_group("buffer_unary_ops")
.throughput(Throughput::Bytes(2 * left.len() as u64))
.bench_function("not", |b| b.iter(|| bench_buffer_not(&left)))
.bench_function("not_with_offset", |b| {
b.iter(|| bench_buffer_not_with_offsets(&left, 1, left.len() * 8 - 5))
});
}

criterion_group!(benches, bit_ops_benchmark);
Expand Down
73 changes: 0 additions & 73 deletions arrow/src/arch/avx512.rs

This file was deleted.

4 changes: 3 additions & 1 deletion arrow/src/array/array.rs
Expand Up @@ -873,7 +873,9 @@ mod tests {

#[test]
fn test_memory_size_primitive_nullable() {
let arr: PrimitiveArray<Int64Type> = (0..128).map(Some).collect();
let arr: PrimitiveArray<Int64Type> = (0..128)
.map(|i| if i % 20 == 0 { Some(i) } else { None })
.collect();
let empty_with_bitmap = PrimitiveArray::<Int64Type>::from(
ArrayData::builder(arr.data_type().clone())
.add_buffer(MutableBuffer::new(0).into())
Expand Down

0 comments on commit 582d397

Please sign in to comment.