Skip to content

Commit

Permalink
When dumping to IO, dump directly
Browse files Browse the repository at this point in the history
JSON.dump allows you to pass an IO to which the dump output will
be sent, but it still buffers the entire output in memory before
sending it to the given IO. This leads to issues on JRuby like
jruby/jruby#6265 when it tries to create a byte[] that exceeds the
maximum size of a signed int (JVM's array size limit).

This commit plumbs the IO all the way through the generation logic
so that it can be written to directly without filling a temporary
memory buffer first. This allows JRuby to dump object graphs that
would normally produce more content than the JVM can hold in a
single array, providing a workaround for jruby/jruby#6265.

It is unfortunately a bit slow to dump directly to IO due to the
many small writes that all acquire locks and participate in the
IO encoding subsystem. A more direct path that can skip some of
these pieces could be more competitive with the in-memory version,
but functionally it expands the size of graphs that can be dumped
when using JRuby.

See flori#524
  • Loading branch information
headius committed Aug 15, 2023
1 parent c233be9 commit 7576f5c
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 128 deletions.
21 changes: 12 additions & 9 deletions java/src/json/ext/ByteListTranscoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.jruby.runtime.ThreadContext;
import org.jruby.util.ByteList;

import java.io.IOException;
import java.io.OutputStream;

/**
* A class specialized in transcoding a certain String format into another,
* using UTF-8 ByteLists as both input and output.
Expand All @@ -23,7 +26,7 @@ abstract class ByteListTranscoder {
/** Position of the next character to read */
protected int pos;

private ByteList out;
private OutputStream out;
/**
* When a character that can be copied straight into the output is found,
* its index is stored on this variable, and copying is delayed until
Expand All @@ -37,11 +40,11 @@ protected ByteListTranscoder(ThreadContext context) {
this.context = context;
}

protected void init(ByteList src, ByteList out) {
/**
 * Convenience overload that delegates to the four-argument init, using
 * 0 as the start position and src.length() as the end position.
 */
protected void init(ByteList src, OutputStream out) {
this.init(src, 0, src.length(), out);
}

protected void init(ByteList src, int start, int end, ByteList out) {
protected void init(ByteList src, int start, int end, OutputStream out) {
this.src = src;
this.pos = start;
this.charStart = start;
Expand Down Expand Up @@ -142,19 +145,19 @@ protected void quoteStart() {
* recently read character, or {@link #charStart} to quote
* until the character before it.
*/
protected void quoteStop(int endPos) {
protected void quoteStop(int endPos) throws IOException {
    // quoteStart == -1 means no pending run of characters to copy.
    if (quoteStart != -1) {
        // quoteStart and endPos are positions relative to the start of
        // src's content. Only the array offset is shifted by src.begin();
        // the length is just the distance between the two positions.
        // (Adding src.begin() to the length would write too many bytes,
        // or run past the array, whenever src does not begin at 0.)
        out.write(src.unsafeBytes(), src.begin() + quoteStart, endPos - quoteStart);
        quoteStart = -1;
    }
}

protected void append(int b) {
out.append(b);
/**
 * Writes the single byte b directly to the output stream.
 *
 * @throws IOException if the underlying stream rejects the write
 */
protected void append(int b) throws IOException {
out.write(b);
}

protected void append(byte[] origin, int start, int length) {
out.append(origin, start, length);
/**
 * Writes length bytes of origin, beginning at index start, directly to
 * the output stream.
 *
 * @throws IOException if the underlying stream rejects the write
 */
protected void append(byte[] origin, int start, int length) throws IOException {
out.write(origin, start, length);
}


Expand Down
148 changes: 100 additions & 48 deletions java/src/json/ext/Generator.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package json.ext;

import org.jcodings.specific.USASCIIEncoding;
import org.jcodings.specific.UTF8Encoding;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBasicObject;
Expand All @@ -13,10 +15,17 @@
import org.jruby.RubyFixnum;
import org.jruby.RubyFloat;
import org.jruby.RubyHash;
import org.jruby.RubyIO;
import org.jruby.RubyString;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.ByteList;
import org.jruby.util.IOOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public final class Generator {
private Generator() {
Expand Down Expand Up @@ -55,6 +64,18 @@ private Generator() {
return handler.generateNew(session, object);
}

/**
 * Stream-oriented counterpart of the other generateJson forms: the JSON
 * for {@code object}, produced under the settings in {@code config}, is
 * written to {@code out} rather than returned.
 */
public static <T extends IRubyObject> void
        generateJson(ThreadContext context, T object,
                     GeneratorState config, OutputStream out) {
    final Session generationSession = new Session(context, config);
    final Handler<? super T> serializer = getHandlerFor(context.runtime, object);
    serializer.generateNew(generationSession, object, out);
}

/**
* Returns the best serialization handler for the given object.
*/
Expand Down Expand Up @@ -159,6 +180,16 @@ public <T extends IRubyObject> T infect(T object) {

/* Handler base classes */

/**
 * A {@link ByteArrayOutputStream} that can expose its accumulated bytes as
 * a {@link ByteList} without an intermediate {@code toByteArray()} copy.
 */
static class ByteListOutputStream extends ByteArrayOutputStream {
    /**
     * @param size initial capacity of the internal buffer, in bytes
     */
    public ByteListOutputStream(int size) {
        super(size);
    }

    /**
     * Wraps the bytes written so far in a ByteList that shares this
     * stream's internal buffer. The three-argument ByteList constructor
     * copies the array; passing {@code false} for the copy flag avoids
     * that, so very large outputs are not held in memory twice.
     *
     * Because the buffer is shared, the stream should not be written to
     * or reset after this method has been called.
     *
     * @return a ByteList over the first {@code count} bytes of the buffer
     */
    public ByteList toByteListDirect() {
        return new ByteList(buf, 0, count, false);
    }
}

private static abstract class Handler<T extends IRubyObject> {
/**
* Returns an estimative of how much space the serialization of the
Expand All @@ -171,16 +202,33 @@ int guessSize(Session session, T object) {

RubyString generateNew(Session session, T object) {
RubyString result;
ByteList buffer = new ByteList(guessSize(session, object));
generate(session, object, buffer);
result = RubyString.newString(session.getRuntime(), buffer);
ByteListOutputStream blos = new ByteListOutputStream(guessSize(session, object));
generateNew(session, object, blos);
result = RubyString.newString(session.getRuntime(), blos.toByteListDirect());
ThreadContext context = session.getContext();
RuntimeInfo info = session.getInfo();
result.force_encoding(context, info.utf8.get());
return result;
}

abstract void generate(Session session, T object, ByteList buffer);
/**
 * Serializes object directly to the given Ruby IO. The IO's enc2 is set
 * to UTF-8 first, then generation proceeds through the OutputStream
 * overload of generateNew.
 */
void generateNew(Session session, T object, RubyIO buffer) {
    buffer.setEnc2(UTF8Encoding.INSTANCE);
    // RubyIO is not a java.io.OutputStream, so passing it straight through
    // would re-select this very overload and recurse until the stack
    // overflows. Adapting it with IOOutputStream binds the call to the
    // OutputStream overload instead.
    generateNew(session, object, new IOOutputStream(buffer));
}

/**
 * Runs generate() against the given stream, converting any IOException
 * it throws into a Ruby-level IO error so callers need not declare it.
 */
void generateNew(Session session, T object, OutputStream buffer) {
    try {
        generate(session, object, buffer);
    } catch (IOException writeFailure) {
        final Ruby runtime = session.getRuntime();
        throw Helpers.newIOErrorFromException(runtime, writeFailure);
    }
}

/**
 * Writes the serialization of object to os. Each concrete handler
 * supplies its own implementation.
 *
 * @throws IOException if writing to os fails
 */
abstract void generate(Session session, T object, OutputStream os) throws IOException;

/**
 * Copies the content of byteList to os, honouring the list's begin
 * offset and real size rather than dumping its whole backing array.
 */
protected void writeByteList(OutputStream os, ByteList byteList) throws IOException {
    final byte[] backing = byteList.unsafeBytes();
    final int offset = byteList.begin();
    final int length = byteList.realSize();
    os.write(backing, offset, length);
}
}

/**
Expand All @@ -205,8 +253,8 @@ RubyString generateNew(Session session, T object) {
}

@Override
void generate(Session session, T object, ByteList buffer) {
buffer.append(keyword);
// Writes this handler's keyword bytes to the stream; neither session nor
// object is consulted.
void generate(Session session, T object, OutputStream buffer) throws IOException {
writeByteList(buffer, keyword);
}
}

Expand All @@ -216,26 +264,27 @@ void generate(Session session, T object, ByteList buffer) {
static final Handler<RubyBignum> BIGNUM_HANDLER =
new Handler<RubyBignum>() {
@Override
void generate(Session session, RubyBignum object, ByteList buffer) {
void generate(Session session, RubyBignum object, OutputStream buffer) throws IOException {
    // The decimal text comes from the underlying BigInteger, so no
    // Ruby-level to_s call is involved. BigInteger.toString() yields only
    // an optional minus sign and ASCII digits; encoding explicitly as
    // US-ASCII keeps the output independent of the JVM's default charset.
    byte[] digits = object.getBigIntegerValue().toString()
            .getBytes(java.nio.charset.StandardCharsets.US_ASCII);
    buffer.write(digits, 0, digits.length);
}
};

static final Handler<RubyFixnum> FIXNUM_HANDLER =
new Handler<RubyFixnum>() {
@Override
void generate(Session session, RubyFixnum object, ByteList buffer) {
buffer.append(object.to_s().getByteList());
// Writes the bytes of the fixnum's to_s string to the stream.
void generate(Session session, RubyFixnum object, OutputStream buffer) throws IOException {
writeByteList(buffer, object.to_s().getByteList());
}
};

static final Handler<RubyFloat> FLOAT_HANDLER =
new Handler<RubyFloat>() {
@Override
void generate(Session session, RubyFloat object, ByteList buffer) {
void generate(Session session, RubyFloat object, OutputStream buffer) throws IOException {
double value = RubyFloat.num2dbl(object);

if (Double.isInfinite(value) || Double.isNaN(value)) {
Expand All @@ -245,7 +294,7 @@ void generate(Session session, RubyFloat object, ByteList buffer) {
object + " not allowed in JSON");
}
}
buffer.append(((RubyString)object.to_s()).getByteList());
writeByteList(buffer, ((RubyString)object.to_s()).getByteList());
}
};

Expand All @@ -263,7 +312,7 @@ int guessSize(Session session, RubyArray object) {
}

@Override
void generate(Session session, RubyArray object, ByteList buffer) {
void generate(Session session, RubyArray object, OutputStream buffer) throws IOException {
ThreadContext context = session.getContext();
Ruby runtime = context.getRuntime();
GeneratorState state = session.getState();
Expand All @@ -280,29 +329,29 @@ void generate(Session session, RubyArray object, ByteList buffer) {

session.infectBy(object);

buffer.append((byte)'[');
buffer.append(arrayNl);
buffer.write((byte)'[');
buffer.write(arrayNl.unsafeBytes());
boolean firstItem = true;
for (int i = 0, t = object.getLength(); i < t; i++) {
IRubyObject element = object.eltInternal(i);
session.infectBy(element);
if (firstItem) {
firstItem = false;
} else {
buffer.append(delim);
buffer.write(delim);
}
buffer.append(shift);
buffer.write(shift);
Handler<IRubyObject> handler = (Handler<IRubyObject>) getHandlerFor(runtime, element);
handler.generate(session, element, buffer);
}

state.decreaseDepth();
if (arrayNl.length() != 0) {
buffer.append(arrayNl);
buffer.append(shift, 0, state.getDepth() * indentUnit.length());
buffer.write(arrayNl.unsafeBytes());
buffer.write(shift, 0, state.getDepth() * indentUnit.length());
}

buffer.append((byte)']');
buffer.write((byte)']');
}
};

Expand All @@ -321,7 +370,7 @@ int guessSize(Session session, RubyHash object) {

@Override
void generate(final Session session, RubyHash object,
final ByteList buffer) {
final OutputStream buffer) throws IOException {
ThreadContext context = session.getContext();
final Ruby runtime = context.getRuntime();
final GeneratorState state = session.getState();
Expand All @@ -332,39 +381,43 @@ void generate(final Session session, RubyHash object,
final ByteList spaceBefore = state.getSpaceBefore();
final ByteList space = state.getSpace();

buffer.append((byte)'{');
buffer.append(objectNl);
buffer.write((byte)'{');
buffer.write(objectNl.unsafeBytes());

final boolean[] firstPair = new boolean[]{true};
object.visitAll(new RubyHash.Visitor() {
@Override
public void visit(IRubyObject key, IRubyObject value) {
if (firstPair[0]) {
firstPair[0] = false;
} else {
buffer.append((byte)',');
buffer.append(objectNl);
try {
if (firstPair[0]) {
firstPair[0] = false;
} else {
buffer.write((byte) ',');
buffer.write(objectNl.unsafeBytes());
}
if (objectNl.length() != 0) buffer.write(indent);

STRING_HANDLER.generate(session, key.asString(), buffer);
session.infectBy(key);

buffer.write(spaceBefore.unsafeBytes());
buffer.write((byte) ':');
buffer.write(space.unsafeBytes());

Handler<IRubyObject> valueHandler = (Handler<IRubyObject>) getHandlerFor(runtime, value);
valueHandler.generate(session, value, buffer);
session.infectBy(value);
} catch (IOException ioe) {
throw Helpers.newIOErrorFromException(session.getRuntime(), ioe);
}
if (objectNl.length() != 0) buffer.append(indent);

STRING_HANDLER.generate(session, key.asString(), buffer);
session.infectBy(key);

buffer.append(spaceBefore);
buffer.append((byte)':');
buffer.append(space);

Handler<IRubyObject> valueHandler = (Handler<IRubyObject>) getHandlerFor(runtime, value);
valueHandler.generate(session, value, buffer);
session.infectBy(value);
}
});
state.decreaseDepth();
if (!firstPair[0] && objectNl.length() != 0) {
buffer.append(objectNl);
buffer.write(objectNl.unsafeBytes());
}
buffer.append(Utils.repeat(state.getIndent(), state.getDepth()));
buffer.append((byte)'}');
buffer.write(Utils.repeat(state.getIndent(), state.getDepth()));
buffer.write((byte)'}');
}
};

Expand All @@ -379,7 +432,7 @@ int guessSize(Session session, RubyString object) {
}

@Override
void generate(Session session, RubyString object, ByteList buffer) {
void generate(Session session, RubyString object, OutputStream buffer) throws IOException {
RuntimeInfo info = session.getInfo();
RubyString src;

Expand Down Expand Up @@ -414,7 +467,7 @@ RubyString generateNew(Session session, IRubyObject object) {
}

@Override
void generate(Session session, IRubyObject object, ByteList buffer) {
void generate(Session session, IRubyObject object, OutputStream buffer) throws IOException {
    // Convert the object to its string form and let the string handler
    // do the actual writing.
    STRING_HANDLER.generate(session, object.asString(), buffer);
}
Expand All @@ -439,9 +492,8 @@ RubyString generateNew(Session session, IRubyObject object) {
}

@Override
void generate(Session session, IRubyObject object, ByteList buffer) {
RubyString result = generateNew(session, object);
buffer.append(result.getByteList());
void generate(Session session, IRubyObject object, OutputStream buffer) throws IOException {
    // Use the two-argument generateNew, which returns the object's JSON as
    // a RubyString. The three-argument OutputStream overload must not be
    // used here: it calls generate() on this same handler, so the two
    // methods would call each other until the stack overflows.
    RubyString result = generateNew(session, object);
    writeByteList(buffer, result.getByteList());
}
};
}

0 comments on commit 7576f5c

Please sign in to comment.