Re: Reading into a buffer and writing from it at the same time
Hello,
On 14 Oct., 15:33, Rzeźnik <marcin.rzezni...@gmail.com> wrote:
On 14 Oct., 15:18, "A. Farber" <alexander.far...@gmail.com> wrote:
I'm programming an embedded device which
receives data over USB and saves it to
a file.
Won't this: http://java.sun.com/javase/6/docs/api/java/util/concurrent/locks/Read...
suit you better? It is a lightweight, lock-free implementation.
Unfortunately, ReadWriteLock isn't available on the device.
I've come up with the solution listed below, but
something minor is still missing: it throws an
IllegalMonitorStateException even though I have put
synchronized(this) around the _writeLock.notify() call.
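For what it's worth, my understanding is that wait() and notify() throw
IllegalMonitorStateException unless the calling thread holds the monitor
of the very object they are called on, so synchronized(this) would not
cover _writeLock. A minimal sketch of the difference (MonitorDemo and its
method names are made up for illustration, they are not part of the
buffer code):

```java
final class MonitorDemo {
    private final Object lock = new Object();

    // Holds the monitor of 'this' but notifies on 'lock': the JVM rejects it.
    boolean notifyHoldingWrongMonitor() {
        synchronized (this) {
            try {
                lock.notify();
                return true;
            } catch (IllegalMonitorStateException e) {
                return false;
            }
        }
    }

    // Holds the monitor of 'lock' itself, so notify() is legal.
    boolean notifyHoldingRightMonitor() {
        synchronized (lock) {
            lock.notify();
            return true;
        }
    }

    public static void main(String[] args) {
        MonitorDemo demo = new MonitorDemo();
        System.out.println(demo.notifyHoldingWrongMonitor()); // prints "false"
        System.out.println(demo.notifyHoldingRightMonitor()); // prints "true"
    }
}
```

The same rule applies to wait(): the synchronized block has to name the
object that is being waited on.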
You should consider making _head and _tail volatile. Without a memory
barrier you might end up reading a stale value from cache.
(ReadWriteLock guarantees a memory barrier.)
Could you explain this?
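As far as I understand the Java memory model, a write to a plain field by
one thread is not guaranteed to ever become visible to another thread
unless the field is volatile or both sides synchronize on the same
monitor. A small sketch of the volatile variant, where VisibilityDemo and
its members are hypothetical names chosen for this example:

```java
final class VisibilityDemo {
    // volatile: a write by one thread is guaranteed to become visible to readers.
    private volatile int head = 0;

    // Spins until another thread publishes a non-zero head, then returns it.
    int awaitHead() {
        while (head == 0) {
            Thread.yield();
        }
        return head;
    }

    void publishHead(int value) {
        head = value;
    }

    // Starts a reader thread, publishes 'value' and returns what the reader saw.
    static int runDemo(int value) {
        final VisibilityDemo demo = new VisibilityDemo();
        final int[] seen = new int[1];
        Thread reader = new Thread(new Runnable() {
            public void run() {
                seen[0] = demo.awaitHead();
            }
        });
        reader.start();
        demo.publishHead(value);
        try {
            reader.join(); // join() also makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo(42)); // prints "42"
    }
}
```

If head were a plain int, the loop in awaitHead() would be allowed to
keep seeing 0 indefinitely, which is the stale read the reply warns about.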
Regards
Alex
PS: Here is my current try:
CyclicBuffer readBuffer = new CyclicBuffer(2 * _conn._readBufferSize);
.......
while (true) {
    if (readBuffer.read(in) <= 0)
        break;
    readBuffer.write(out);
}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class CyclicBuffer
{
    private final static int UNKNOWN = 0;
    private final static int EMPTY = 1;
    private final static int FULL = 2;

    /*
     * _head points to the next free position in the buffer
     * _tail points to the oldest occupied position in the buffer
     * If _tail == _head then the buffer is either full or empty
     * (depends on _state)
     */
    private byte[] _buf;
    private int _head;
    private int _tail;
    private int _state;
    /* package */ Object _readLock;
    /* package */ Object _writeLock;

    public CyclicBuffer(int len)
    {
        _buf = new byte[len];
        _head = 0;
        _tail = 0;
        _state = EMPTY;
        _readLock = new Object();
        _writeLock = new Object();
    }
    public int read(InputStream in) throws IOException
    {
        int nbytes;
        if (_state == FULL)
            throw new IllegalStateException();
        if (_tail <= _head) {
            /*
             * _buf: [_][_][_][x][x][x][x][_][_][_][_]
             *                 ^           ^           ^
             *                 |           |           |
             *               _tail       _head    _buf.length
             */
            nbytes = in.read(_buf, _head, _buf.length - _head);
        } else {
            /*
             * _buf: [x][x][_][_][_][_][_][_][x][x][x]
             *              ^                 ^        ^
             *              |                 |        |
             *            _head             _tail _buf.length
             */
            nbytes = in.read(_buf, _head, _tail - _head);
        }
        if (nbytes <= 0)
            return -1;
        // advance to the next free position in buffer
        _head = (_head + nbytes) % _buf.length;
        synchronized(this) {
            if (_tail == _head) {
                _state = FULL;
                try {
                    // the buffer is full, so stop reading for now
                    _readLock.wait();
                } catch (InterruptedException e) {
                }
            } else
                _state = UNKNOWN;
            // something has been read into buffer, so allow writing again
            _writeLock.notify();
        }
        return nbytes;
    }
    public void write(OutputStream out) throws IOException
    {
        int nbytes;
        if (_state == EMPTY)
            throw new IllegalStateException();
        if (_tail < _head) {
            /*
             * _buf: [_][_][_][x][x][x][x][_][_][_][_]
             *                 ^           ^           ^
             *                 |           |           |
             *               _tail       _head    _buf.length
             */
            nbytes = _head - _tail;
            out.write(_buf, _tail, nbytes);
        } else {
            /*
             * _buf: [x][x][_][_][_][_][_][_][x][x][x]
             *              ^                 ^        ^
             *              |                 |        |
             *            _head             _tail _buf.length
             */
            nbytes = _buf.length - _tail;
            out.write(_buf, _tail, nbytes);
            if (_head != 0) {
                out.write(_buf, 0, _head);
                nbytes += _head;
            }
        }
        // advance to the next occupied position in buffer
        _tail = (_tail + nbytes) % _buf.length;
        synchronized(this) {
            if (_tail == _head) {
                _state = EMPTY;
                try {
                    // the buffer is empty, so stop writing for now
                    _writeLock.wait();
                } catch (InterruptedException e) {
                }
            } else
                _state = UNKNOWN;
            // something has been written out, so allow reading again
            _readLock.notify();
        }
    }
}
(and I can probably get rid of _state later)
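One possible direction, sketched under assumptions rather than offered as
a finished replacement: guard everything with a single monitor (this),
wait in a while loop, and track a byte count instead of _state. The class
SimpleRingBuffer and its put/take/transferSum methods are hypothetical
names, and for brevity the sketch moves one byte at a time instead of
doing bulk reads and writes against streams as CyclicBuffer does.

```java
final class SimpleRingBuffer {
    private final byte[] buf;
    private int head;   // next free position
    private int tail;   // oldest occupied position
    private int count;  // bytes currently stored; stands in for EMPTY/FULL

    SimpleRingBuffer(int len) {
        buf = new byte[len];
    }

    // Blocks while the buffer is full, then stores one byte.
    synchronized void put(byte b) throws InterruptedException {
        while (count == buf.length) {
            wait();         // releases the monitor of 'this' while waiting
        }
        buf[head] = b;
        head = (head + 1) % buf.length;
        count++;
        notifyAll();        // wake a consumer blocked in take()
    }

    // Blocks while the buffer is empty, then removes the oldest byte.
    synchronized byte take() throws InterruptedException {
        while (count == 0) {
            wait();
        }
        byte b = buf[tail];
        tail = (tail + 1) % buf.length;
        count--;
        notifyAll();        // wake a producer blocked in put()
        return b;
    }

    // Pushes the bytes 0, 1, ..., n-1 through a 4-byte buffer and sums what arrives.
    static int transferSum(final int n) {
        final SimpleRingBuffer ring = new SimpleRingBuffer(4);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < n; i++) {
                        ring.put((byte) i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += ring.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(10)); // prints "45"
    }
}
```

Because both methods are synchronized on the same object, head, tail and
count need no volatile here, and wait()/notifyAll() are always called
while holding the monitor they belong to.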