pipedstreams
What's wrong with the following code? Usually works fine but from time to
time readInt returns values that has never been written to the stream (that
results in ArrayIndexOutOfBoundsException). On the other hand if I have
only one consumer and one producer thread everything works just fine. What
may be the problem? Should I synchronize access to that streams in some
other way?
--
Regards,
Tomasz Grobelny
import java.io.*;
class comm
{
PipedInputStream in;
PipedOutputStream out;
PipedInputStream[] in_t;
PipedOutputStream[] out_t;
public comm(int num)
{
in=new PipedInputStream();
out=new PipedOutputStream();
try{in.connect(out);}
catch (IOException e){System.err.println("An error occured while
connecting");}
in_t=new PipedInputStream[num];
out_t=new PipedOutputStream[num];
for(int i=0;i<num;i++)
{
in_t[i]=new PipedInputStream();
out_t[i]=new PipedOutputStream();
try{in_t[i].connect(out_t[i]);}
catch (IOException e){System.err.println("An error occured while
connecting ("+i+")");}
}
}
public synchronized void send(int dest, int v)
{
DataOutputStream dos;
if(dest==-1)
{
synchronized(out)
{
dos=new DataOutputStream(out);
try{dos.writeInt(v);dos.flush();out.flush();}
catch (IOException e){System.err.println("Error while writing (-1)");}
}
}
else
{
synchronized(out_t[dest])
{
dos=new DataOutputStream(out_t[dest]);
try{dos.writeInt(v);dos.flush();out_t[dest].flush();}
catch (IOException e){System.err.println("Error while writing
("+dest+")");}
}
}
}
public int receive(int from)
{
if(from==-1)
{
synchronized(in)
{
try{return (new DataInputStream(in)).readInt();}
catch (IOException e){System.err.println("Error while reading
(-1)");return -1;}
}
}
else
{
synchronized(in_t[from])
{
try{return (new DataInputStream(in_t[from])).readInt();}
catch (IOException e){System.err.println("Error while reading
("+from+")");return -1;}
}
}
}
}
class mp_producer extends Thread
{
comm global;
int id;
public mp_producer(comm g, int i)
{
global=g;
id=i;
}
public void run()
{
comm local;
for(int i=0;;i++)
{
System.out.println("Producer "+id+" Waiting for free space...");
int cd=global.receive(-1);
System.out.println("Producer "+id+" Started producing for "+cd);
try{Thread.sleep((int
(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
System.out.println("Producer "+id+" Finished producing for "+cd+"
item "+i);
global.send(cd, i);
}
}
}
class mp_consumer extends Thread
{
comm global;
int id;
public mp_consumer(comm g, int i)
{
global=g;
id=i;
}
public void run()
{
for(;;)
{
global.send(-1, id); //announce ability to consume an item
System.out.println("Consumer "+id+" Waiting for item...");
int d=global.receive(id);
System.out.println("Consumer "+id+" Started consuming: "+d);
try{Thread.sleep((int
(java.lang.Math.random()*1000));}catch(java.lang.InterruptedException e){}
System.out.println("Consumer "+id+" Finished consuming: "+d);
}
}
}
class mp
{
public static void main(String[] args)
{
int max=10;
comm globalcomm=new comm(max);
for(int i=0;i<max;i++)
{
mp_consumer cons=new mp_consumer(globalcomm, i);
mp_producer prod=new mp_producer(globalcomm, i);
cons.start();
prod.start();
}
}
}