-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
BlockingContentProducer.java
161 lines (140 loc) · 4.96 KB
/
BlockingContentProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Blocking implementation of {@link ContentProducer}. Calling {@link #nextContent()} will block when
* there is no available content but will never return null.
*/
class BlockingContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class);
private final Semaphore _semaphore = new Semaphore(0);
private final AsyncContentProducer _asyncContentProducer;
BlockingContentProducer(AsyncContentProducer delegate)
{
_asyncContentProducer = delegate;
}
@Override
public void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycling {}", this);
_asyncContentProducer.recycle();
_semaphore.drainPermits();
}
@Override
public int available()
{
return _asyncContentProducer.available();
}
@Override
public boolean hasContent()
{
return _asyncContentProducer.hasContent();
}
@Override
public boolean isError()
{
return _asyncContentProducer.isError();
}
@Override
public void checkMinDataRate()
{
_asyncContentProducer.checkMinDataRate();
}
@Override
public long getRawContentArrived()
{
return _asyncContentProducer.getRawContentArrived();
}
@Override
public boolean consumeAll(Throwable x)
{
boolean b = _asyncContentProducer.consumeAll(x);
_semaphore.release();
return b;
}
@Override
public HttpInput.Content nextContent()
{
while (true)
{
HttpInput.Content content = _asyncContentProducer.nextContent();
if (LOG.isDebugEnabled())
LOG.debug("nextContent async producer returned {}", content);
if (content != null)
return content;
// IFF isReady() returns false then HttpChannel.needContent() has been called,
// thus we know that eventually a call to onContentProducible will come.
if (_asyncContentProducer.isReady())
{
if (LOG.isDebugEnabled())
LOG.debug("nextContent async producer is ready, retrying");
continue;
}
if (LOG.isDebugEnabled())
LOG.debug("nextContent async producer is not ready, waiting on semaphore {}", _semaphore);
try
{
_semaphore.acquire();
}
catch (InterruptedException e)
{
return new HttpInput.ErrorContent(e);
}
}
}
@Override
public void reclaim(HttpInput.Content content)
{
_asyncContentProducer.reclaim(content);
}
@Override
public boolean isReady()
{
boolean ready = available() > 0;
if (LOG.isDebugEnabled())
LOG.debug("isReady = {}", ready);
return ready;
}
@Override
public HttpInput.Interceptor getInterceptor()
{
return _asyncContentProducer.getInterceptor();
}
@Override
public void setInterceptor(HttpInput.Interceptor interceptor)
{
_asyncContentProducer.setInterceptor(interceptor);
}
@Override
public boolean onContentProducible()
{
// In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state
// DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why
// this method always returns false.
// But async errors can occur while the dispatched thread is NOT blocked reading (i.e.: in state WAITING),
// so the WAITING to WOKEN transition must be done by the error-notifying thread which then has to reschedule the
// dispatched thread after HttpChannelState.asyncError() is called.
// Calling _asyncContentProducer.wakeup() changes the channel state from WAITING to WOKEN which would prevent the
// subsequent call to HttpChannelState.asyncError() from rescheduling the thread.
// AsyncServletTest.testStartAsyncThenClientStreamIdleTimeout() tests this.
if (LOG.isDebugEnabled())
LOG.debug("onContentProducible releasing semaphore {}", _semaphore);
_semaphore.release();
return false;
}
}