using System;
using System.IO;
using System.Collections.Generic;

namespace XY
{
    /// <summary>
    /// An alternative to <see cref="System.IO.MemoryStream"/> that uses a number of buffers
    /// taken from a pool as its backing store instead of a single buffer.
    /// </summary>
    /// <remarks>
    /// The stream addresses its storage as a sequence of chunks of <c>Pool.BufferSize</c> bytes,
    /// so it assumes every buffer handed out by the pool is at least that large.
    /// Buffers are handed back to the pool when the capacity shrinks and when the stream is disposed.
    /// Positions, lengths and capacities are limited to <see cref="int.MaxValue"/>.
    /// </remarks>
    public class PoolMemoryStream : Stream
    {
        private readonly List<byte[]> _buffers = new List<byte[]>();
        private readonly IBufferPool _pool;

        // False for streams created over caller-supplied data: they may never take more buffers.
        private bool _expandable = true;
        private bool _canWrite = true;
        // Controls whether GetBuffer() is allowed.
        private bool _visible = true;

        private long _length;
        private long _position;
        private bool _disposed;

        /// <summary>Gets the pool that supplies (and takes back) the backing buffers.</summary>
        public IBufferPool Pool
        {
            get { return _pool; }
        }

        /// <summary>Creates an empty, expandable stream.</summary>
        /// <param name="pool">Pool to draw buffers from; <c>BufferPool.Instance</c> when null.</param>
        public PoolMemoryStream(IBufferPool pool = null)
        {
            _pool = pool ?? BufferPool.Instance;
        }

        /// <summary>Creates an empty, expandable stream with buffers for <paramref name="capacity"/> bytes already taken.</summary>
        /// <param name="capacity">Number of bytes to reserve; rounded up to whole pool buffers.</param>
        /// <param name="pool">Pool to draw buffers from; <c>BufferPool.Instance</c> when null.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
        public PoolMemoryStream(int capacity, IBufferPool pool = null)
        {
            if (capacity < 0)
            {
                throw new ArgumentOutOfRangeException("capacity", "capacity is negative");
            }

            _pool = pool ?? BufferPool.Instance;
            SetCapacity(capacity);
        }

        /// <summary>Creates a writable, non-expandable stream holding a copy of <paramref name="buffer"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public PoolMemoryStream(byte[] buffer, IBufferPool pool = null)
            : this(buffer, 0, LengthOf(buffer), true, false, pool)
        {
        }

        /// <summary>Creates a non-expandable stream holding a copy of <paramref name="buffer"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public PoolMemoryStream(byte[] buffer, bool writable, IBufferPool pool = null)
            : this(buffer, 0, LengthOf(buffer), writable, false, pool)
        {
        }

        /// <summary>Creates a writable, non-expandable stream holding a copy of a segment of <paramref name="buffer"/>.</summary>
        public PoolMemoryStream(byte[] buffer, int index, int count, IBufferPool pool = null)
            : this(buffer, index, count, true, false, pool)
        {
        }

        /// <summary>Creates a non-expandable stream holding a copy of a segment of <paramref name="buffer"/>.</summary>
        public PoolMemoryStream(byte[] buffer, int index, int count, bool writable, IBufferPool pool = null)
            : this(buffer, index, count, writable, false, pool)
        {
        }

        /// <summary>
        /// Creates a non-expandable stream holding a copy of <paramref name="count"/> bytes of
        /// <paramref name="buffer"/> starting at <paramref name="index"/>. The position starts at 0.
        /// </summary>
        /// <param name="buffer">Source of the initial content; it is copied, not wrapped.</param>
        /// <param name="index">Offset of the first byte to copy.</param>
        /// <param name="count">Number of bytes to copy.</param>
        /// <param name="writable">Whether the stream accepts writes.</param>
        /// <param name="publiclyVisible">Whether <see cref="GetBuffer"/> is permitted.</param>
        /// <param name="pool">Pool to draw buffers from; <c>BufferPool.Instance</c> when null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The segment does not fit inside <paramref name="buffer"/>.</exception>
        public PoolMemoryStream(byte[] buffer, int index, int count, bool writable, bool publiclyVisible, IBufferPool pool = null)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }
            if (index < 0)
            {
                throw new ArgumentOutOfRangeException("index", "index is negative");
            }
            if (count < 0)
            {
                throw new ArgumentOutOfRangeException("count", "count is negative");
            }
            if (buffer.Length - index < count)
            {
                throw new ArgumentException("size of buffer is less than index + count");
            }

