// Packetizer.java
import java.net.*;
import java.io.*;
import java.util.*; // for LinkedList
/*
XXX Needs a lot of work on thread state, starting, and stopping issues.
*/
/**
 * Formats outgoing messages and parses incoming messages on a socket.
 *
 * <p>A worker thread (see {@link #start()}) repeatedly writes every queued
 * outbound object to the socket, reads whatever characters are pending,
 * and asks the {@code MsgBuilder} to extract a message from the receive
 * buffer. Extracted messages are queued until a caller fetches them with
 * {@link #readMessage()}.
 *
 * <p>Thread safety: the two queues are shared between the worker thread and
 * callers. Every access to a queue is guarded by that queue's own monitor
 * ({@code synchronized(inbound)} / {@code synchronized(outbound)}); subclasses
 * touching the queues directly must use the same locks.
 */
public class Packetizer implements Runnable,MsgSender
{
    protected Socket sock;
    protected MsgBuilder builder;
    protected InputStreamReader in;
    protected OutputStreamWriter out;
    protected String encoding = "ISO-8859-1";
    // Raw lists of Object; each one is guarded by its own monitor.
    protected LinkedList inbound,outbound;
    // Written by stop() and read by the worker thread, hence volatile.
    protected volatile boolean done;
    // Non-null while a worker exists; cleared by the worker when it exits.
    protected volatile Thread mythread;

    /**
     * Creates a packetizer on top of an already connected socket.
     * No thread is started until {@link #start()} is called.
     *
     * @param sock    socket whose streams are wrapped using {@link #encoding}
     * @param builder converts objects to wire strings and back
     * @throws IOException if the socket's streams cannot be obtained
     */
    public Packetizer(Socket sock,MsgBuilder builder)
        throws IOException
    {
        this.sock = sock;
        this.builder = builder;
        in = new InputStreamReader(sock.getInputStream(),encoding);
        out = new OutputStreamWriter(sock.getOutputStream(),encoding);
        inbound = new LinkedList();
        outbound = new LinkedList();
        mythread = null;
    }

    /**
     * Starts the worker thread. Calling this while a worker is still
     * running is a no-op, so two threads never share the same socket.
     */
    public synchronized void start()
    {
        if (mythread != null)
            return; // Already running.
        Thread worker = new Thread(this);
        mythread = worker; // Publish before starting so isRunning() is true at once.
        worker.start();
    }

    /**
     * Requests shutdown. The worker sends any queued outbound messages one
     * last time, then closes the streams and the socket and exits. The
     * request is noticed on the worker's next loop pass.
     */
    public void stop()
    {
        done = true;
    }

    /**
     * Reports whether a worker thread currently exists.
     *
     * @return true from the moment {@link #start()} is called until the
     *         worker has finished cleaning up
     */
    public boolean isRunning()
    {
        return mythread != null;
    }

    /**
     * Worker loop: flush outbound messages, read pending input, extract at
     * most one message per pass, then sleep briefly. The streams and the
     * socket are always closed on exit, whatever the reason for leaving.
     */
    public void run()
    {
        try {
            StringBuffer buf = new StringBuffer();
            boolean eof = false;
            while (!eof) {
                if (sock.isInputShutdown())
                    break;

                // Send all messages in the outbound queue.
                flushOutbound();

                // Has the user requested shutdown?
                if (done)
                    break;

                // Read any pending characters into the receive buffer.
                while (in.ready()) {
                    int c = in.read();
                    if (c < 0) {
                        // End of stream: do not store -1 as a character.
                        eof = true;
                        break;
                    }
                    buf.append((char)c);
                }

                // Get first message (if any) from the buffer.
                // NOTE(review): the builder presumably removes the consumed
                // characters from buf -- confirm against MsgBuilder.
                Object obj = builder.getMessageFromString(buf);
                if (obj != null) {
                    synchronized(inbound) {
                        inbound.addLast(obj);
                    }
                }

                if (!eof)
                    Thread.sleep(50);
            }
        }
        catch(InterruptedException e_ie) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            System.out.println("Interrupted: " + e_ie);
        }
        catch(IOException e_io) {
            System.out.println("IO exception: " + e_io);
        }
        finally {
            // Play nice, even when leaving because of an exception.
            closeAll();
            mythread = null;
        }
    }

    /**
     * Writes and flushes every object currently in the outbound queue.
     * The queue lock is held only while dequeuing, never during socket I/O.
     */
    private void flushOutbound()
        throws IOException
    {
        while (true) {
            Object ob;
            synchronized(outbound) {
                if (outbound.isEmpty())
                    return; // No messages to be sent.
                ob = outbound.removeFirst();
            }
            String outstring = builder.makeMessageFromObject(ob);
            out.write(outstring,0,outstring.length());
            out.flush();
        }
    }

    /**
     * Closes the writer, the reader and the socket. Each close is attempted
     * independently so one failure cannot leak the remaining resources.
     */
    private void closeAll()
    {
        try {
            out.close();
        }
        catch(IOException e_io) {
            System.out.println("IO exception: " + e_io);
        }
        try {
            in.close();
        }
        catch(IOException e_io) {
            System.out.println("IO exception: " + e_io);
        }
        try {
            sock.close();
        }
        catch(IOException e_io) {
            System.out.println("IO exception: " + e_io);
        }
    }

    /**
     * Queues an object for transmission by the worker thread.
     *
     * @param o object handed to the builder for formatting when it is sent
     */
    public void sendMessage(Object o)
    {
        // Lock the queue itself: that is the monitor the worker uses.
        synchronized(outbound) {
            outbound.addLast(o);
        }
    }

    /**
     * @return true if at least one received message is waiting
     */
    public boolean messageAvailable()
    {
        synchronized(inbound) {
            return !inbound.isEmpty();
        }
    }

    /**
     * Returns the oldest received message without removing it.
     *
     * @return the first queued message, or null if none is waiting
     */
    public Object peekMessage()
    {
        synchronized(inbound) {
            return inbound.isEmpty() ? null : inbound.getFirst();
        }
    }

    /**
     * Removes and returns the oldest received message.
     *
     * @return the first queued message, or null if none is waiting
     */
    public Object readMessage()
    {
        synchronized(inbound) {
            return inbound.isEmpty() ? null : inbound.removeFirst();
        }
    }
}