Functional approach to distributed computations and Big Data with F# MBrace cloud monads. Part 3.

This is a series of blog posts about a functional approach to cloud computations and big data using F# and the MBrace framework.

Motivation

MBrace is a fully open-source framework written in F# that offers a very clear and easy programming model. It aims to create all the necessary conditions to make distributed, cloud-scale computation and data handling as simple as possible. At the same time, it is completely straightforward to set up MBrace clusters. In this series you will get to know the concept of "cloud monads", or cloud computation expressions, find out how to create a cluster and configure the environment, explore the features and opportunities of the new functional cloud programming model for F#, and look through the code of MBrace examples.

Background

Here, in Part III of the series, you will have a look at the features of the MBrace framework. If you want to know more about the concepts and the approach behind MBrace, have a look at Part I. If you want to learn more about the infrastructure options for creating an MBrace cluster, have a look at Part II.

MBrace features

  • Cloud combinators
  • CloudCells and CloudVectors
  • CloudAtoms
  • CloudFiles
  • CloudFlows
  • CloudChannels
  • Cloud disposable resources
  • Exception handling
  • And many more!

Cloud Combinators

As you have already seen in Part I, we can use cloud combinators in our code. Cloud.Parallel takes a sequence of cloud computations, executes them in parallel, and then returns an array of the gathered results or throws an exception. But there are other interesting combinators as well.
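A minimal sketch of Cloud.Parallel in action (here and below, `cluster` is assumed to be the runtime handle obtained as in Part II):

    let job =
        [| "first"; "second"; "third" |]
        |> Array.map (fun word -> cloud { return word.Length })
        |> Cloud.Parallel

    let lengths = cluster.Run job   // [| 5; 6; 5 |], or an exception if any branch failed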

Take, for example, Cloud.Choice: it also performs cloud computations in parallel, but it returns as soon as any of these computations produces a successful value, or when all of them have failed to succeed. So the computations we pass to it return the Option type: either Some or None.
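A sketch of a parallel search with Cloud.Choice (the input data is illustrative):

    let tryFindEven (numbers : int []) =
        numbers
        |> Array.map (fun n -> cloud { return if n % 2 = 0 then Some n else None })
        |> Cloud.Choice

    let found = cluster.Run (tryFindEven [| 1; 3; 4; 7 |])
    // found : int option — Some 4, returned as soon as any branch succeeds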

We can also wrap an asynchronous function into a cloud workflow using Cloud.OfAsync. This combinator is useful when we already have existing async functions and want to use them in cloud workflows.
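For example, an existing async download can be lifted into a cloud workflow like this (a sketch; the URL and function names are placeholders):

    let downloadAsync (url : string) = async {
        use client = new System.Net.WebClient()
        return! client.AsyncDownloadString(System.Uri url)
    }

    let pageLength (url : string) = cloud {
        // lift the Async<string> into a Cloud<string>
        let! page = Cloud.OfAsync (downloadAsync url)
        return page.Length
    }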

There are other combinators as well, like Cloud.Raise and Cloud.Catch for raising and catching exceptions from a cloud workflow, Cloud.Dispose for disposing cloud resources, and Cloud.Ignore for discarding the result of a cloud computation. You can find the remaining combinators in the API reference or just check out the code on GitHub.
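As a small illustration, Cloud.Catch materializes a failure as a Choice value instead of letting it propagate (a sketch, assuming the Choice<'T, exn> result shape):

    let safeDivide (a : int) (b : int) = cloud {
        let! outcome = Cloud.Catch (cloud { return a / b })
        match outcome with
        | Choice1Of2 value -> return Some value
        | Choice2Of2 _ -> return None        // e.g. DivideByZeroException
    }

    let r = cluster.Run (safeDivide 10 0)    // None — the exception was caught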

Cloud Cells and Cloud Vectors

So far you have seen how MBrace scales computations. Now it's time to see how it distributes data. The simplest way of storing data entities in the cloud is to use the CloudCell type. It persists the object in the underlying store and provides an immutable reference to it. If we have some local data, it is easy to save it using CloudCell.New and run this operation on a cluster. This way we get a CloudCell object, which we can use for our tasks in the future. Cloud cells are cached locally on the machines for performance.
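A sketch of the round trip (note that the dereferencing API differs slightly between MBrace versions; `cell.Value` below follows the 1.x style):

    let lines = [| "line 1"; "line 2"; "line 3" |]

    // persist the array in the store and get an immutable handle to it
    let cell = cluster.Run (CloudCell.New lines)

    // dereference the handle from inside another cloud computation
    let firstLine = cluster.Run (cloud {
        let! data = cell.Value
        return data.[0]
    })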
If you need to store a large collection of objects, it makes sense to use the partitioned data structure CloudVector. It is similar to CloudCell in that it is immutable, but its values are retrieved lazily, only when needed. When creating a CloudVector, we may build it from a normal array and also specify how many bytes each partition of the CloudVector can store.
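For instance, something like the following (the exact factory signature is an assumption based on the 1.x API):

    // partition a million-element array into chunks of roughly 1 MB each
    let bigArray = [| 1 .. 1000000 |]
    let vector = cluster.Run (CloudVector.New(bigArray, 1024L * 1024L))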

Cloud Atoms

Take a look at the async incrementation of some reference value: there is an async block that increments the value 10,000 times in parallel. The result of this computation can differ from run to run, because it is a typical race condition.
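The original code listing is not preserved here, so the following is a reconstruction of the local version:

    // 10,000 unsynchronized increments of a shared ref cell
    let race () =
        let value = ref 0
        [ for _ in 1 .. 10000 -> async { value := !value + 1 } ]
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore
        !value

    // race () rarely returns exactly 10000: the read-modify-write is not atomic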

If we run the same code in the cloud, we will get a result of zero, as the reconstruction below shows. It seems the value wasn't incremented at all. This happens because, when the code is distributed in the cloud, each node gets its own copy of the initial data, so there has to be a dedicated mechanism for cluster-level synchronization. That's why cloud atoms exist: they serve to perform transactionally safe cross-runtime operations. To create a new cloud atom you can use CloudAtom.New.
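The naive cloud port of the increment loop (a reconstruction; each worker mutates its own deserialized copy of the ref cell):

    let cloudRace = cloud {
        let value = ref 0
        do! [ for _ in 1 .. 10000 -> cloud { value := !value + 1 } ]
            |> Cloud.Parallel
            |> Cloud.Ignore
        return !value
    }

    cluster.Run cloudRace   // 0 — the increments happened on serialized copies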

If we now rewrite the code with a cloud atom and update its value using CloudAtom.Update, we will get the correct value at the end:
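A sketch of the synchronized version (CloudAtom.Update's argument order and the final read via `atom.Value` follow the 1.x samples and may differ in other versions):

    let atomicIncrement = cloud {
        let! atom = CloudAtom.New 0
        do! [ for _ in 1 .. 10000 -> cloud { do! CloudAtom.Update(atom, fun v -> v + 1) } ]
            |> Cloud.Parallel
            |> Cloud.Ignore
        return! atom.Value
    }

    cluster.Run atomicIncrement   // 10000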

If we look inside, cloud atoms are stored as refs under the hood. So inside the Update method, MBrace replaces the existing element's value with the alternative value as an atomic operation, using Interlocked.CompareExchange to do it.

Cloud Flows

MBrace has another exciting feature called cloud flows. They allow us to perform distributed, parallel data-flow tasks in the cloud. Cloud flows follow a particular model of operation pipelining:
(diagram from the original post: input data → intermediate operations → terminating operation)
Everything starts from the input data. Then there is a number of intermediate operations, like filtering, ordering, and transformations, and a final terminating operation like sum, count, toArray, or any other. For example, when we create a CloudFlow from an array, this array is divided into parts, so that every node gets one of them. Then the operations like filtering and mapping are performed on each part of the input array as cloud tasks, and the terminating operation is executed as the final step.
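A sketch of such a pipeline over an array (CloudFlow lives in the MBrace.Flow namespace):

    open MBrace.Flow

    let flow =
        CloudFlow.OfArray [| 1 .. 1000 |]            // partitioned input
        |> CloudFlow.filter (fun n -> n % 2 = 0)     // intermediate ops run per partition
        |> CloudFlow.map (fun n -> n * n)
        |> CloudFlow.sum                             // terminating operation

    let sumOfEvenSquares = cluster.Run flow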
There are different options for the input data of a CloudFlow. The simplest is an array, but you can also create a flow from cloud vectors, cloud files, and cloud channels. Cloud files and cloud channels are next in the queue!

Cloud Files

Cloud files are a way to store data in the cloud using a binary blob representation. Have a look at the quick example below:
Suppose there is an array of URL addresses pointing to song lyrics. There is also a function that downloads the lyrics from a given address. Now we want to create cloud files to store the lyrics of all the songs! For this there is the songCloudFile function. It creates a cloud file whose name equals the name of the song, which is the first line of the downloaded lyrics, and then writes the text into the cloud file using CloudFile.WriteAllText. To get the work done for all the songs, we use Cloud.Parallel, creating a new cloud file for each song with Array.map. Once cloud files exist, they can easily be read using the ReadAllLines, ReadAllText, and ReadAllBytes methods. When cloud files are no longer required, they can be removed either as a separate CloudFile or all together with the entire CloudDirectory.
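The original listing is not preserved, so this is a reconstruction; downloadLyrics, the URL list, and the exact CloudFile.WriteAllText signature are assumptions in the spirit of the 1.x API:

    let urls = [| "http://example.com/song1.txt"; "http://example.com/song2.txt" |]

    // plain .NET download; a hypothetical helper standing in for the post's function
    let downloadLyrics (url : string) =
        use client = new System.Net.WebClient()
        client.DownloadString(System.Uri url)

    let songCloudFile (url : string) = cloud {
        let lyrics = downloadLyrics url
        let name = (lyrics.Split '\n').[0]    // song name = first line of the lyrics
        let! file = CloudFile.WriteAllText(name, lyrics)
        return file
    }

    let files =
        cluster.Run (urls |> Array.map songCloudFile |> Cloud.Parallel)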

Cloud Channels

When you don't have all the input data at once and data arrives in influxes, that is one of the use cases for cloud channels: doing reactive things, so that our cluster can provide updated results that take the new data into account.

Pretend we have a function in the cloud that performs some work, for example replicating a string a random number of times:
This function is called replicateMessage and takes a channel receive port and a send port as parameters. Inside, it runs an endless task: receiving new data from the receive port whenever something is available, performing the replication, and sending the result back to the send port. This function is one part of the example: we launch this job, and it silently listens for new data. The other part of the example is a job that sends new input from some client to the cloud, and a job that receives the results as they become available. As a result, once we start these jobs, we can send any number of messages to our cloud job and get the results as soon as they are available. Don't forget to gracefully kill the jobs when they are no longer required!
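A reconstruction of the setup. The channel API (CloudChannel.New/Send/Receive, the port ordering) and the CreateProcess/Kill calls follow the 1.x-era samples, so exact names may vary:

    // endless listener: receive, replicate a random number of times, send back
    let replicateMessage (rp : IReceivePort<string>) (sp : ISendPort<string>) = cloud {
        let rnd = System.Random()
        while true do
            let! msg = CloudChannel.Receive rp
            do! CloudChannel.Send(sp, String.replicate (rnd.Next(1, 10)) msg)
    }

    let inSend, inRecv = cluster.Run (CloudChannel.New<string>())
    let outSend, outRecv = cluster.Run (CloudChannel.New<string>())

    // launch the listener as a background cloud process
    let listener = cluster.CreateProcess (replicateMessage inRecv outSend)

    // client side: push messages in, pull replicated results out
    cluster.Run (CloudChannel.Send(inSend, "la"))
    let reply = cluster.Run (CloudChannel.Receive outRecv)

    listener.Kill()   // gracefully stop the job once it is no longer required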

Disposing cloud things

Another useful thing MBrace offers is the ICloudDisposable type, which acts like IDisposable for a distributed environment. This type defines a Dispose method that releases the resources used by the object.
A lot of MBrace types use ICloudDisposable. To automatically release the resources used by some object, you can define it with the use keyword, which will trigger Dispose when the work with this object is finished. For example, inside the Dispose method of the CloudDirectory class, the directory is deleted.
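A sketch of how that looks inside a workflow (CloudDirectory.Create and the Path properties are assumed names from the 1.x store API):

    let withScratchSpace = cloud {
        // bound with use!, so Dispose runs when the workflow leaves this scope
        use! dir = CloudDirectory.Create "scratch"
        let! file = CloudFile.WriteAllText(dir.Path + "/tmp.txt", "throwaway data")
        let! text = CloudFile.ReadAllText file.Path
        return text.Length
    }   // dir is disposed here, deleting the directory from the store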

Exception Handling

Another really strong characteristic of MBrace is its exception handling. If an exception occurs in a cloud workflow, it can be caught the same way we are used to. Moreover, the exception's state is global and not tied to any local context.
So if the input array contains a wrong address, we will get an exception on the client with a clear message and preserved stack-trace information, as the sketch below illustrates.
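Reusing the hypothetical songCloudFile from the cloud-files sketch above:

    let brokenUrls = [| "http://example.com/song1.txt"; "not-a-valid-url" |]

    try
        let job = brokenUrls |> Array.map songCloudFile |> Cloud.Parallel
        cluster.Run job |> ignore
    with ex ->
        // the remote failure surfaces here with message and stack trace preserved
        printfn "Cloud job failed: %s" ex.Message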

Commands for administration and health monitoring

To monitor the state of the cluster or get some process metadata, there are very useful administrative commands:
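Typical client-side calls from the 2015-era MBrace.Azure client look like this (names may differ across runtimes and versions):

    cluster.ShowWorkers()     // machines attached to the cluster and their load
    cluster.ShowProcesses()   // cloud processes with their state and progress
    cluster.ShowLogs()        // recent log entries from the runtime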

More coming soon!


19.05.2015 | fsharp, cloud, big data, mbrace, functional programming