有了上面的类,就很容易将 Blob 序列化了。只要还有尚未迭代的实体,我们就把实体按批次(Batch)归入若干模块(Block)。每个模块都会作为一个 Azure Blob Block 写入;在写入每个模块之前,我们会重置 MemoryStream,因为接下来的实体将被写入新模块中的新批次。当模块数达到上限时,就调用 PutBlockList 提交所有模块 ID,并为后续的模块创建一个新的 Blob。另外,只有在至少写入了一个实体的情况下,BackupBlock 才会把内存流中的数据保存为一个模块。
/// <summary>
/// Backs up every entity produced by <paramref name="query"/> into one or more block blobs
/// inside <paramref name="containerToSave"/>.
/// </summary>
/// <remarks>
/// Each pass of the outer loop targets a fresh blob. The blob's block list is committed only
/// when at least one block was actually uploaded for it.
/// </remarks>
/// <param name="containerToSave">Container that receives the backup blobs.</param>
/// <param name="query">Table query whose results are backed up.</param>
/// <param name="backupId">Prefix used as the first path segment of every blob name.</param>
/// <param name="range">Partition key range; only used here to build the blob name.</param>
static void BackupToContainer(CloudBlobContainer containerToSave, CloudTableQuery<BackupEntity> query,
    string backupId, PartitionKeyRange range)
{
    // A block can be at most 4 MB in Azure Storage. Though we will be using much less
    // we will allocate 4MB for the edge case where an entity may be 1MB
    using (MemoryStream stream = new MemoryStream(4 * 1024 * 1024))
    {
        State state = new State(query, stream);
        BlobRequestOptions requestOptions = new BlobRequestOptions()
        {
            RetryPolicy = RetryPolicies.RetryExponential(5, RetryPolicies.DefaultClientBackoff)
        };

        while (!state.HasCompleted)
        {
            // Store the resultset to a blob in the container. We will use a naming scheme but the scheme does not
            // have any consequences on the strategy itself.
            // NOTE(review): GetHashCode values are not guaranteed to be stable across processes or
            // runtime versions, so these name parts should be treated as decoration only - confirm
            // nothing parses them back.
            string backupFileName = string.Format("{0}/{1}_{2}_{3}.xml",
                backupId,
                Guid.NewGuid(),
                range.Min == null ? "null" : range.Min.GetHashCode().ToString(),
                range.Max == null ? "null" : range.Max.GetHashCode().ToString());
            CloudBlockBlob backupBlob = containerToSave.GetBlockBlobReference(backupFileName);

            Blob blob = new Blob(state);
            List<string> blockIdList = new List<string>();
            foreach (Block block in blob.Blocks)
            {
                // BackupBlock returns null when the block contained no entities and was not uploaded.
                string blockId = BackupBlock(stream, requestOptions, backupBlob, block);
                if (!string.IsNullOrEmpty(blockId))
                {
                    blockIdList.Add(blockId);
                }
            }

            if (blockIdList.Count > 0)
            {
                // commit block list
                backupBlob.PutBlockList(blockIdList, requestOptions);
            }
        }
    }
}
/// <summary>
/// Serializes every batch of <paramref name="block"/> as XML into <paramref name="stream"/> and,
/// when at least one entity was written, uploads the stream content as a single block of
/// <paramref name="backupBlob"/>.
/// </summary>
/// <param name="stream">Scratch stream; it is truncated and rewound before anything is written.</param>
/// <param name="requestOptions">Options passed through to the PutBlock call.</param>
/// <param name="backupBlob">Blob that receives the block.</param>
/// <param name="block">Source of the batches (and their entities) to serialize.</param>
/// <returns>
/// The id of the uploaded block, or <c>null</c> when no entity was written and nothing was uploaded.
/// </returns>
private static string BackupBlock(MemoryStream stream, BlobRequestOptions requestOptions, CloudBlockBlob backupBlob, Block block)
{
    int entityCount = 0;

    // reset the memory stream as we begin a new block
    stream.Seek(0, SeekOrigin.Begin);
    stream.SetLength(0);

    // The using block closes the writer even when serialization throws.
    using (XmlWriter writer = XmlWriter.Create(stream))
    {
        writer.WriteStartElement("Block");
        foreach (Batch batch in block.Batches)
        {
            // write begin batch statement
            writer.WriteStartElement("Batch");
            foreach (BackupEntity entity in batch.Entities)
            {
                entityCount++;
                entity.EntryElement.WriteTo(writer);

                // The writer buffers its output; flush so that code measuring the block size
                // through the stream position sees the entity that was just written.
                writer.Flush();
            }
            writer.WriteEndElement();
        }
        writer.WriteEndElement();
    }

    stream.SetLength(stream.Position);
    stream.Seek(0, SeekOrigin.Begin);

    // if we have written > 0 entities, let us store to a block. Else we can reject this block
    if (entityCount > 0)
    {
        backupBlob.PutBlock(block.BlockId, stream, null, requestOptions);
        return block.BlockId;
    }

    return null;
}
/// <summary>
/// The class that maintains the global state for the iteration: the query enumerator,
/// an optional look-ahead entity, and the stream the current block is written to.
/// </summary>
internal class State
{
    protected MemoryStream stream;
    IEnumerator<BackupEntity> queryIterator;