            _pool = pool ?? BufferPool.Instance;

            // Copy the data while the stream is still expandable and writable, then lock it down.
            // The non-virtual core is used so that no overridable member runs during construction.
            WriteCore(buffer, index, count);
            _position = 0;
            _canWrite = writable;
            _visible = publiclyVisible;
            _expandable = false;
        }

        /// <summary>Returns the array length, or throws the documented exception for a null array.</summary>
        private static int LengthOf(byte[] buffer)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }

            return buffer.Length;
        }

        /// <summary>
        /// Copies the stream content (bytes 0 to <see cref="Length"/>) into a new array.
        /// The current <see cref="Position"/> is not affected.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public virtual byte[] ToArray()
        {
            CheckIfDisposed();

            var result = new byte[_length];
            CopyOut(0, result, 0, result.Length);
            return result;
        }

        /// <summary>
        /// Returns a new array of <see cref="Capacity"/> bytes holding a copy of every backing buffer,
        /// including the bytes past <see cref="Length"/>, whose content is unspecified.
        /// </summary>
        /// <exception cref="UnauthorizedAccessException">The stream was created with <c>publiclyVisible</c> set to false.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public virtual byte[] GetBuffer()
        {
            if (!_visible)
            {
                throw new UnauthorizedAccessException();
            }

            var result = new byte[Capacity];
            var chunkSize = _pool.BufferSize;
            var offset = 0;
            foreach (var chunk in _buffers)
            {
                // Copy exactly one chunk per buffer so the total always matches Capacity,
                // even if the pool hands out arrays larger than BufferSize.
                Buffer.BlockCopy(chunk, 0, result, offset, chunkSize);
                offset += chunkSize;
            }

            return result;
        }

        #region implemented abstract members of Stream

        /// <summary>Does nothing; all data already lives in memory.</summary>
        public override void Flush()
        {
        }

        private void CheckIfDisposed()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException("PoolMemoryStream");
            }
        }

        /// <summary>Validates the (buffer, offset, count) triple shared by Read and Write.</summary>
        private static void CheckBufferArguments(byte[] buffer, int offset, int count)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }
            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException("offset", "offset must be >= 0");
            }
            if (count < 0)
            {
                throw new ArgumentOutOfRangeException("count", "count must be >= 0");
            }
            // Written as a subtraction so that offset + count cannot overflow.
            if (buffer.Length - offset < count)
            {
                throw new ArgumentException("buffer too small", "buffer");
            }
        }

        /// <summary>
        /// Reads up to <paramref name="count"/> bytes from the current position into
        /// <paramref name="buffer"/> and advances the position by the number of bytes read.
        /// </summary>
        /// <returns>The number of bytes read; 0 at or past the end of the stream.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The segment does not fit inside <paramref name="buffer"/>.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            CheckBufferArguments(buffer, offset, count);
            CheckIfDisposed();

            var available = _length - _position;
            if (available <= 0 || count == 0)
            {
                return 0;
            }

            var toRead = (int)Math.Min(count, available);
            CopyOut(_position, buffer, offset, toRead);
            _position += toRead;
            return toRead;
        }

        /// <summary>Moves the position relative to <paramref name="origin"/> and returns the new position.</summary>
        /// <exception cref="ArgumentOutOfRangeException">The offset or the resulting position exceeds <see cref="int.MaxValue"/>.</exception>
        /// <exception cref="ArgumentException"><paramref name="origin"/> is not a valid <see cref="SeekOrigin"/>.</exception>
        /// <exception cref="IOException">The resulting position would be negative.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            CheckIfDisposed();

            if (offset > (long)int.MaxValue)
            {
                throw new ArgumentOutOfRangeException("offset", "offset out of range. " + offset);
            }

            long target;
            switch (origin)
            {
                case SeekOrigin.Begin:
                    target = offset;
                    break;
                case SeekOrigin.Current:
                    target = _position + offset;
                    break;
                case SeekOrigin.End:
                    target = _length + offset;
                    break;
                default:
                    throw new ArgumentException("invalid SeekOrigin", "origin");
            }

            if (target < 0)
            {
                throw new IOException("Attempted to seek before start of PoolMemoryStream.");
            }
            if (target > (long)int.MaxValue)
            {
                throw new ArgumentOutOfRangeException("offset", "offset out of range. " + offset);
            }

            _position = target;
            return _position;
        }

        /// <summary>
        /// Gets or sets the number of bytes the currently held buffers can store
        /// (always a whole multiple of <c>Pool.BufferSize</c> when read).
        /// Setting it takes buffers from, or returns buffers to, the pool.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">The value is negative or smaller than <see cref="Length"/>.</exception>
        /// <exception cref="NotSupportedException">The stream is not expandable and the value exceeds the current capacity.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public int Capacity
        {
            get
            {
                CheckIfDisposed();
                return _buffers.Count * _pool.BufferSize;
            }
            set
            {
                CheckIfDisposed();
                if (value < 0 || value < _length)
                {
                    throw new ArgumentOutOfRangeException("value", "capacity cannot be negative or smaller than length of stream.");
                }

                SetCapacity(value);
            }
        }

        /// <summary>
        /// Sets the length of the stream. Capacity is adjusted to fit, so buffers that are no longer
        /// needed go back to the pool. When the stream grows, the added bytes read as zero.
        /// The position is pulled back to the new length if it lies beyond it.
        /// </summary>
        /// <exception cref="NotSupportedException">The stream is read-only, or it is not expandable and would have to grow.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative or exceeds <see cref="int.MaxValue"/>.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override void SetLength(long value)
        {
            if (!_canWrite)
            {
                throw new NotSupportedException("cannot write to stream");
            }

            SetCapacity(value);

            if (value > _length)
            {
                // Pool buffers and previously truncated regions may hold stale bytes.
                ClearRange(_length, value);
            }

            _length = value;
            if (_position > _length)
            {
                _position = _length;
            }
        }

        /// <summary>
        /// Makes the stream hold exactly as many buffers as are needed for <paramref name="value"/> bytes,
        /// taking missing buffers from the pool or returning surplus ones to it.
        /// </summary>
        private void SetCapacity(long value)
        {
            CheckIfDisposed();

            if (!_expandable && value > Capacity)
            {
                throw new NotSupportedException("cannot expand stream");
            }
            if (value < 0 || value > int.MaxValue)
            {
                throw new ArgumentOutOfRangeException("value");
            }

            var chunkSize = _pool.BufferSize;
            // Round up to whole buffers; long arithmetic keeps this safe close to int.MaxValue.
            var needed = (int)((value + chunkSize - 1) / chunkSize);

            if (needed < _buffers.Count)
            {
                for (var i = needed; i < _buffers.Count; i++)
                {
                    _pool.Return(_buffers[i]);
                }

                _buffers.RemoveRange(needed, _buffers.Count - needed);
            }
            else
            {
                while (_buffers.Count < needed)
                {
                    _buffers.Add(_pool.Take());
                }
            }
        }

        /// <summary>Zeroes the stream bytes in [<paramref name="start"/>, <paramref name="end"/>); the range must lie within capacity.</summary>
        private void ClearRange(long start, long end)
        {
            var chunkSize = _pool.BufferSize;
            while (start < end)
            {
                var chunkIndex = (int)(start / chunkSize);
                var chunkOffset = (int)(start % chunkSize);
                var span = (int)Math.Min(chunkSize - chunkOffset, end - start);
                Array.Clear(_buffers[chunkIndex], chunkOffset, span);
                start += span;
            }
        }

        /// <summary>Copies <paramref name="count"/> stream bytes starting at <paramref name="position"/> into <paramref name="destination"/>.</summary>
        private void CopyOut(long position, byte[] destination, int offset, int count)
        {
            var chunkSize = _pool.BufferSize;
            var chunkIndex = (int)(position / chunkSize);
            var chunkOffset = (int)(position % chunkSize);

            while (count > 0)
            {
                var span = Math.Min(count, chunkSize - chunkOffset);
                Buffer.BlockCopy(_buffers[chunkIndex], chunkOffset, destination, offset, span);
                offset += span;
                count -= span;
                chunkIndex++;
                chunkOffset = 0;
            }
        }

        /// <summary>
        /// Makes room for a write that ends at stream offset <paramref name="end"/>: grows capacity
        /// (never shrinks it), zeroes any gap between the old length and the write position, and
        /// extends the length.
        /// </summary>
        private void PrepareWrite(long end)
        {
            if (end <= _length)
            {
                return;
            }

            if (end > Capacity)
            {
                SetCapacity(end);
            }

            if (_position > _length)
            {
                // The position was moved past the end; the skipped bytes must read as zero.
                ClearRange(_length, _position);
            }

            _length = end;
        }

        /// <summary>Writes without argument validation; shared by <see cref="Write"/> and the copying constructor.</summary>
        private void WriteCore(byte[] buffer, int offset, int count)
        {
            PrepareWrite(_position + count);

            var chunkSize = _pool.BufferSize;
            var chunkIndex = (int)(_position / chunkSize);
            var chunkOffset = (int)(_position % chunkSize);
            var remaining = count;

            while (remaining > 0)
            {
                var span = Math.Min(remaining, chunkSize - chunkOffset);
                Buffer.BlockCopy(buffer, offset, _buffers[chunkIndex], chunkOffset, span);
                offset += span;
                remaining -= span;
                chunkIndex++;
                chunkOffset = 0;
            }

            _position += count;
        }

        /// <summary>
        /// Writes <paramref name="count"/> bytes from <paramref name="buffer"/> at the current position,
        /// growing the stream if necessary, and advances the position.
        /// </summary>
        /// <exception cref="NotSupportedException">The stream is read-only, or it is not expandable and would have to grow.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative, or the stream would exceed <see cref="int.MaxValue"/> bytes.</exception>
        /// <exception cref="ArgumentException">The segment does not fit inside <paramref name="buffer"/>.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (!_canWrite)
            {
                throw new NotSupportedException("cannot write to stream");
            }

            CheckBufferArguments(buffer, offset, count);
            CheckIfDisposed();

            WriteCore(buffer, offset, count);
        }

        /// <summary>Writes one byte at the current position, growing the stream if necessary, and advances the position.</summary>
        /// <exception cref="NotSupportedException">The stream is read-only, or it is not expandable and would have to grow.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override void WriteByte(byte value)
        {
            if (!_canWrite)
            {
                throw new NotSupportedException("cannot write to stream");
            }

            CheckIfDisposed();
            PrepareWrite(_position + 1);

            var chunkSize = _pool.BufferSize;
            _buffers[(int)(_position / chunkSize)][(int)(_position % chunkSize)] = value;
            _position++;
        }

        /// <summary>Reads one byte and advances the position.</summary>
        /// <returns>The byte as an int, or -1 at or past the end of the stream.</returns>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override int ReadByte()
        {
            CheckIfDisposed();

            if (_position >= _length)
            {
                return -1;
            }

            var chunkSize = _pool.BufferSize;
            var value = _buffers[(int)(_position / chunkSize)][(int)(_position % chunkSize)];
            _position++;
            return value;
        }

        /// <summary>True until the stream is disposed.</summary>
        public override bool CanRead
        {
            get { return !_disposed; }
        }

        /// <summary>True until the stream is disposed.</summary>
        public override bool CanSeek
        {
            get { return !_disposed; }
        }

        /// <summary>True while the stream is writable and not disposed.</summary>
        public override bool CanWrite
        {
            get { return !_disposed && _canWrite; }
        }

        /// <summary>Gets the number of bytes in the stream.</summary>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override long Length
        {
            get
            {
                CheckIfDisposed();
                return _length;
            }
        }

        /// <summary>Gets or sets the current position. It may lie beyond <see cref="Length"/>.</summary>
        /// <exception cref="ArgumentOutOfRangeException">The value is negative or exceeds <see cref="int.MaxValue"/>.</exception>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public override long Position
        {
            get
            {
                CheckIfDisposed();
                return _position;
            }
            set
            {
                CheckIfDisposed();
                // Buffer indexing is done in int arithmetic, so reject anything it cannot address.
                if (value < 0 || value > (long)int.MaxValue)
                {
                    throw new ArgumentOutOfRangeException("value", "position must be between 0 and int.MaxValue");
                }

                _position = value;
            }
        }

        #endregion

        /// <summary>Returns every backing buffer to the pool (when disposing) and marks the stream as disposed.</summary>
        protected override void Dispose(bool disposing)
        {
            try
            {
                if (!_disposed)
                {
                    if (disposing)
                    {
                        foreach (var chunk in _buffers)
                        {
                            _pool.Return(chunk);
                        }

                        _buffers.Clear();
                    }

                    _disposed = true;
                }
            }
            finally
            {
                base.Dispose(disposing);
            }
        }
    }
}