This is a series of blog posts about the functional approach to cloud computations and big data using F# and the MBrace framework.
Motivation
MBrace is a fully open-source framework written in F# that offers a very clear and easy programming model. It aims to create all the necessary conditions to make distributed, cloud-scale computations and data as simple as possible. At the same time, it is totally straightforward to set up MBrace clusters. In this series you will get the concept of "cloud monads", or cloud computation expressions, find out how to create a cluster and configure the environment, explore the features and opportunities we have with the new F# functional cloud programming model, and view the code of MBrace examples.
Background
Here, in Part III of the series, we will have a look at the features of the MBrace framework. If you want to know more about the concepts and the approach behind MBrace, have a look at Part I. If you want to learn more about the infrastructure options for creating an MBrace cluster, have a look at Part II.
MBrace features
- Cloud combinators
- CloudCells and CloudVectors
- CloudAtoms
- CloudFiles
- CloudFlows
- CloudChannels
- Cloud disposable resources
- Exception handling
- And many more!
Cloud Combinators
As you have already seen in Part I, we may use cloud combinators in our code. Cloud.Parallel takes a sequence of cloud computations and executes them in parallel, then it returns an array of the gathered results or throws an exception. But there are other interesting combinators.
Take, for example, Cloud.Choice: it also performs cloud computations in parallel, but it returns only when any of these computations has produced a successful value, or when all of them have failed. So we pass it computations returning the Option type - either Some or None.
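Here is a minimal sketch of both combinators; the chunked-search scenario and its predicate are made up purely for illustration:

```fsharp
open MBrace.Core

// Cloud.Parallel: run every computation and gather all results into an array.
let squares : Cloud<int []> =
    [| for i in 1 .. 10 -> cloud { return i * i } |]
    |> Cloud.Parallel

// Cloud.Choice: each worker scans one chunk and returns Some value on a hit;
// the whole workflow completes as soon as any chunk produces Some.
let findFirst (predicate : int -> bool) (chunks : int [] []) : Cloud<int option> =
    chunks
    |> Array.map (fun chunk -> cloud { return Array.tryFind predicate chunk })
    |> Cloud.Choice
```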
We can also wrap an asynchronous function into a cloud workflow using Cloud.OfAsync. This combinator is useful if we already have existing async functions and want to use them in cloud workflows.
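For instance, an existing async download function (plain .NET code, nothing MBrace-specific) can be lifted into a cloud workflow like this:

```fsharp
open MBrace.Core

// An existing async function we would like to reuse in the cloud.
let downloadAsync (url : string) : Async<string> = async {
    use client = new System.Net.WebClient()
    return! client.AsyncDownloadString(System.Uri url)
}

// Cloud.OfAsync wraps it into a cloud workflow.
let downloadLengthCloud (url : string) = cloud {
    let! body = Cloud.OfAsync (downloadAsync url)
    return body.Length
}
```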
There are other combinators, like Cloud.Raise and Cloud.Catch for raising and catching exceptions from a cloud workflow, Cloud.Dispose for disposing cloud resources, and Cloud.Ignore to discard the result of a cloud computation. You can find the other combinators in the API or just check out the code on GitHub.
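As a rough illustration, and assuming Cloud.Catch surfaces the failure as a Choice value, these combinators can be combined like this:

```fsharp
open MBrace.Core

let guarded = cloud {
    // A computation that deliberately fails via Cloud.Raise.
    let risky : Cloud<int> =
        cloud { return! Cloud.Raise (System.InvalidOperationException "something went wrong") }

    // Cloud.Catch materializes the failure as a value instead of faulting the workflow.
    let! outcome = Cloud.Catch risky

    // Cloud.Ignore discards the result of a computation we only run for its effects.
    do! cloud { return 42 } |> Cloud.Ignore

    return
        match outcome with
        | Choice1Of2 value -> sprintf "got %d" value
        | Choice2Of2 ex    -> sprintf "caught: %s" ex.Message
}
```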
Cloud Cells and Cloud Vectors
So far you have seen how MBrace scales computations. Now it's time to see how it distributes data. The simplest way of storing data entities in the cloud is to use the CloudCell type. It persists an object in the underlying store and provides an immutable reference to it. If we have some local data, it is easy to save it using CloudCell.New and run this operation on a cluster. This way we get a CloudCell object, which we can use for our tasks in the future. Cloud cells are cached locally on the machines for performance.
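A minimal sketch, assuming a connected cluster handle named cluster for submitting the workflow:

```fsharp
open MBrace.Core

// Persist a local array into the cluster's underlying store.
let storeData = cloud {
    let! cell = CloudCell.New [| 1 .. 1000 |]
    return cell
}

// The immutable CloudCell handle can be passed to later workflows;
// workers that dereference it cache its contents locally.
let cell = cluster.Run storeData
```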
If you need to store large collections of objects, it makes sense to use the partitioned data structure CloudVector. It is similar to CloudCell in that it is immutable, but its values are retrieved lazily, only when needed. When using CloudVector, we may create it from a normal array and also specify how many bytes per chunk each partition of the CloudVector can store.
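A sketch of the idea, assuming (as described above) that CloudVector.New takes the source data together with a maximum partition size in bytes; the exact signature may differ between MBrace versions:

```fsharp
open MBrace.Core

let bigArray = [| 1L .. 1000000L |]

let storeVector = cloud {
    // Second argument: assumed maximum bytes per partition.
    let! vector = CloudVector.New(bigArray, 1024L * 1024L)
    // Partitions are fetched lazily, only when a consumer actually needs them.
    return vector
}
```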
Cloud Atoms
Take a look at the async incrementation of some reference value:
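A minimal sketch of such code (the counts are illustrative):

```fsharp
// A classic race: many async tasks incrementing the same ref cell.
let counter = ref 0

[ for _ in 1 .. 100 ->
    async { for _ in 1 .. 100 do incr counter } ]   // 10 000 increments in total
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

printfn "%d" !counter   // rarely exactly 10 000, because the increments race
```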
Here, there is an async block that increments the value ten thousand times. The result of this computation can differ from run to run, because it is a typical race condition. If we run the same code in the cloud...
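A sketch of the distributed version, assuming a cluster handle named cluster for submitting the workflow:

```fsharp
open MBrace.Core

let cloudCounter = cloud {
    let counter = ref 0
    do! [ for _ in 1 .. 100 ->
            cloud { for _ in 1 .. 100 do incr counter } ]
        |> Cloud.Parallel
        |> Cloud.Ignore
    return !counter
}

// submit the workflow, e.g. cluster.Run cloudCounter
```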
...we will get a result of zero. It seems like the value wasn't incremented at all. This happens because when the code is distributed in the cloud, each node gets its own copy of the initial data, so there needs to be a specific way to perform cluster-level synchronization. That's why cloud atoms exist: they serve to perform transactionally safe cross-runtime operations. To create a new cloud atom you can use CloudAtom.New.
If we now rewrite the code with a cloud atom and update its value using CloudAtom.Update, we will get the correct value at the end:
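A sketch of that rewrite; the exact CloudAtom.Update argument shape and the way the final value is read back may differ between MBrace versions:

```fsharp
open MBrace.Core

let atomicCounter = cloud {
    let! atom = CloudAtom.New 0
    do! [ for _ in 1 .. 100 ->
            cloud {
                for _ in 1 .. 100 do
                    // a transactionally safe, cluster-wide increment
                    do! CloudAtom.Update(atom, fun value -> value + 1)
            } ]
        |> Cloud.Parallel
        |> Cloud.Ignore
    return! CloudAtom.Read atom   // 10 000
}
```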
If we look inside, cloud atoms are stored as refs under the hood. So inside the Update method, MBrace replaces the existing element's value with an alternative value as an atomic operation, using Interlocked.CompareExchange.
Cloud Flows
MBrace has another exciting feature, called cloud flows. They allow you to perform distributed, parallel data-flow tasks in the cloud. Cloud flows have a certain model of operation pipelining:
Everything starts from the input data. Then there are a number of intermediate operations, like filtering, ordering and transformations, and a final terminating operation like sum, count, toArray or any other. For example, when we create a CloudFlow from an array, this array is divided into parts, so that each of the existing nodes gets one of them. Then operations like filtering and mapping are performed on each part of the input array as cloud tasks, and the terminating operation is executed as the final step.
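A short sketch of such a pipeline over an in-memory array:

```fsharp
open MBrace.Core
open MBrace.Flow

// The source array is split into partitions; filter and map run on the workers
// as cloud tasks, and sum is the terminating operation that gathers the result.
let flowJob : Cloud<int64> =
    CloudFlow.OfArray [| 1 .. 1000000 |]
    |> CloudFlow.filter (fun x -> x % 3 = 0)
    |> CloudFlow.map (fun x -> int64 x * int64 x)
    |> CloudFlow.sum
```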
There are different options for the input data of a CloudFlow. The simplest is an array, but you can also create a flow from cloud vectors, cloud files and cloud channels. Cloud files and cloud channels are next in the queue!
Cloud Files
Cloud files are a way to store data in the cloud using a binary blob representation. Have a look at the quick example below:
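A sketch of that example; the downloadLyrics helper and the song URLs are hypothetical, and the parameter order of CloudFile.WriteAllText may differ between MBrace versions:

```fsharp
open MBrace.Core

// Hypothetical list of lyrics pages and a helper that downloads one of them.
let songUrls = [| "http://lyrics.example/song1"; "http://lyrics.example/song2" |]

let downloadLyrics (url : string) =
    use client = new System.Net.WebClient()
    client.DownloadString url

// Create one cloud file per song; the file name is the first line of the lyrics.
let songCloudFile (url : string) = cloud {
    let lyrics = downloadLyrics url
    let name = lyrics.Split('\n').[0]
    return! CloudFile.WriteAllText(name, lyrics)   // path first, then text (order assumed)
}

// Do the work for all songs in parallel.
let createAllFiles =
    songUrls
    |> Array.map songCloudFile
    |> Cloud.Parallel
```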
Suppose there is an array of URL addresses pointing to song lyrics. There is also a function to download the lyrics from a given address. And now we want to create cloud files storing the lyrics of all the songs! To do this, there is the songCloudFile function. We create a cloud file with a name equal to the name of the song, which is the first line of the downloaded lyrics. After that we write the text to the cloud file using CloudFile.WriteAllText. To get the work done for all the songs, we use Cloud.Parallel, where a new cloud file is created for each song using Array.map. When cloud files already exist, they can be easily read using the ReadAllLines, ReadAllText and ReadAllBytes methods. When cloud files are no longer required, they can be removed either as a separate CloudFile or as all files in an entire CloudDirectory.
Cloud Channels
When you don't have all the input data up front and it keeps arriving in influxes, that is one of the use cases for cloud channels: doing reactive things, so that our cluster can provide an updated result that takes the new data into account. It is one of the reasons why cloud channels may be helpful.
Suppose we have a function in the cloud that performs some work, for example, replicating a string a random number of times:
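A sketch of such a function, assuming the channel ports expose Receive and Send as cloud operations (member and interface names can vary across MBrace versions):

```fsharp
open MBrace.Core

// Endless worker: receive a message, replicate it a random number of times,
// send the result back, then wait for the next message.
let rec replicateMessage (receivePort : IReceivePort<string>)
                         (sendPort : ISendPort<string>) = cloud {
    let! message = receivePort.Receive()
    let times = System.Random().Next(1, 10)
    do! sendPort.Send(String.replicate times message)
    return! replicateMessage receivePort sendPort
}
```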
This function is called replicateMessage and takes a channel receive port and a send port as parameters. Inside, it runs an endless task of receiving new data from the receive port whenever something is available, performing the replication and sending the result back to the send port. This function is one part of the example, so we launch this job and it will silently listen for new data. The other part of the example is creating the job that sends new input from some client to the cloud, and the job that receives the results as they become available. As a result, when we start these jobs, we may send any number of messages to our cloud job and get the results as soon as they are available. Don't forget to gracefully kill the jobs when they are no longer required!
Disposing cloud things
Another useful thing MBrace offers is the ICloudDisposable type, which acts like IDisposable for the distributed environment. This type defines a Dispose method that releases the resources used by the object.
A lot of MBrace types use ICloudDisposable. To automatically release the resources used by such an object, you can define it with the use keyword, which will trigger Dispose when the work with this object is finished. For example, inside the Dispose method of the CloudDirectory class, the directory is deleted.
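A tiny sketch of the pattern; whether the factory method is CloudDirectory.Create or something slightly different depends on the MBrace version:

```fsharp
open MBrace.Core

let withTemporaryDirectory = cloud {
    // use! binds an ICloudDisposable resource; Dispose is triggered
    // when the workflow leaves this scope, deleting the directory.
    use! dir = CloudDirectory.Create()
    // ... create and process cloud files inside dir ...
    return ()
}
```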
Exception Handling
Another really strong characteristic of MBrace is exception handling. If exceptions occur in cloud workflows, they can be caught the same way we are used to. The state of the exception is global and not tied to any local context.
So if we have an input array with a wrong address inside, we will get an exception in the client with a clear message and preserved stack trace information.
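As a sketch, reusing the hypothetical songCloudFile function from the cloud files example and assuming the workflow is submitted with cluster.Run:

```fsharp
let badUrls = [| "http://not-a-real-address.example/lyrics" |]

try
    badUrls
    |> Array.map songCloudFile
    |> Cloud.Parallel
    |> cluster.Run
    |> ignore
with ex ->
    // The exception raised on a worker surfaces here on the client,
    // with its message and stack trace information preserved.
    printfn "Cloud job failed: %s" ex.Message
```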
Commands for administration and health monitoring
To monitor the state of the cluster or to get some process metadata, there are very useful administrative commands:
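As an illustration, in MBrace.Azure the cluster handle exposes methods along these lines (names may vary between versions):

```fsharp
// Assuming 'cluster' is the connected cluster handle used in the earlier examples.
cluster.ShowWorkers()     // current workers with their CPU, memory and network usage
cluster.ShowProcesses()   // cloud processes and their completion state
```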