    /// <summary>
    /// Starts enumerating <paramref name="query"/> and remembers the stream used for size tracking.
    /// </summary>
    /// <param name="query">Query whose results are iterated.</param>
    /// <param name="stream">Stream whose position is reported by <see cref="CurrentBlockSize"/>.</param>
    internal State(CloudTableQuery<BackupEntity> query, MemoryStream stream)
    {
        this.queryIterator = query.GetEnumerator();
        this.stream = stream;
    }

    /// <summary>
    /// This entity is the one we may have retrieved but it does not belong to the batch
    /// So we store it here so that it can be returned on the next iteration
    /// </summary>
    internal BackupEntity LookAheadEntity { private get; set; }

    /// <summary>
    /// We have completed if look ahead entity is null and iterator is completed too.
    /// The iterator only counts as completed once <see cref="GetNextEntity"/> has run past its end.
    /// </summary>
    internal bool HasCompleted
    {
        get
        {
            return this.queryIterator == null && this.LookAheadEntity == null;
        }
    }

    /// <summary>
    /// Get the amount of data currently in the stream, measured as the stream position after a flush.
    /// </summary>
    internal long CurrentBlockSize
    {
        get
        {
            stream.Flush();
            return stream.Position;
        }
    }

    /// <summary>
    /// Return the next entity - which can be either the
    /// look ahead entity or a new one from the iterator.
    /// We return null if there are no more entities
    /// </summary>
    /// <returns>The next entity, or <c>null</c> when the query is exhausted.</returns>
    internal BackupEntity GetNextEntity()
    {
        BackupEntity entityToReturn = null;
        if (this.LookAheadEntity != null)
        {
            // A previously fetched entity that did not fit its batch takes priority.
            entityToReturn = this.LookAheadEntity;
            this.LookAheadEntity = null;
        }
        else if (this.queryIterator != null)
        {
            if (this.queryIterator.MoveNext())
            {
                entityToReturn = this.queryIterator.Current;
            }
            else
            {
                // Exhausted: release the enumerator and mark the iteration as finished.
                this.queryIterator.Dispose();
                this.queryIterator = null;
            }
        }

        return entityToReturn;
    }
}
/// <summary>
/// Represents a collection of entities in a single batch
/// </summary>
internal class Batch
{
    private const int MaxEntityCount = 100;

    // Save at most 3.5MB in a batch so that we have enough room for
    // the xml tags that WCF Data Services adds in the OData protocol
    private static readonly int MaxBatchSize = (int)(3.5 * 1024 * 1024);

    State state;

