Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New HTTP tunnel code moved to org.jboss.netty.channel.socket.http as requested #12

Closed
wants to merge 7 commits into from
8 changes: 7 additions & 1 deletion pom.xml
Expand Up @@ -177,6 +177,12 @@
<version>2.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
<version>2.5.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand Down Expand Up @@ -254,7 +260,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7.2</version>
<configuration>
<forkMode>never</forkMode>
<forkMode>once</forkMode>
<excludes>
<exclude>**/Abstract*</exclude>
<exclude>**/TestUtil*</exclude>
Expand Down
@@ -0,0 +1,56 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.jboss.netty.channel.socket.http;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;

/**
* Creates pipelines for incoming http tunnel connections, capable of decoding the incoming HTTP
* requests, determining their type (client sending data, client polling data, or unknown) and
* handling them appropriately.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Iain McGinniss (iain.mcginniss@onedrum.com)
* @author OneDrum Ltd.
*/
class AcceptedServerChannelPipelineFactory implements ChannelPipelineFactory {

private final ServerMessageSwitch messageSwitch;

public AcceptedServerChannelPipelineFactory(
ServerMessageSwitch messageSwitch) {
this.messageSwitch = messageSwitch;
}

public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();

pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
pipeline.addLast("httpChunkAggregator", new HttpChunkAggregator(
HttpTunnelMessageUtils.MAX_BODY_SIZE));
pipeline.addLast("messageSwitchClient",
new AcceptedServerChannelRequestDispatch(messageSwitch));

return pipeline;
}
}
@@ -0,0 +1,182 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.http;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;

/**
* Upstream handler which is responsible for determining whether a received HTTP request is a legal
* tunnel request, and if so, invoking the appropriate request method on the
* {@link ServerMessageSwitch} to service the request.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Iain McGinniss (iain.mcginniss@onedrum.com)
* @author OneDrum Ltd.
*/
class AcceptedServerChannelRequestDispatch extends SimpleChannelUpstreamHandler {

public static final String NAME = "AcceptedServerChannelRequestDispatch";

private static final InternalLogger LOG = InternalLoggerFactory
.getInstance(AcceptedServerChannelRequestDispatch.class);

private final ServerMessageSwitchUpstreamInterface messageSwitch;

public AcceptedServerChannelRequestDispatch(
ServerMessageSwitchUpstreamInterface messageSwitch) {
this.messageSwitch = messageSwitch;
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
HttpRequest request = (HttpRequest) e.getMessage();

if (HttpTunnelMessageUtils.isOpenTunnelRequest(request)) {
handleOpenTunnel(ctx);
} else if (HttpTunnelMessageUtils.isSendDataRequest(request)) {
handleSendData(ctx, request);
} else if (HttpTunnelMessageUtils.isReceiveDataRequest(request)) {
handleReceiveData(ctx, request);
} else if (HttpTunnelMessageUtils.isCloseTunnelRequest(request)) {
handleCloseTunnel(ctx, request);
} else {
respondWithRejection(ctx, request,
"invalid request to netty HTTP tunnel gateway");
}
}

private void handleOpenTunnel(ChannelHandlerContext ctx) {
String tunnelId =
messageSwitch.createTunnel((InetSocketAddress) ctx.getChannel()
.getRemoteAddress());
if (LOG.isDebugEnabled()) {
LOG.debug("open tunnel request received from " +
ctx.getChannel().getRemoteAddress() + " - allocated ID " +
tunnelId);
}
respondWith(ctx,
HttpTunnelMessageUtils.createTunnelOpenResponse(tunnelId));
}

private void handleCloseTunnel(ChannelHandlerContext ctx,
HttpRequest request) {
String tunnelId = checkTunnelId(request, ctx);
if (tunnelId == null) {
return;
}

if (LOG.isDebugEnabled()) {
LOG.debug("close tunnel request received for tunnel " + tunnelId);
}
messageSwitch.clientCloseTunnel(tunnelId);
respondWith(ctx, HttpTunnelMessageUtils.createTunnelCloseResponse())
.addListener(ChannelFutureListener.CLOSE);
}

private void handleSendData(ChannelHandlerContext ctx, HttpRequest request) {
String tunnelId = checkTunnelId(request, ctx);
if (tunnelId == null) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("send data request received for tunnel " + tunnelId);
}

if (HttpHeaders.getContentLength(request) == 0 ||
request.getContent() == null ||
request.getContent().readableBytes() == 0) {
respondWithRejection(ctx, request,
"Send data requests must contain data");
return;
}

messageSwitch.routeInboundData(tunnelId, request.getContent());
respondWith(ctx, HttpTunnelMessageUtils.createSendDataResponse());
}

