Embracing Clouds

F#, MBrace & Azure

 

Alena Dzenisenka

.NET Fringe 2015, Portland, Oregon

Contents of the talk:

  • Introduction - F# in the Cloud
  • MBrace - Big Compute and Big Data
  • Code & Demo

F# in the cloud

 
 let maxContentLength = cloud {

     let lengthsJob = [| "https://github.com";"http://www.microsoft.com/"|]
                      |> Array.map (getContentLenght >> Cloud.OfAsync)
    
     let! lengths = Cloud.Parallel lengthsJob
     return Array.max lengths
 }

How is it possible?...

F# computation expressions!

You care only about your main code logic

They handle side effects in the background

What happens inside?...

???

CODE...

PROFIT!!!

What the cloud block hides...

 let cloud = new CloudBuilder()
 cloud { (* code logic *) }

... the name of the block is simply the name of the builder class instance ...

CloudBuilder methods...

 type CloudBuilder () =
     member __.Return ...
     member __.Bind ...
     member __.Using ...
     // other

Builder methods are triggered from the cloud block and take care of all the additional side-effect work behind the scenes!

... and they operate on the Cloud<'T> type
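To make the builder mechanics concrete, here is a minimal sketch of a toy computation expression. It is not MBrace code: Deferred, DeferredBuilder, deferred and run are hypothetical names invented for this illustration, and the real CloudBuilder does far more work (scheduling, resources, exception handling).

```fsharp
// Toy stand-in for Cloud<'T>: a computation that is only described, not yet run.
// All names here are hypothetical and exist only for this illustration.
type Deferred<'T> = Deferred of (unit -> 'T)

// Execute a deferred computation.
let run (Deferred f) = f ()

type DeferredBuilder () =
    // `return x` wraps a plain value.
    member __.Return (x: 'T) : Deferred<'T> = Deferred (fun () -> x)
    // `return! m` passes an existing computation through unchanged.
    member __.ReturnFrom (m: Deferred<'T>) : Deferred<'T> = m
    // `let! x = m` runs m, then feeds its result to the rest of the block.
    member __.Bind (m: Deferred<'T>, f: 'T -> Deferred<'U>) : Deferred<'U> =
        Deferred (fun () -> run (f (run m)))

// The block name is simply the name of this builder instance.
let deferred = DeferredBuilder ()

let sum = deferred {
    let! a = Deferred (fun () -> 20)
    let! b = Deferred (fun () -> 22)
    return a + b
}

// Roughly what the compiler generates for the block above.
let desugared =
    deferred.Bind (Deferred (fun () -> 20), fun a ->
        deferred.Bind (Deferred (fun () -> 22), fun b ->
            deferred.Return (a + b)))

printfn "%d %d" (run sum) (run desugared)
```

Both values describe the same computation, which shows that the block syntax is only sugar over calls to the builder's methods.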

How does it parallelize?

 let! lengths = Cloud.Parallel lengthsJob

What happens behind the scenes?

 // don't worry, this code is already inside the framework
 static member Parallel (computations : seq<Cloud<'T>>) : Cloud<'T []> = cloud {
        let! runtime = Cloud.GetResource<IRuntimeProvider> ()
        let workflow = runtime.ScheduleParallel (computations |> Seq.map (fun c -> c,None))
        return! Cloud.WithAppendedStackTrace "Cloud.Parallel[T](seq<Cloud<T>> computations)" workflow
 }

Let's save it in our memory!

Cloud workflows:

  • Contain actions for deferred execution in the cloud
  • Use a set of worker machines to perform jobs
  • Implemented inside MBrace Framework
  • Built as F# computation expressions / monads
  • Inspired by async workflows
  • Use CloudBuilder methods to handle side effects
  • Operate the Cloud<'T> wrapper type
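Because cloud workflows are modelled on async workflows, the shape of the opening example can be tried locally with plain async. The sketch below is only an analogy: getLength is a hypothetical stand-in for getContentLenght that needs no network access, and Async.Parallel takes the place of Cloud.Parallel.

```fsharp
// Hypothetical local stand-in for getContentLenght: no network access,
// it just reports the length of the string it is given.
let getLength (text: string) : Async<int> = async { return text.Length }

let maxLength = async {
    let lengthsJob =
        [| "https://github.com"; "http://www.microsoft.com/" |]
        |> Array.map getLength
    // Async.Parallel plays the role of Cloud.Parallel: fork all jobs, join the results.
    let! lengths = Async.Parallel lengthsJob
    return Array.max lengths
}

// Nothing has run so far; the workflow is only a description until it is started.
let result = Async.RunSynchronously maxLength
printfn "%d" result
```

As with a cloud block, defining maxLength executes nothing. The work starts only when the workflow is handed to a runner, here Async.RunSynchronously and in MBrace the runtime.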

MBrace

CLUSTER!

Cluster everywhere!

And how to start?...

Cluster setup options

  • Custom cluster configuration
  • MBrace on Windows Azure IaaS
  • MBrace cluster using Brisk!

Custom cluster configuration

1. Machines visible on the same network. Each node should have:

  • MBrace runtime installed
  • Necessary firewall exceptions
  • Correct settings in mbraced.exe.config
  • MBrace Windows service started and running

2. A client to send requests to the cluster machines. The client should have the MBrace runtime and any environment that supports F#, e.g. F# Interactive or Visual Studio.

Azure IaaS cluster

With an Azure subscription account you can:

  1. Create a Virtual Network (mandatory before creating the virtual machines), following the instructions.
  2. Create virtual machines inside the previously created Virtual Network and configure them as MBrace nodes. Specify the IP address in the mbraced.exe.config file.
  3. Use a client with the MBrace runtime and F# to send requests to your Azure VM cluster.

Brisk cluster for MBrace!

With an Azure account you can also:

  1. Sign up on the Brisk website: https://www.briskengine.com/
  2. Click the "Create new cluster" button and follow the instructions.
  3. Specify the number of worker machines and other settings for the cluster.
  4. Within a few minutes you will have a cluster ready to use.

Prepare for boot 

After configuring the cluster using one of the possible options, it's time to connect and boot the runtime:

 1. With Azure or custom machines, you need to specify the IP addresses or host names:

// by IP address
let nodes =
    [
        MBraceNode.Connect("10.0.1.4", 2675)
        MBraceNode.Connect("10.0.1.5", 2675)
        MBraceNode.Connect("10.0.1.6", 2675)
    ]

// or by host name
let nodes =
    [
        MBraceNode.Connect("hostname1", 2675)
        MBraceNode.Connect("hostname2", 2675)
        MBraceNode.Connect("hostname3", 2675)
    ]

     And the default store we are going to use:

MBraceSettings.DefaultStore <- AzureStore.Create(accountName = "your name", accountKey = "your key")

    or

MBraceSettings.DefaultStore <- FileSystemStore.Create @"\\path\to\shared"

Prepare for boot with Brisk!

 2. With a Brisk cluster, you need to create the configuration object with your connection details:

let config = Configuration(myStorageConnectionString, myServiceBusConnectionString)

The storage connection string and service bus connection string may be obtained from Brisk.

And finally - boot the runtime!

For custom cluster:

let runtime = MBrace.Boot nodes

For Azure cluster:

let runtime = MBrace.Boot(nodes, store = azureStore)

For Brisk cluster:

let runtime = AzureCluster.Connect(config)

MBrace Features

  • Cloud combinators
  • CloudValues
  • CloudAtoms
  • CloudFiles
  • CloudFlows
  • CloudQueues
  • Cloud disposable resources
  • Exception handling
  • And many more!

Cloud combinators

  • Cloud.Parallel
  • Cloud.Choice
  • Cloud.OfAsync
  • Cloud.Raise
  • Cloud.Catch
  • Cloud.Dispose
  • Cloud.Ignore
  • Other...

Cloud Values and Persistent Cloud Flows

CloudValue:

Store distributed data entities. Immutable.

 let dataInTheCloud = localData |> CloudValue.New |> runtime.Run

Can be read anytime in the future:

 cloud { 
     let! dataFromCloudValue = CloudValue.Read dataInTheCloud
     // actions with data
 }

CloudAtoms

Example of race condition in async:

async {
    let value = ref 0
    // run all increments in parallel against the same shared ref cell
    do! Async.Parallel [| for i in 1 .. 10000 -> async { incr value } |] |> Async.Ignore
    return !value 
} |> Async.RunSynchronously

May well give a result such as 9981.

The same example executed within the cloud workflow:

cloud {
    let value = ref 0
    do! Cloud.Parallel [| for i in 1 .. 10000 -> cloud { incr value } |] |> Cloud.Ignore
    return !value 
} |> runtime.Run

Will always return the initial value, which is 0 here.

So ... CloudAtoms!

let atomicValue = CloudAtom.New(42) |> cluster.Run

cloud {
    do!
       [ for i in 1..n -> cloud { return! CloudAtom.Update (atomicValue , fun i -> i + 1)}]
       |> Cloud.Parallel
       |> Cloud.Ignore
    return! CloudAtom.Read atomicValue 
} |> runtime.Run

CloudAtoms - safe transactional cross-runtime operations!

This should result in n + 42. 

Cloud atoms are stored as refs under the hood. 
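The idea behind an atomic update can be sketched locally with a compare-and-swap loop. This is an analogy only and makes no claim about how MBrace implements CloudAtom: LocalAtom is a hypothetical type for this illustration, built on System.Threading.Interlocked.

```fsharp
open System.Threading

// Hypothetical local analogue of an atom: an int cell updated with compare-and-swap.
type LocalAtom (initial: int) =
    let mutable value = initial
    member __.Value = value
    // Apply f atomically: retry whenever another thread changed the value
    // between our read and our write.
    member __.Update (f: int -> int) =
        let mutable swapped = false
        while not swapped do
            let current = value
            let updated = f current
            swapped <- Interlocked.CompareExchange(&value, updated, current) = current

let n = 10000
let atom = LocalAtom 42

// Same shape as the CloudAtom example: n parallel increments, results ignored.
[ for i in 1 .. n -> async { atom.Update (fun x -> x + 1) } ]
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously

printfn "%d" atom.Value
```

Unlike the bare ref cell in the race-condition example, no increment is lost here, so the final value is n + 42.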

CloudFlows

CloudFlows let you run efficient functional data query pipelines over streams of data in the cloud.

Pipeline stages: input, filter (predicate), sort (comparison), map (function), terminal (count, sum ...)

In action:

 let songNames = 
     songsUrls 
     |> CloudFlow.OfArray
     |> CloudFlow.map download
     |> CloudFlow.filter (fun s -> s.Length > 300)
     |> CloudFlow.map (fun s -> s.Substring(0, s.IndexOf("\n") - 1))
     |> CloudFlow.toArray
     |> runtime.Submit
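The same pipeline shape can be tried locally with the Seq module, which may help when reading the CloudFlow version. This is a sketch with made-up sample data, not the talk's demo: the songs array is hypothetical and stands in for downloaded texts whose first line is the title.

```fsharp
// Made-up sample data standing in for downloaded song texts (first line = title).
let songs =
    [| "Song C\ndum dum dum"
       "B\nx"
       "Song A\nla la la" |]

let songNames =
    songs
    |> Seq.filter (fun s -> s.Length > 10)                 // predicate stage
    |> Seq.map (fun s -> s.Substring(0, s.IndexOf('\n')))  // function stage: keep the title line
    |> Seq.sort                                            // comparison stage
    |> Seq.toArray                                         // terminal stage

printfn "%A" songNames
```

The short entry is dropped by the filter, and the two remaining titles come back sorted. A CloudFlow expresses the same stages, but they execute across the cluster's workers.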

CloudFiles

Store data in the cloud using a binary blob representation.

Creating a cloud file:

 let songCloudFile (song:string) (dir: CloudDirectoryInfo) = cloud { 
    let songName = song.Substring(0, song.IndexOf("\n") - 1).Trim().Replace("\"", "")
    let fileName = Path.Combine(dir.Path, songName)
    do! CloudFile.Delete fileName 
    return! CloudFile.WriteAllText(path = fileName,text = song)
}

Run the function over the required input:

 let songCloudFilesProcess = 
    songFileNames
    |> Array.map (fun i -> cloud {
        let! songText =  download i |> Cloud.OfAsync
        // dir is the target CloudDirectoryInfo, created beforehand (not shown)
        return! songCloudFile songText dir
    })
    |> Cloud.Parallel |> cluster.Run

CloudFiles

Read from the cloud files easily:

 let displayLyrics (song: string) = cloud {
    let fileName = Path.Combine(dir.Path, song)
    return! CloudFile.ReadAllText fileName 
}

 let songLyrics = displayLyrics "Smells Like Teen Spirit" |> runtime.Run

Remove cloud directories or cloud files:

 do! CloudDirectory.Delete dir

Existence:

 let! exists = CloudDirectory.Exists dir

Enumeration:

 let! cloudFiles = CloudFile.Enumerate dir

CloudQueues

Send and receive messages with cloud workflows, reactive things!

// Create an anonymous cloud queue
let queue = CloudQueue.New() |> cluster.Run

// Sending
CloudQueue.Enqueue (queue, "Monads") |> cluster.Run

// Receiving
let msg = CloudQueue.Dequeue(queue) |> cluster.Run

CloudQueues!

Send messages from client:

let sendTask = 
    cloud { for i in [ 0 .. 100 ] do 
                do! queue.Enqueue (sprintf "FSharp%d" i) }
     |> cluster.Submit

sendTask.ShowInfo() 

And the receiving job:

let receiveTask = 
    cloud { let results = new ResizeArray<_>()
            for i in [ 0 .. 100 ] do 
               let! msg = CloudQueue.Dequeue(queue)
               results.Add msg
            return results.ToArray() }
     |> cluster.Submit

Send some messages and see the results!

let receiveMessages = async {
        while true do
            let! result = channel.ReceiveAsync(receivePort2) 
            printfn "Received: %A" result
 }

 // kill all jobs
 receiveCancel.Cancel()
 replicateMessage.Kill()
 channel.Delete receivePort1
 channel.Delete receivePort2
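The send/receive pattern can be rehearsed on one machine before moving it to the cluster. The sketch below is a local analogy under stated assumptions: it uses BlockingCollection from System.Collections.Concurrent in place of a CloudQueue and async in place of cloud, so none of it touches the MBrace API.

```fsharp
open System.Collections.Concurrent

// Local, in-process stand-in for a cloud queue.
let queue = new BlockingCollection<string> ()

// Producer: enqueue 101 messages, mirroring the sending job.
let sendTask = async {
    for i in 0 .. 100 do
        queue.Add (sprintf "FSharp%d" i)
}

// Consumer: take 101 messages, mirroring the receiving job.
let receiveTask = async {
    let results = ResizeArray<string> ()
    for i in 0 .. 100 do
        results.Add (queue.Take ())   // blocks until a message is available
    return results.ToArray ()
}

let received =
    async {
        // Start the receiver first, then send, then wait for the receiver's result.
        let! receiver = Async.StartChild receiveTask
        do! sendTask
        return! receiver
    }
    |> Async.RunSynchronously

printfn "received %d messages, first = %s, last = %s"
    received.Length received.[0] received.[received.Length - 1]
```

With one producer and one consumer the messages arrive in the order they were sent. A distributed queue with several consumers would not necessarily give that guarantee.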

Disposing cloud things!

MBrace provides the ICloudDisposable type, which acts like IDisposable for distributed environments.

/// Denotes handle to a distributable resource that can be disposed of.
type ICloudDisposable =
    /// Releases any storage resources used by this object.
    abstract Dispose : unit -> Local<unit>

Implementation of ICloudDisposable for CloudDirectory:

interface ICloudDisposable with
    member d.Dispose () = CloudDirectory.Delete(d, recursiveDelete = true)

We may apply use and use! with ICloudDisposable objects.
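For readers less familiar with use bindings, this is how the ordinary IDisposable version behaves locally. It is a sketch of the plain .NET mechanism that ICloudDisposable mirrors, not of MBrace itself: TempResource and the log are hypothetical, introduced only to make the disposal order visible.

```fsharp
open System

// Hypothetical resource that records when it is acquired and released.
type TempResource (name: string, log: ResizeArray<string>) =
    do log.Add (sprintf "acquire %s" name)
    interface IDisposable with
        member __.Dispose () = log.Add (sprintf "release %s" name)

let log = ResizeArray<string> ()

let work () =
    // `use` guarantees Dispose is called when the binding goes out of scope,
    // even if the code in between throws.
    use resource = new TempResource ("dir", log)
    log.Add "working"

work ()
printfn "%A" (List.ofSeq log)
```

The log shows the resource released right after the work finishes, without an explicit cleanup call.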

Exception handling

Executing the computation with the wrong link:

let maxContentLenght = cloud {
    let lengthsJob = [|"https://github.com"; "https://www.microsoft.com"; "http://www.someunknownaddress"|] 
                     |> Array.map (getContentLenght >> Cloud.OfAsync)           
    let! lengths = Cloud.Parallel lengthsJob
    return Array.max lengths
}

We get the concrete exception:

System.Net.WebException: The remote name could not be resolved: 'www.someunknownaddress'
   at Microsoft.FSharp.Control.WebExtensions.AsyncGetResponse@2036-1.Invoke(Exception _arg1)
   at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@803.Invoke(AsyncParams`1 args)
   at Cloud.Parallel[T](seq<Cloud<T>> computations)
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.FSharp.Core.Operators.Raise[T](Exception exn)
   at <StartupCode$MBrace-Azure-Client>.$Process.AwaitResultAsync@116-1.Invoke(Result`1 _arg2)
   in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Process.fs:line 116
   at Microsoft.FSharp.Control.AsyncBuilderImpl.args@797-1.Invoke(a a)
   at MBrace.Continuation.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken)
   in C:\dev\github-repositories\MBrace.Core\src\MBrace.Core\Continuation\ExceptionDispatchInfo.fs:line 134
   at MBrace.Azure.Client.Process`1.AwaitResult()
   in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Process.fs:line 110
   at <StartupCode$MBrace-Azure-Client>.$Client.RunAsync@103-2.Invoke(Unit unitVar)
   in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Client.fs:line 103
   at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@813.Invoke(AsyncParams`1 args)
   at MBrace.Continuation.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken)
   in C:\dev\github-repositories\MBrace.Core\src\MBrace.Core\Continuation\ExceptionDispatchInfo.fs:line 134
   at MBrace.Azure.Client.Runtime.Run[T](Cloud`1 workflow, FSharpOption`1 cancellationToken, FSharpOption`1 faultPolicy)
   in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Client.fs:line 125
   at <StartupCode$FSI_0010>.$FSI_0010.main@() in RestrictionsFilter.fsx:line 54
Stopped due to error
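How a failure in one parallel branch surfaces to the caller can be observed locally with plain async. This is a hedged analogy, not MBrace behaviour: getLength is a hypothetical stand-in that fails for one input without touching the network, and Async.Catch is used so the exception can be inspected rather than stopping the session.

```fsharp
// Hypothetical stand-in for a download that fails on one of the inputs.
let getLength (url: string) = async {
    if url.Contains("unknown") then failwithf "could not resolve %s" url
    return url.Length
}

let maxLength = async {
    let! lengths =
        [| "https://github.com"; "http://www.someunknownaddress" |]
        |> Array.map getLength
        |> Async.Parallel
    return Array.max lengths
}

// Async.Catch turns a thrown exception into a value: Choice2Of2 exn.
let outcome = maxLength |> Async.Catch |> Async.RunSynchronously

match outcome with
| Choice1Of2 m -> printfn "max length: %d" m
| Choice2Of2 e -> printfn "failed: %s" e.Message
```

The exception raised inside one branch fails the whole parallel computation and reaches the caller with its original message, the same experience the cloud stack trace above illustrates.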

Administrative commands


// Lists all processes of the runtime with their statuses, execution time, etc.
runtime.ShowProcesses()

// Lists all cluster machines with the info for CPU, memory, network usage, etc.
runtime.ShowWorkers()

// Gets runtime logs as an array of LogRecords
runtime.GetSystemLogs()

// Attaches specified logger to the runtime
runtime.AttachLogger(...)

// Cancels the process with the given id
runtime.GetProcessById("someprocess").Cancel()

// Deletes runtime records for all the processes
runtime.ClearAllProcesses()

// Deletes and re-activates runtime state
runtime.Reset()

TALK IS CHEAP

SHOW ME THE CODE

Result

MBrace vs Hadoop performance

SUMMARY

Useful resources:

Thank You!