Skip to content

Commit

Permalink
Merge branch 'non-blocking' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
ohler55 committed Jan 5, 2022
2 parents 5055dc2 + d969b87 commit 34dce22
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 48 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# CHANGELOG

## 3.13.11 - 2022-01-05

- Fixed write blocking failures on writes to a slow stream with larger writes.

## 3.13.10 - 2021-12-12

Expand Down
91 changes: 47 additions & 44 deletions ext/oj/dump.c
Expand Up @@ -10,6 +10,9 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#if !IS_WINDOWS
#include <poll.h>
#endif

#include "cache8.h"
#include "odd.h"
Expand Down Expand Up @@ -113,12 +116,10 @@ static char rails_friendly_chars[256] = "\
11111111111111111111111111111111";

static void raise_strict(VALUE obj) {
rb_raise(rb_eTypeError,
"Failed to dump %s Object to JSON in strict mode.",
rb_class2name(rb_obj_class(obj)));
rb_raise(rb_eTypeError, "Failed to dump %s Object to JSON in strict mode.", rb_class2name(rb_obj_class(obj)));
}

inline static size_t calculate_string_size(const uint8_t *str, size_t len, const char * table) {
inline static size_t calculate_string_size(const uint8_t *str, size_t len, const char *table) {
size_t size = 0;
size_t i = len;

Expand Down Expand Up @@ -521,14 +522,7 @@ void oj_dump_xml_time(VALUE obj, Out out) {
}
if ((0 == nsec && !out->opts->sec_prec_set) || 0 == out->opts->sec_prec) {
if (0 == tzsecs && rb_funcall2(obj, oj_utcq_id, 0, 0)) {
int len = sprintf(buf,
"%04d-%02d-%02dT%02d:%02d:%02dZ",
ti.year,
ti.mon,
ti.day,
ti.hour,
ti.min,
ti.sec);
int len = sprintf(buf, "%04d-%02d-%02dT%02d:%02d:%02dZ", ti.year, ti.mon, ti.day, ti.hour, ti.min, ti.sec);
oj_dump_cstr(buf, len, 0, 0, out);
} else {
int len = sprintf(buf,
Expand Down Expand Up @@ -560,18 +554,7 @@ void oj_dump_xml_time(VALUE obj, Out out) {
if (9 > out->opts->sec_prec) {
format[32] = '0' + out->opts->sec_prec;
}
len = sprintf(buf,
format,
ti.year,
ti.mon,
ti.day,
ti.hour,
ti.min,
ti.sec,
(long)nsec,
tzsign,
tzhour,
tzmin);
len = sprintf(buf, format, ti.year, ti.mon, ti.day, ti.hour, ti.min, ti.sec, (long)nsec, tzsign, tzhour, tzmin);
oj_dump_cstr(buf, len, 0, 0, out);
}
}
Expand Down Expand Up @@ -652,6 +635,21 @@ void oj_write_obj_to_file(VALUE obj, const char *path, Options copts) {
}
}

static void write_ready(int fd) {
struct pollfd pp;
int i;

pp.fd = fd;
pp.events = POLLERR | POLLOUT;
pp.revents = 0;
if (0 >= (i = poll(&pp, 1, 5000))) {
if (0 == i || EAGAIN == errno) {
rb_raise(rb_eIOError, "write timed out");
}
rb_raise(rb_eIOError, "write failed. %d %s.", errno, strerror(errno));
}
}

void oj_write_obj_to_stream(VALUE obj, VALUE stream, Options copts) {
char buf[4096];
struct _out out;
Expand All @@ -671,13 +669,24 @@ void oj_write_obj_to_stream(VALUE obj, VALUE stream, Options copts) {
if (oj_stringio_class == clas) {
rb_funcall(stream, oj_write_id, 1, rb_str_new(out.buf, size));
#if !IS_WINDOWS
} else if (rb_respond_to(stream, oj_fileno_id) &&
Qnil != (s = rb_funcall(stream, oj_fileno_id, 0)) && 0 != (fd = FIX2INT(s))) {
if (size != write(fd, out.buf, size)) {
if (out.allocated) {
xfree(out.buf);
} else if (rb_respond_to(stream, oj_fileno_id) && Qnil != (s = rb_funcall(stream, oj_fileno_id, 0)) &&
0 != (fd = FIX2INT(s))) {
ssize_t cnt;
ssize_t total = 0;

while (true) {
if (0 > (cnt = write(fd, out.buf + total, size - total))) {
if (EAGAIN != errno) {
rb_raise(rb_eIOError, "write failed. %d %s.", errno, strerror(errno));
break;
}
}
total += cnt;
if (size <= total) {
// Completed
break;
}
rb_raise(rb_eIOError, "Write failed. [%d:%s]", errno, strerror(errno));
write_ready(fd);
}
#endif
} else if (rb_respond_to(stream, oj_write_id)) {
Expand Down Expand Up @@ -821,8 +830,7 @@ void oj_dump_cstr(const char *str, size_t cnt, bool is_sym, bool escape1, Out ou
for (; str < end; str++) {
switch (cmap[(uint8_t)*str]) {
case '1':
if ((JXEsc == out->opts->escape_mode || RailsXEsc == out->opts->escape_mode) &&
check_start <= str) {
if ((JXEsc == out->opts->escape_mode || RailsXEsc == out->opts->escape_mode) && check_start <= str) {
if (0 != (0x80 & (uint8_t)*str)) {
if (0xC0 == (0xC0 & (uint8_t)*str)) {
check_start = check_unicode(str, end, orig);
Expand All @@ -846,11 +854,9 @@ void oj_dump_cstr(const char *str, size_t cnt, bool is_sym, bool escape1, Out ou
}
break;
case '3': // Unicode
if (0xe2 == (uint8_t)*str &&
(JXEsc == out->opts->escape_mode || RailsXEsc == out->opts->escape_mode) &&
if (0xe2 == (uint8_t)*str && (JXEsc == out->opts->escape_mode || RailsXEsc == out->opts->escape_mode) &&
2 <= end - str) {
if (0x80 == (uint8_t)str[1] &&
(0xa8 == (uint8_t)str[2] || 0xa9 == (uint8_t)str[2])) {
if (0x80 == (uint8_t)str[1] && (0xa8 == (uint8_t)str[2] || 0xa9 == (uint8_t)str[2])) {
str = dump_unicode(str, end, out, orig);
} else {
check_start = check_unicode(str, end, orig);
Expand All @@ -869,10 +875,8 @@ void oj_dump_cstr(const char *str, size_t cnt, bool is_sym, bool escape1, Out ou
dump_hex((uint8_t)*str, out);
} else {
if (0xe2 == (uint8_t)*str &&
(JXEsc == out->opts->escape_mode || RailsXEsc == out->opts->escape_mode) &&
2 <= end - str) {
if (0x80 == (uint8_t)str[1] &&
(0xa8 == (uint8_t)str[2] || 0xa9 == (uint8_t)str[2])) {
(JXEsc == out->opts->escape_mode || RailsXEsc == out->opts->escape_mode) && 2 <= end - str) {
if (0x80 == (uint8_t)str[1] && (0xa8 == (uint8_t)str[2] || 0xa9 == (uint8_t)str[2])) {
str = dump_unicode(str, end, out, orig);
} else {
check_start = check_unicode(str, end, orig);
Expand All @@ -888,8 +892,8 @@ void oj_dump_cstr(const char *str, size_t cnt, bool is_sym, bool escape1, Out ou
}
*out->cur++ = '"';
}
if ((JXEsc == out->opts->escape_mode || RailsXEsc == out->opts->escape_mode) &&
0 < str - orig && 0 != (0x80 & *(str - 1))) {
if ((JXEsc == out->opts->escape_mode || RailsXEsc == out->opts->escape_mode) && 0 < str - orig &&
0 != (0x80 & *(str - 1))) {
uint8_t c = (uint8_t) * (str - 1);
int i;
int scnt = (int)(str - orig);
Expand Down Expand Up @@ -1050,8 +1054,7 @@ void oj_dump_bignum(VALUE obj, int depth, Out out, bool as_ok) {
int cnt = (int)RSTRING_LEN(rs);
bool dump_as_string = false;

if (out->opts->int_range_max != 0 ||
out->opts->int_range_min != 0) { // Bignum cannot be inside of Fixnum range
if (out->opts->int_range_max != 0 || out->opts->int_range_min != 0) { // Bignum cannot be inside of Fixnum range
dump_as_string = true;
assure_size(out, cnt + 2);
*out->cur++ = '"';
Expand Down
2 changes: 1 addition & 1 deletion lib/oj/version.rb
@@ -1,5 +1,5 @@

module Oj
# Current version of the module.
VERSION = '3.13.10'
VERSION = '3.13.11'
end
5 changes: 5 additions & 0 deletions notes
Expand Up @@ -13,6 +13,11 @@
- need to handle auto create
- return Qundef does not store

- stream writes
- dump.c:685
- stream_writer.c
- use io C calls instead

- future
- object mode delegate

Expand Down
4 changes: 4 additions & 0 deletions test/test_all.sh
Expand Up @@ -3,6 +3,10 @@
echo "----- General tests (tests.rb) -----"
ruby tests.rb

echo "----- Various tests (test_various.rb) -----"
# test_various forks which causes issues with other tests.
ruby test_various.rb

echo "----- Parser(:saj) tests (test_parser_saj.rb) -----"
ruby test_parser_saj.rb

Expand Down
29 changes: 27 additions & 2 deletions test/test_various.rb
Expand Up @@ -528,7 +528,7 @@ def test_bag
assert_equal(58, obj.y)
end

# Stream Deeply Nested
# Stream Deeply Nested
def test_deep_nest_dump
begin
a = []
Expand All @@ -541,7 +541,7 @@ def test_deep_nest_dump
assert(false, "*** expected an exception")
end

# Stream IO
# Stream IO
def test_io_string
src = { 'x' => true, 'y' => 58, 'z' => [1, 2, 3]}
output = StringIO.open("", "w+")
Expand All @@ -553,6 +553,9 @@ def test_io_string
end

def test_io_file
# Windows does not support fork
return if RbConfig::CONFIG['host_os'] =~ /(mingw|mswin)/

src = { 'x' => true, 'y' => 58, 'z' => [1, 2, 3]}
filename = File.join(File.dirname(__FILE__), 'open_file_test.json')
File.open(filename, "w") { |f|
Expand All @@ -564,6 +567,28 @@ def test_io_file
assert_equal(src, obj)
end

def test_io_stream
IO.pipe do |r, w|
if fork
r.close
#w.nonblock = false
a = []
10_000.times do |i|
a << i
end
Oj.to_stream(w, a, indent: 2)
w.close
else
w.close
sleep(0.1) # to force a busy
cnt = 0
r.each_line { cnt += 1 }
r.close
Process.exit(0)
end
end
end

# comments
def test_comment_slash
json = %{{
Expand Down
1 change: 0 additions & 1 deletion test/tests.rb
Expand Up @@ -18,7 +18,6 @@
require 'test_saj'
require 'test_scp'
require 'test_strict'
require 'test_various'
require 'test_rails'
require 'test_wab'
require 'test_writer'
Expand Down

0 comments on commit 34dce22

Please sign in to comment.