private void handleReceiveData(ChannelHandlerContext ctx,
HttpRequest request) {
String tunnelId = checkTunnelId(request, ctx);
if (tunnelId == null) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("poll data request received for tunnel " + tunnelId);
}
messageSwitch.pollOutboundData(tunnelId, ctx.getChannel());
}

private String checkTunnelId(HttpRequest request, ChannelHandlerContext ctx) {
String tunnelId = HttpTunnelMessageUtils.extractTunnelId(request);
if (tunnelId == null) {
respondWithRejection(ctx, request,
"no tunnel id specified in request");
} else if (!messageSwitch.isOpenTunnel(tunnelId)) {
respondWithRejection(ctx, request,
"specified tunnel is either closed or does not exist");
return null;
}

return tunnelId;
}

/**
* Sends the provided response back on the channel, returning the created ChannelFuture
* for this operation.
*/
private ChannelFuture respondWith(ChannelHandlerContext ctx,
HttpResponse response) {
ChannelFuture writeFuture = Channels.future(ctx.getChannel());
Channels.write(ctx, writeFuture, response);
return writeFuture;
}

/**
* Sends an HTTP 400 message back to on the channel with the specified error message, and asynchronously
* closes the channel after this is successfully sent.
*/
private void respondWithRejection(ChannelHandlerContext ctx,
HttpRequest rejectedRequest, String errorMessage) {
if (LOG.isWarnEnabled()) {
SocketAddress remoteAddress = ctx.getChannel().getRemoteAddress();
String tunnelId =
HttpTunnelMessageUtils.extractTunnelId(rejectedRequest);
if (tunnelId == null) {
tunnelId = "<UNKNOWN>";
}
LOG.warn("Rejecting request from " + remoteAddress +
" representing tunnel " + tunnelId + ": " + errorMessage);
}
HttpResponse rejection =
HttpTunnelMessageUtils.createRejection(rejectedRequest,
errorMessage);
respondWith(ctx, rejection).addListener(ChannelFutureListener.CLOSE);
}
}
@@ -0,0 +1,71 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.jboss.netty.channel.socket.http;

import java.util.HashSet;
import java.util.Set;

import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;

/**
* Class which is used to consolidate multiple channel futures into one, by
* listening to the individual futures and producing an aggregated result
* (success/failure) when all futures have completed.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Iain McGinniss (iain.mcginniss@onedrum.com)
* @author OneDrum Ltd.
*/
class ChannelFutureAggregator implements ChannelFutureListener {

private final ChannelFuture aggregateFuture;

private final Set<ChannelFuture> pendingFutures;

public ChannelFutureAggregator(ChannelFuture aggregateFuture) {
this.aggregateFuture = aggregateFuture;
pendingFutures = new HashSet<ChannelFuture>();
}

public void addFuture(ChannelFuture future) {
pendingFutures.add(future);
future.addListener(this);
}

public synchronized void operationComplete(ChannelFuture future)
throws Exception {
if (future.isCancelled()) {
// TODO: what should the correct behaviour be when a fragment is cancelled?
// cancel all outstanding fragments and cancel the aggregate?
return;
}

pendingFutures.remove(future);
if (!future.isSuccess()) {
aggregateFuture.setFailure(future.getCause());
for (ChannelFuture pendingFuture: pendingFutures) {
pendingFuture.cancel();
}
return;
}

if (pendingFutures.isEmpty()) {
aggregateFuture.setSuccess();
}
}
}
@@ -0,0 +1,49 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.jboss.netty.channel.socket.http;

import java.security.SecureRandom;

/**
* Default implementation of TunnelIdGenerator, which uses a
* {@link java.security.SecureRandom SecureRandom} generator
* to produce 32-bit tunnel identifiers.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Iain McGinniss (iain.mcginniss@onedrum.com)
* @author OneDrum Ltd.
*/
public class DefaultTunnelIdGenerator implements TunnelIdGenerator {

private SecureRandom generator;

public DefaultTunnelIdGenerator() {
this(new SecureRandom());
}

public DefaultTunnelIdGenerator(SecureRandom generator) {
this.generator = generator;
}

public synchronized String generateId() {
// synchronized to ensure that this code is thread safe. The Sun
// standard implementations seem to be synchronized or lock free
// but are not documented as guaranteeing this
return Integer.toHexString(generator.nextInt());
}

}