private static class LowLevelSocketImpl.StagedInput
extends java.lang.Object
implements java.lang.Runnable
lastByte
field.LowLevelSocketImpl.ReadListener
(which waits for data in hasMoreData()
,
via the lLock
.
rLock
). The rLock
will be
notified only after a read(byte[], int, int)
call is finished.LowLevelSocketImpl.ReadListener
will always call hasMoreData()
, which will
block on the lLock
until a byte was read.
A thread wanting to read data from the socket will always call read(byte[], int, int)
, which will
block for a byte to be read using hasMoreData()
.
A thread wanting to interrogate the available bytes on the input stream will always call
getBytesAvailable()
, which will use the InputStream.available()
to determine
the available bytes.
This approach was chosen after chasing a few other solution:
InputStream.available()
on the socket's input stream to check if data
is incoming is not enough because this will not block. A solution which calls
available()
in a loop (eventually after waiting for some millis) will
pose unnecessary overhead on the CPU or it will cause delays from posting the
READ-RESPONSE event.Selector
can be used to block until data is available.
But this will force the input and output stream to be non-blocking. Also, when using
channels in blocking mode, read and write can't be used in multiple threads; if a
thread is blocking on a read, then a write will be possible from another thread only
if the read has released the channel's lock
.
Modifier and Type | Field and Description |
---|---|
private int |
id
The ID of this socket on P2J client side.
|
private java.io.InputStream |
input
The socket's input stream.
|
private int |
lastByte
The last byte read.
|
private java.lang.Object |
lLock
Lock used by the
LowLevelSocketImpl.ReadListener . |
private static int |
NO_BYTE
Value indicating that no byte was read.
|
private java.util.concurrent.CountDownLatch |
ready
The signal to use when this thread is ready.
|
private java.lang.Object |
rLock
Lock used when reading.
|
private java.net.Socket |
socket
The associated java socket.
|
Constructor and Description |
---|
StagedInput(int id,
java.net.Socket socket,
java.io.InputStream input,
java.util.concurrent.CountDownLatch ready)
Initialize this staged input stream.
|
Modifier and Type | Method and Description |
---|---|
int |
getBytesAvailable()
Determine the number of available bytes on the socket's input stream.
|
boolean |
hasMoreData()
Check if the socket has more data to be read.
|
private boolean |
hasMoreDataWorker(int timeout)
Check if the socket has more data to be read.
|
int |
read(byte[] buffer,
int pos,
int len)
Read
len number of bytes and place them in the specified buffer ,
starting with the specified position. |
void |
run()
Wait for data to be received by the socket, in a dedicated thread.
|
void |
start()
Start a new thread, in which this instance will be executed.
|
void |
terminate()
Terminate the reader.
|
private static final int NO_BYTE
private final java.lang.Object lLock
LowLevelSocketImpl.ReadListener
.private final java.lang.Object rLock
private final java.io.InputStream input
private final java.net.Socket socket
private final int id
private java.util.concurrent.CountDownLatch ready
private int lastByte
public StagedInput(int id, java.net.Socket socket, java.io.InputStream input, java.util.concurrent.CountDownLatch ready)
id
- The ID of the associated socket on P2J Client side.socket
- The associated java socket.input
- The socket's input stream.ready
- The signal to use when this thread is ready.public void run()
run
in interface java.lang.Runnable
public boolean hasMoreData()
true
if data is incoming.public int getBytesAvailable()
lastByte
too, if it was read previously.public int read(byte[] buffer, int pos, int len) throws java.io.IOException
len
number of bytes and place them in the specified buffer
,
starting with the specified position.buffer
- The buffer where to place the read data.pos
- The position in the buffer where to place the data.len
- The number of bytes to read.java.io.IOException
- In case of read errors.public void start()
Socket reader [%d]
, where %d
is replaced with the id
value.
This thread is marked as async, as all outgoing requests must be processed async, in
their own dispatcher
.
public void terminate()
private boolean hasMoreDataWorker(int timeout) throws java.net.SocketTimeoutException
timeout
- the value of millis to wait; 0
to wait indefinitely.true
if data is incoming.java.net.SocketTimeoutException
- If no byte was read.