Wednesday, 3 December 2014

EDI POST PROCESS PIPELINE - 3 VirtualStream

//---------------------------------------------------------------------
// File: VirtualStream.cs
//
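// Summary: Implements VirtualStream, a seekable stream which keeps data in memory up to a
//          configurable threshold and overflows to a temporary file on disk. Part of a sample
//          pipeline component which demonstrates how to promote message context properties
//          and write distinguished fields for XML messages using arbitrary XPath expressions.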
//
// Sample: Arbitrary XPath Property Handler Pipeline Component SDK
//
//---------------------------------------------------------------------
// This file is part of the Microsoft BizTalk Server 2009 SDK
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// This source code is intended only as a supplement to Microsoft BizTalk
// Server 2009 release and/or on-line documentation. See these other
// materials for detailed information regarding Microsoft code samples.
//
// THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
// KIND, WHETHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
// PURPOSE.
//---------------------------------------------------------------------

using System;
using System.IO;
using System.Text;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

namespace Microsoft.Samples.BizTalk.Pipelines.CustomComponent
{
/// <summary>
/// Implements a virtual stream, i.e. an always seekable stream which keeps its data in
/// memory up to a configurable threshold and, once that threshold would be exceeded,
/// moves the data to a temporary delete-on-close file on disk.
/// </summary>
public sealed class VirtualStream : Stream, IDisposable
{
    /// <summary>
    /// Memory handling.
    /// </summary>
    public enum MemoryFlag
    {
        /// <summary>Start in memory and switch to a temporary file when the threshold is exceeded.</summary>
        AutoOverFlowToDisk = 0,
        /// <summary>Always keep the data in memory; the threshold is never enforced.</summary>
        OnlyInMemory = 1,
        /// <summary>Always keep the data in a temporary file.</summary>
        OnlyToDisk = 2
    }

    // Constants
    private const int MemoryThreshold = 4*1024*1024; // The maximum possible memory consumption (4 MB)
    private const int DefaultMemorySize = 4*1024; // Default memory consumption (4 KB)
    private const int MaxPath = 260; // Win32 MAX_PATH: buffer size GetTempFileName requires
    private const UInt32 TemporaryDeleteOnCloseFlags = 0x04000100; // temporary attribute + delete-on-close flag

    private Stream wrappedStream;
    private bool isDisposed;
    private bool isInMemory;
    private int thresholdSize;
    private MemoryFlag memoryStatus;

    /// <summary>
    /// Initializes a VirtualStream instance with default parameters (4 KB memory buffer,
    /// allow overflow to disk).
    /// </summary>
    public VirtualStream()
        : this(DefaultMemorySize, MemoryFlag.AutoOverFlowToDisk, new MemoryStream())
    {
    }

    /// <summary>
    /// Initializes a VirtualStream instance with memory buffer size.
    /// </summary>
    /// <param name="bufferSize">Memory buffer size in bytes (capped at 4 MB).</param>
    public VirtualStream(int bufferSize)
        : this(bufferSize, MemoryFlag.AutoOverFlowToDisk, new MemoryStream(bufferSize))
    {
    }

    /// <summary>
    /// Initializes a VirtualStream instance with a default memory size and memory flag specified.
    /// </summary>
    /// <param name="flag">Memory flag</param>
    public VirtualStream(MemoryFlag flag)
        : this(DefaultMemorySize, flag,
               (flag == MemoryFlag.OnlyToDisk) ? CreatePersistentStream() : new MemoryStream())
    {
    }

    /// <summary>
    /// Initializes a VirtualStream instance with a memory buffer size and memory flag specified.
    /// </summary>
    /// <param name="bufferSize">Memory buffer size in bytes (capped at 4 MB).</param>
    /// <param name="flag">Memory flag</param>
    public VirtualStream(int bufferSize, MemoryFlag flag)
        : this(bufferSize, flag,
               (flag == MemoryFlag.OnlyToDisk) ? CreatePersistentStream() : new MemoryStream(bufferSize))
    {
    }

    /// <summary>
    /// Initializes a VirtualStream instance with a memory buffer size, memory flag and underlying stream
    /// specified.
    /// </summary>
    /// <param name="bufferSize">Memory buffer size in bytes (capped at 4 MB).</param>
    /// <param name="flag">Memory flag</param>
    /// <param name="dataStream">Underlying stream</param>
    /// <exception cref="ArgumentNullException"><paramref name="dataStream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative.</exception>
    private VirtualStream(int bufferSize, MemoryFlag flag, Stream dataStream)
    {
        if (null == dataStream)
        {
            throw new ArgumentNullException("dataStream");
        }

        if (bufferSize < 0)
        {
            // The stream was already created by the caller; release it so that a
            // temporary file is not left open when the argument is rejected.
            dataStream.Close();
            throw new ArgumentOutOfRangeException("bufferSize");
        }

        isInMemory = (flag != MemoryFlag.OnlyToDisk);
        memoryStatus = flag;
        thresholdSize = Math.Min(bufferSize, MemoryThreshold);

        if (isInMemory)
        {
            wrappedStream = dataStream;  // Don't want to double wrap memory stream
        }
        else
        {
            wrappedStream = WrapPersistentStream(dataStream);
        }

        isDisposed = false;
    }

    #region Stream Methods and Properties

    /// <summary>
    /// Gets a flag indicating whether a stream can be read. Returns false once the stream is closed.
    /// </summary>
    public override bool CanRead
    {
        get { return !isDisposed && null != wrappedStream && wrappedStream.CanRead; }
    }

    /// <summary>
    /// Gets a flag indicating whether a stream can be written. Returns false once the stream is closed.
    /// </summary>
    public override bool CanWrite
    {
        get { return !isDisposed && null != wrappedStream && wrappedStream.CanWrite; }
    }

    /// <summary>
    /// Gets a flag indicating whether a stream can seek. Always true while the stream is open.
    /// </summary>
    public override bool CanSeek
    {
        get { return !isDisposed && null != wrappedStream; }
    }

    /// <summary>
    /// Returns the length of the underlying stream.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream was closed.</exception>
    public override long Length
    {
        get
        {
            ThrowIfDisposed();
            return wrappedStream.Length;
        }
    }

    /// <summary>
    /// Gets or sets a position in the stream.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream was closed.</exception>
    public override long Position
    {
        get
        {
            ThrowIfDisposed();
            return wrappedStream.Position;
        }
        set
        {
            ThrowIfDisposed();
            wrappedStream.Seek(value, SeekOrigin.Begin);
        }
    }

    /// <summary>
    /// Flushes the underlying stream.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream was closed.</exception>
    public override void Flush()
    {
        ThrowIfDisposed();
        wrappedStream.Flush();
    }

    /// <summary>
    /// Reads from the underlying (memory or persistence) stream.
    /// </summary>
    /// <param name="buffer">Destination buffer</param>
    /// <param name="offset">Offset in <paramref name="buffer"/> at which to start storing data</param>
    /// <param name="count">Maximum number of bytes to read</param>
    /// <returns>
    /// The number of bytes read
    /// </returns>
    /// <exception cref="ObjectDisposedException">The stream was closed.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();
        return wrappedStream.Read(buffer, offset, count);
    }

    /// <summary>
    /// Moves the position of the underlying stream.
    /// </summary>
    /// <param name="offset">Offset relative to <paramref name="origin"/></param>
    /// <param name="origin">Reference point for <paramref name="offset"/></param>
    /// <returns>
    /// The current position
    /// </returns>
    /// <exception cref="ObjectDisposedException">The stream was closed.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        ThrowIfDisposed();
        return wrappedStream.Seek(offset, origin);
    }

    /// <summary>
    /// Sets the length of the stream, switching from memory to a temporary file first
    /// when overflow is allowed and the new length exceeds the memory threshold.
    /// </summary>
    /// <param name="length">New length of the stream</param>
    /// <exception cref="ObjectDisposedException">The stream was closed.</exception>
    public override void SetLength(long length)
    {
        ThrowIfDisposed();

        if (MustOverflowToDisk(length))
        {
            SwitchToPersistentStream();
        }

        // Set new length for the wrapped stream
        wrappedStream.SetLength(length);
    }

    /// <summary>
    /// Writes to the underlying stream, switching from memory to a temporary file first
    /// when overflow is allowed and the write would end beyond the memory threshold.
    /// </summary>
    /// <param name="buffer">Source buffer</param>
    /// <param name="offset">Offset in <paramref name="buffer"/> of the first byte to write</param>
    /// <param name="count">Number of bytes to write</param>
    /// <exception cref="ObjectDisposedException">The stream was closed.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();

        // Position is a long, so the sum cannot overflow for any int count.
        if (MustOverflowToDisk(wrappedStream.Position + count))
        {
            SwitchToPersistentStream();
        }

        wrappedStream.Write(buffer, offset, count);
    }

    #endregion

    #region IDisposable Interface

    /// <summary>
    /// Closes and releases the underlying stream. This single override serves
    /// <see cref="Stream.Close()"/>, <see cref="Stream.Dispose()"/> and <c>using</c> blocks,
    /// all of which route through it.
    /// </summary>
    /// <param name="disposing">True when called from Close() or Dispose().</param>
    /// <remarks>
    /// Safe to call more than once. Calling other methods afterwards results in an
    /// <see cref="ObjectDisposedException"/> being thrown.
    /// </remarks>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing && !isDisposed)
            {
                // Mark first so a failing Close() cannot leave the object half-usable.
                isDisposed = true;
                if (null != wrappedStream)
                {
                    wrappedStream.Close();
                    wrappedStream = null;
                }
            }
        }
        finally
        {
            base.Dispose(disposing);
        }
    }

    #endregion

    #region Private Utility Functions

    /// <summary>
    /// Tells whether the stream is still in memory, is allowed to overflow, and would
    /// exceed the memory threshold at the given length.
    /// </summary>
    /// <param name="requiredLength">Length the stream is about to reach</param>
    private bool MustOverflowToDisk(long requiredLength)
    {
        return memoryStatus == MemoryFlag.AutoOverFlowToDisk &&
               isInMemory &&
               requiredLength > thresholdSize;
    }

    /// <summary>
    /// Wraps a persistence stream into a buffered stream sized by the memory threshold.
    /// </summary>
    /// <param name="persistentStream">Stream to wrap</param>
    private BufferedStream WrapPersistentStream(Stream persistentStream)
    {
        // BufferedStream rejects a zero buffer size; a zero threshold only means
        // "overflow immediately", so fall back to the default size for buffering.
        int size = (thresholdSize > 0) ? thresholdSize : DefaultMemorySize;
        return new BufferedStream(persistentStream, size);
    }

    /// <summary>
    /// Replaces the wrapped memory stream with a temporary file stream holding the same
    /// content at the same position.
    /// </summary>
    private void SwitchToPersistentStream()
    {
        MemoryStream memoryStream = (MemoryStream) wrappedStream;
        Stream persistStream = WrapPersistentStream(CreatePersistentStream());
        bool switched = false;

        try
        {
            // WriteTo copies the whole content regardless of, and without moving,
            // the memory stream's position.
            memoryStream.WriteTo(persistStream);

            // The persistence stream substitutes the memory stream, so it must
            // continue from the same position.
            persistStream.Position = memoryStream.Position;
            switched = true;
        }
        finally
        {
            // Do not leave the temporary file open when the copy failed.
            if (!switched)
            {
                persistStream.Close();
            }
        }

        memoryStream.Close();
        wrappedStream = persistStream;
        isInMemory = false;
    }

    /// <summary>
    /// Called by other methods to check the stream state.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream was closed or disposed.</exception>
    private void ThrowIfDisposed()
    {
        if (isDisposed || null == wrappedStream)
        {
            throw new ObjectDisposedException("VirtualStream");
        }
    }

    /// <summary>
    /// Utility method.
    /// Creates a FileStream with a unique name and the temporary and delete-on-close attributes.
    /// </summary>
    /// <returns>
    /// The temporary persistence stream
    /// </returns>
    /// <exception cref="IOException">The temporary file could not be named or created.</exception>
    public static Stream CreatePersistentStream()
    {
        // GetTempFileName requires room for MAX_PATH characters.
        StringBuilder name = new StringBuilder(MaxPath);

        if (0 == GetTempFileName(Path.GetTempPath(), "BTS", 0, name))
        {
            throw new IOException("GetTempFileName Failed.", Marshal.GetHRForLastWin32Error());
        }

        // The numeric values of FileAccess.ReadWrite and FileMode.Create are passed
        // through unchanged as the native access and creation arguments.
        SafeFileHandle handle = CreateFile(name.ToString(), (UInt32) FileAccess.ReadWrite, 0, IntPtr.Zero,
                                           (UInt32) FileMode.Create, TemporaryDeleteOnCloseFlags, IntPtr.Zero);

        if (handle.IsInvalid)
        {
            // Report the real Win32 failure instead of FileStream's generic invalid-handle error.
            throw new IOException("CreateFile Failed.", Marshal.GetHRForLastWin32Error());
        }

        return new FileStream(handle, FileAccess.ReadWrite);
    }

    // SetLastError is required for Marshal.GetHRForLastWin32Error() to report this call's error.
    [DllImport("kernel32.dll", CharSet = CharSet.Unicode, SetLastError = true)]
    private static extern UInt32 GetTempFileName
    (
        string path,
        string prefix,
        UInt32 unique,
        StringBuilder name
    );

    // Returning SafeFileHandle lets the runtime own the native handle from the moment it is created.
    [DllImport("kernel32.dll", CharSet = CharSet.Unicode, SetLastError = true)]
    private static extern SafeFileHandle CreateFile
    (
        string name,
        UInt32 accessMode,
        UInt32 shareMode,
        IntPtr security,
        UInt32 createMode,
        UInt32 flags,
        IntPtr template
    );

    #endregion
}
}

No comments:

Post a Comment