    internal Batch(State state)
    {
        this.state = state;
    }

    /// <summary>
    /// Yield entities until we hit a condition that should terminate a batch.
    /// The conditions to terminate on are:
    /// 1. 100 entities in a batch
    /// 2. 3.5MB of data
    /// 3. 3.8MB of block size
    /// 4. We see a new partition key
    /// The entity that triggers termination is not yielded; it is stored as the
    /// state's look-ahead entity so that it is returned first on the next request.
    /// </summary>
    internal IEnumerable<BackupEntity> Entities
    {
        get
        {
            BackupEntity entity;
            long sizeAtBatchStart = this.state.CurrentBlockSize;
            string lastPartitionKeySeen = null;
            int entityCount = 0;

            while ((entity = state.GetNextEntity()) != null)
            {
                // The first entity of the batch defines the partition key for the whole batch.
                if (lastPartitionKeySeen == null)
                {
                    lastPartitionKeySeen = entity.PartitionKey;
                }

                // Rough size estimate: number of characters in the XML text times two
                // (presumably budgeting two bytes per character - TODO confirm).
                int approxEntitySize = entity.EntryElement.ToString().Length * 2;
                long blockSize = this.state.CurrentBlockSize;
                long batchSize = blockSize - sizeAtBatchStart;

                if (entityCount >= Batch.MaxEntityCount
                    || !string.Equals(entity.PartitionKey, lastPartitionKeySeen, StringComparison.Ordinal)
                    || batchSize + approxEntitySize > Batch.MaxBatchSize
                    || blockSize + approxEntitySize > Block.MaxBlockSize)
                {
                    // set this current entity as the look ahead since it needs to be part of the next batch
                    state.LookAheadEntity = entity;
                    yield break;
                }

                entityCount++;
                yield return entity;
            }
        }
    }
}
/// <summary>
/// Represents all batches in a block
/// </summary>
internal class Block
{
    // Though a block can be of 4MB we will stop before to allow buffer.
    // Internal (not private) because Batch also compares against this limit.
    internal static readonly int MaxBlockSize = (int)(3.8 * 1024 * 1024);

    State state;

    /// <summary>
    /// Identifier of this block: the Base64 encoding of a freshly generated GUID.
    /// </summary>
    internal string BlockId { get; private set; }

    internal Block(State state)
    {
        this.state = state;
        this.BlockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray());
    }

    /// <summary>
    /// The list of batches in the block. Batches are produced lazily for as long as the
    /// iteration has not completed and the current block size is below <see cref="MaxBlockSize"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): a batch yields no entities when its first entity does not fit the remaining
    /// space; this loop then keeps handing out batches until one of its two conditions fails.
    /// Confirm that the resulting empty batches are acceptable.
    /// </remarks>
    internal IEnumerable<Batch> Batches
    {
        get
        {
            while (!state.HasCompleted && state.CurrentBlockSize < Block.MaxBlockSize)
            {
                yield return new Batch(state);
            }
        }
    }
}
/// <summary>
/// Represents all blocks in a blob
/// </summary>
internal class Blob
{
    /// <summary>
    /// We will allow storing at most 20 blocks in a blob
    /// </summary>
    private const int MaxBlocksInBlobs = 20;

    State state;

    // NOTE(review): nothing in this class assigns this property - confirm whether it is still needed.
    internal CloudBlob blob { get; private set; }

    internal Blob(State state)
    {
        this.state = state;
    }

    /// <summary>
    /// The blocks that form the blob. Blocks are produced lazily until the iteration
    /// has completed or the per-blob block limit has been reached.
    /// </summary>
    internal IEnumerable<Block> Blocks
    {
        get
        {
            int blockCount = 0;
            while (!state.HasCompleted && blockCount < Blob.MaxBlocksInBlobs)
            {
                blockCount++;
                yield return new Block(state);
            }
        }
    }
}
9
7
3
1
2
3
4
5
6
4
8
:
本文来源:不详 作者:佚名