final class Queue extends java.lang.Object implements LowLevelSession, MessageTypes
Dispatcher, Router and Protocol.
Modifier and Type | Field and Description |
---|---|
private java.lang.Object |
cleanupLock
A lock used by all sessions that need to clean up the context.
|
private Conversation |
conversation
Dedicated conversation mode thread.
|
private static java.lang.ThreadGroup |
convThreads
Thread group for the conversation threads.
|
private Control |
ctrl
Interrupt manager.
|
private boolean |
dead
Status flag.
|
private java.lang.Exception |
exception
Queue termination exception.
|
private static ThreadSafeQueue<InboundRequest> |
inboundQueue
Inbound message queue (shared instance).
|
private int |
lastID
Last generated request ID.
|
private static java.util.logging.Logger |
LOG
Logger.
|
private int |
nodeAddress
Address of the node to which this instance belongs.
|
private ThreadSafeQueue<Message> |
outboundQueue
Outbound message queue (per session).
|
private Protocol |
protocol
Instance of the protocol used to send/receive messages.
|
private java.util.Map<java.lang.Integer,WaitingThread> |
pwt
Pool of Waiting Threads.
|
private int |
remoteAddress
Address of the node at remote side of the connection.
|
private boolean |
sessInit
true if the session is fully initialized.
|
private NetSocket |
socket
Socket which is used for communication with remote side.
|
ADDRESS_REPLY, ADDRESS_REQUEST, BEGIN_REPLY, ECHO, ECHO_REPLY, END_REPLY, INIT_CONVERSATION, INIT_REPLY, INIT_ROUTER, INIT_STANDARD, REPLY_ASYNC, REPLY_EXCEPTION_ASYNC, REPLY_EXCEPTION_SYNC, REPLY_SYNC, REQUEST_ASYNCH, REQUEST_SYNCH, UNKNOWN
Constructor and Description |
---|
Queue(NetSocket socket)
Constructs an instance.
|
Modifier and Type | Method and Description |
---|---|
private void |
checkState()
Check if the queue is running and throw an exception if it is not.
|
(package private) void |
closeTransport()
Closes the low-level networking transport.
|
(package private) static InboundRequest |
dequeueInbound()
Retrieve a message from the Inbound Queue.
|
(package private) Message |
dequeueOutbound()
Retrieve a message from the Outbound Queue.
|
(package private) void |
disable(java.lang.Exception reason)
Disable the queue in preparation for shutting it down.
|
(package private) void |
enableConversation(boolean start,
boolean check)
Enable conversation mode.
|
(package private) void |
enqueueInbound(java.io.Serializable obj)
Preliminary message processing method.
|
(package private) void |
enqueueOutbound(Message msg)
Put message into Outbound Queue for delivery to the remote side.
|
void |
forward(Message message)
Send provided message as an asynchronous request without waiting for a
reply.
|
(package private) AddressPair |
getAddressPair()
Returns an instance of AddressPair built using local node and remote node addresses.
|
(package private) java.lang.Object |
getCleanupLock()
Gets the context cleanup lock, to avoid concurrent cleanups.
|
(package private) Control |
getControl(int contextID)
Gets the registered instance of the interrupt manager.
|
java.lang.Exception |
getException()
Returns the exception that was passed to the stop(java.lang.Exception, boolean) method, if any.
|
int |
getNodeAddress()
Provides access to the node address assigned to the queue.
|
(package private) Protocol |
getProtocol()
Gets the Protocol instance of this queue.
|
int |
getRemoteAddress()
Provides access to the remote node address known to the queue.
|
(package private) java.net.InetSocketAddress |
getRemoteInetAddress()
Report the socket address for the given queue.
|
static boolean |
isConversationThread()
Reports whether the current thread is a conversation thread.
|
(package private) static boolean |
isInboundTerminated()
Report if the inbound queue is terminated.
|
private boolean |
isLocalAddress(int addr)
Utility method which checks if address belongs to the same address space
as the local node address.
|
boolean |
isRunning()
Returns the operational state of this session.
|
(package private) boolean |
isTransportClosed()
Reports if the low level networking transport is closed.
|
private void |
notifyPWT(java.lang.Exception reason)
Release all waiting threads in the PWT and pass an exception to let the
waiting threads know the disposition of the protocol.
|
(package private) byte[] |
read()
Reads the next portion of input from the socket.
|
void |
registerSynchronizer(StateSynchronizer ssync)
Sets the instance of the state synchronizer the reader and writer
threads should use, if and only if there is no other state synchronizer
already in use.
|
(package private) void |
sessionInitialized()
Sets the value of the session-initialized flag to true.
|
(package private) void |
setNodeAddress(int address)
Assign new node address to the queue.
|
(package private) void |
setProtocolNames()
Force the reader and writer threads to have more descriptive names
to aid debugging.
|
(package private) void |
setRemoteAddress(int address)
Assign new remote node address to the queue.
|
(package private) void |
start(boolean init,
int timeout,
boolean convo,
boolean thread)
Start queue.
|
(package private) void |
stop(java.lang.Exception reason,
boolean switchContext)
Explicitly stop message queue processing, notify all interested parties
that queue is about to shutdown, and either release all waiting threads
or halt the current conversation.
|
(package private) void |
stopTransport(java.lang.Exception reason)
Stop the protocol threads and either release all waiting threads or halt
the current conversation.
|
(package private) static void |
terminateQueue()
Terminate inbound queue.
|
java.lang.Object |
transact(Message message,
int timeout)
Send provided message as a synchronous request and wait for reply.
|
(package private) Message |
transactImpl(Message message,
int timeout)
Underlying method for all other transactXX() methods.
|
(package private) void |
write(byte[] data)
Writes data to the socket.
|
private static final java.util.logging.Logger LOG
private static final ThreadSafeQueue<InboundRequest> inboundQueue
private static final java.lang.ThreadGroup convThreads
private final ThreadSafeQueue<Message> outboundQueue
private final java.util.Map<java.lang.Integer,WaitingThread> pwt
private NetSocket socket
private volatile int nodeAddress
private volatile int remoteAddress
private volatile Protocol protocol
private int lastID
private boolean dead
private java.lang.Exception exception
private volatile Control ctrl
private volatile Conversation conversation
private boolean sessInit
true if the session is fully initialized.
private final java.lang.Object cleanupLock
Queue(NetSocket socket)
Parameters: socket - Low-level transport used to communicate with the remote side.
public static boolean isConversationThread()
Returns: true if the current thread is a conversation thread.
static InboundRequest dequeueInbound() throws java.lang.InterruptedException
Throws: java.lang.InterruptedException - is forwarded from ThreadSafeQueue.dequeue().
static void terminateQueue()
static boolean isInboundTerminated()
Returns: true if the inbound queue is terminated.
public boolean isRunning()
Specified by: isRunning in interface LowLevelSession
Returns: true if the session is running (operational) and false if the session is closed.
public void registerSynchronizer(StateSynchronizer ssync)
Parameters: ssync - The synchronizer to register, or null if the currently registered synchronizer should be deregistered.
public java.lang.Exception getException()
Returns the exception that was passed to the stop(java.lang.Exception, boolean) method, if any. If not null, it will contain the queue termination reason.
Specified by: getException in interface LowLevelSession
Returns: null if the queue is not yet terminated or if the queue's termination was not abnormal.
public int getNodeAddress()
Specified by: getNodeAddress in interface LowLevelSession
public int getRemoteAddress()
Specified by: getRemoteAddress in interface LowLevelSession
java.lang.Object getCleanupLock()
Gets the context cleanup lock, to avoid concurrent cleanups.
void stop(java.lang.Exception reason, boolean switchContext)
Parameters:
reason - The reason for the shutdown, or null to signal a clean shutdown. Any non-null value indicates an abnormal end (abend).
switchContext - Optionally switch the security context to that of the session's user. This is necessary when this method is invoked from secondary protocol threads which do not have the security context of the user.
void disable(java.lang.Exception reason)
Disables the queue in preparation for shutting it down. After this call, isRunning() will report false, even if the underlying transport is still active.
This call should be followed by a call to stopTransport(Exception) to actually shut down the transport mechanism.
Parameters: reason - The reason for the shutdown, or null to signal a clean shutdown. Any non-null value indicates an abnormal end (abend).
void stopTransport(java.lang.Exception reason)
This call should be preceded by a call to disable(Exception).
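The disable-then-stop ordering described for disable(Exception) and stopTransport(Exception) can be illustrated with a small model. This is a minimal sketch under stated assumptions: the class ShutdownSketch and its fields are hypothetical, and it models only the visible state (running flag, transport flag, termination reason), not the release of waiting threads or the halting of a conversation that the real methods perform.

```java
/** Hypothetical model of the two-step shutdown; not the real Queue code. */
final class ShutdownSketch {
    private boolean dead;            // assumed counterpart of the "dead" status flag
    private boolean transportClosed; // assumed stand-in for the socket state
    private Exception exception;     // termination reason, null for a clean shutdown

    /** Step 1: mark the queue as not running and remember the reason. */
    synchronized void disable(Exception reason) {
        if (dead) {
            return; // already disabled, keep the first reason
        }
        dead = true;
        exception = reason;
    }

    /** Step 2: shut the transport down; expected to follow disable(). */
    synchronized void stopTransport(Exception reason) {
        transportClosed = true;
    }

    synchronized boolean isRunning() {
        return !dead;
    }

    synchronized boolean isTransportClosed() {
        return transportClosed;
    }

    synchronized Exception getException() {
        return exception;
    }
}
```

Between the two calls the sketch reports isRunning() as false while isTransportClosed() is still false, which is the intermediate state the description of disable(Exception) refers to.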
Parameters: reason - The reason for the shutdown, or null to signal a clean shutdown. Any non-null value indicates an abnormal end (abend).
public void forward(Message message)
Sends the provided message as an asynchronous request (REQUEST_ASYNCH) without waiting for a reply.
Parameters: message - Request message.
public java.lang.Object transact(Message message, int timeout) throws java.lang.Exception
The message passed as a parameter must be completely filled in with all required information; in particular, the routing key must point to a valid entry point. If the addresses in the routing key are not set, the destination is assumed to be the directly connected node and the appropriate addresses are assigned automatically.
The caller can specify a timeout for the reply message. If the timeout is set to 0, the queue waits for the reply for as long as the queue is running.
Parameters:
message - Request message.
timeout - Reply wait timeout.
Throws: java.lang.Exception - if the reply received from the remote side has an exception as its payload, if a timeout occurs before the reply message is received, or if a problem occurs during transmission.
Control getControl(int contextID)
Parameters: contextID - the security context ID
Returns: null if no instance is registered.
Message dequeueOutbound() throws java.lang.InterruptedException
Throws: java.lang.InterruptedException - is forwarded from ThreadSafeQueue.dequeue().
void enqueueInbound(java.io.Serializable obj) throws ProtocolViolation, java.lang.InterruptedException
The algorithm:
Message Type                                 Disposition
-------------------------------------------  -------------------------
ADDRESS_REQUEST                              router
INIT_STANDARD                                router
INIT_CONVERSATION                            router
any where destination addr != current addr   router
ECHO                                         immediate send back
REQUEST_SYNCH                                dispatcher
REQUEST_ASYNCH                               dispatcher
all forms of REPLY                           wake waiting thread
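The disposition table can be read as a small classifier. The sketch below is illustrative only: the Kind enum is a stand-in for the MessageTypes constants, and the Disposition enum, the dispositionOf method and its parameters are assumptions that do not appear in the documented API. It also assumes that the rows are evaluated top to bottom and that "all forms of REPLY" means every constant whose name contains REPLY; types the table does not mention are reported as UNLISTED rather than guessed.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative sketch only; none of these names come from the documented API. */
final class DispositionSketch {
    /** Stand-in for the MessageTypes constants listed for this class. */
    enum Kind {
        ADDRESS_REPLY, ADDRESS_REQUEST, BEGIN_REPLY, ECHO, ECHO_REPLY, END_REPLY,
        INIT_CONVERSATION, INIT_REPLY, INIT_ROUTER, INIT_STANDARD, REPLY_ASYNC,
        REPLY_EXCEPTION_ASYNC, REPLY_EXCEPTION_SYNC, REPLY_SYNC, REQUEST_ASYNCH,
        REQUEST_SYNCH, UNKNOWN
    }

    enum Disposition { ROUTER, ECHO_BACK, DISPATCHER, WAKE_WAITING_THREAD, UNLISTED }

    // Types the table sends to the router regardless of destination.
    private static final Set<Kind> ROUTER_KINDS =
        EnumSet.of(Kind.ADDRESS_REQUEST, Kind.INIT_STANDARD, Kind.INIT_CONVERSATION);

    /** Applies the table rows in order (assumed precedence). */
    static Disposition dispositionOf(Kind kind, int destination, int current) {
        if (ROUTER_KINDS.contains(kind)) {
            return Disposition.ROUTER;
        }
        if (destination != current) {
            return Disposition.ROUTER; // not addressed to this node
        }
        if (kind == Kind.ECHO) {
            return Disposition.ECHO_BACK;
        }
        if (kind == Kind.REQUEST_SYNCH || kind == Kind.REQUEST_ASYNCH) {
            return Disposition.DISPATCHER;
        }
        // Assumption: every constant whose name contains REPLY is a reply form.
        if (kind.name().contains("REPLY")) {
            return Disposition.WAKE_WAITING_THREAD;
        }
        return Disposition.UNLISTED; // the table does not cover this type
    }
}
```

For example, dispositionOf(Kind.ECHO, 5, 5) yields ECHO_BACK, while the same message addressed to another node is handed to the router under the assumed row order.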
Parameters: obj - The message to process.
Throws:
ProtocolViolation - In case of protocol error.
java.lang.InterruptedException - If routing queue or inbound queue is terminated.
void enqueueOutbound(Message msg) throws java.security.InvalidParameterException
Parameters: msg - An instance of Message.
Throws: java.security.InvalidParameterException - if message does not contain routing information.
AddressPair getAddressPair()
Returns an instance of AddressPair built using local node and remote node addresses. Note that the instance is built for use by the remote side (i.e. remote address and node address are swapped). Refer to the AddressPair description for more details.
Returns: AddressPair built from local node and remote node addresses.
See Also: AddressPair
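The address swap mentioned for getAddressPair() can be shown with a tiny example. This is a sketch under assumptions: the Pair class is a hypothetical stand-in for AddressPair, whose real fields and constructor are not shown in this page, and forRemoteSide is an invented helper name.

```java
/** Hypothetical illustration of building a pair from the remote side's view. */
final class AddressPairSketch {
    /** Stand-in for AddressPair; the field names are assumptions. */
    static final class Pair {
        final int local;
        final int remote;

        Pair(int local, int remote) {
            this.local = local;
            this.remote = remote;
        }
    }

    /**
     * Builds the pair as the remote side would see it: what this queue
     * calls the remote address is the remote side's own (local) address.
     */
    static Pair forRemoteSide(int nodeAddress, int remoteAddress) {
        return new Pair(remoteAddress, nodeAddress);
    }
}
```

With a node address of 10 and a remote address of 20, the resulting pair has local 20 and remote 10.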
java.net.InetSocketAddress getRemoteInetAddress()
boolean isTransportClosed()
Returns: true if the transport is closed.
void closeTransport()
void write(byte[] data) throws java.io.IOException
Parameters: data - data to be written
Throws: java.io.IOException - on error
byte[] read() throws java.io.IOException
Throws: java.io.IOException - on error
void setNodeAddress(int address)
Parameters: address - New node address.
void setRemoteAddress(int address)
Parameters: address - New remote node address.
void start(boolean init, int timeout, boolean convo, boolean thread) throws java.lang.InterruptedException, RequestTimeoutException, java.lang.IllegalStateException, java.lang.Exception
Starts the queue, which covers the Protocol instance and address negotiation if this is requested by the application.
All address information related to this queue instance known by the application should be assigned before calling this method.
If conversation mode is requested, then all synchronous messages will be processed by a single dedicated thread on both sides of the session.
Parameters:
init - true if an INIT_STANDARD or INIT_CONVERSATION message should be sent to the other side (this invokes node address negotiation) for a leaf node, or if INIT_ROUTER should be sent for a routing node. If false is passed, the message is not sent and the timeout parameter is ignored.
timeout - Timeout waiting for the INIT_REPLY.
convo - true to enable conversation mode.
thread - true to start a dedicated thread in conversation mode. Ignored otherwise.
Throws:
java.lang.InterruptedException - forwarded from the transactImpl() method.
RequestTimeoutException - forwarded from the transactImpl() method.
ProtocolViolation - in case of invalid reply on INIT message.
java.lang.Exception - forwarded from the transactImpl() method.
java.lang.IllegalStateException
void setProtocolNames()
Message transactImpl(Message message, int timeout) throws java.lang.InterruptedException, RequestTimeoutException, java.lang.Exception
Underlying method for all other transactXX() methods. It registers a WaitingThread instance in the PWT and waits for a reply. It then returns the reply message, or throws an exception if the reply does not arrive in time.
Parameters:
message - Request message.
timeout - Reply wait timeout.
Returns: Message instance.
Throws:
java.lang.InterruptedException - if waiting for the message was interrupted.
RequestTimeoutException - if the message does not arrive in time.
java.lang.Exception - if it is delivered from WaitingThread.waitMessage().
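The register-then-wait pattern behind the PWT (pool of waiting threads) can be sketched as follows. This is a hedged illustration, not the actual implementation: PwtSketch, Slot, register, awaitReply, deliver and releaseAll are hypothetical names, Slot only approximates what WaitingThread might do, and java.util.concurrent.TimeoutException stands in for RequestTimeoutException. A timeout of 0 is modeled as an unbounded wait that ends only when a reply is delivered or all waiters are released with an exception.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a pool of waiting threads keyed by request ID. */
final class PwtSketch {
    /** Stand-in for WaitingThread: holds one reply or one failure. */
    static final class Slot {
        private Object reply;
        private Exception failure;
        private boolean done;

        synchronized void complete(Object r, Exception e) {
            if (done) {
                return; // the first outcome wins
            }
            reply = r;
            failure = e;
            done = true;
            notifyAll();
        }

        synchronized Object await(long timeoutMillis) throws Exception {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!done) {
                if (timeoutMillis == 0) {
                    wait(); // 0 means: wait until completed or released
                } else {
                    long left = deadline - System.currentTimeMillis();
                    if (left <= 0) {
                        throw new TimeoutException("no reply in time");
                    }
                    wait(left);
                }
            }
            if (failure != null) {
                throw failure;
            }
            return reply;
        }
    }

    private final Map<Integer, Slot> pwt = new ConcurrentHashMap<>();
    private int lastID;

    /** Generates a request ID and registers a slot before the request is sent. */
    synchronized int register() {
        int id = ++lastID;
        pwt.put(id, new Slot());
        return id;
    }

    /** Blocks the caller until the reply arrives, the wait times out, or the pool is released. */
    Object awaitReply(int id, long timeoutMillis) throws Exception {
        Slot slot = pwt.get(id);
        if (slot == null) {
            throw new IllegalStateException("unknown request " + id);
        }
        try {
            return slot.await(timeoutMillis);
        } finally {
            pwt.remove(id);
        }
    }

    /** Called by the reader side when a reply arrives; false if nobody is waiting. */
    boolean deliver(int id, Object reply) {
        Slot slot = pwt.get(id);
        if (slot == null) {
            return false;
        }
        slot.complete(reply, null);
        return true;
    }

    /** Releases every waiter with the given reason, in the spirit of notifyPWT(). */
    void releaseAll(Exception reason) {
        for (Slot slot : pwt.values()) {
            slot.complete(null, reason);
        }
    }
}
```

Registering the slot before the request is sent means a reply that arrives very quickly is never lost: deliver() records it in the slot, and the later awaitReply() returns immediately.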
void enableConversation(boolean start, boolean check)
Parameters:
start - true if a new thread should be started for the conversation.
check - true if the queue should be checked to confirm that it is running; false to bypass that check.
void sessionInitialized()
Sets the value of the session-initialized flag to true.
private boolean isLocalAddress(int addr)
Parameters: addr - Address to check.
Returns: true if the address passed as a parameter and the local node address belong to the same address space (i.e. owned by the same routing node).
private void notifyPWT(java.lang.Exception reason)
Parameters: reason - Exception object which will be passed to all waiting threads.
private void checkState() throws java.lang.IllegalStateException
Throws: java.lang.IllegalStateException - If the queue is not running.