F#, MBrace & Azure
Alena Dzenisenka
.NET Fringe 2015, Portland, Oregon
let maxContentLength = cloud {
    let lengthsJob =
        [| "https://github.com"; "http://www.microsoft.com/" |]
        |> Array.map (getContentLenght >> Cloud.OfAsync)
    let! lengths = Cloud.Parallel lengthsJob
    return Array.max lengths
}
You care only about your main code logic
They handle side effects in the background
let cloud = new CloudBuilder()
cloud { (* code logic *) }
... name of the block is equal to name of the builder class instance ...
type CloudBuilder () =
    member __.Return ...
    member __.Bind ...
    member __.Using ...
    // other
Builder methods are invoked from the cloud block and take care of all the side-effect work behind the scenes!
... and operate on the Cloud<'T> type
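To make the builder mechanics concrete, here is a minimal sketch of a custom computation expression. It does not use MBrace: the `Logged<'T>` type, `LoggedBuilder`, and `runLogged` are hypothetical names invented for illustration. The builder records a log entry on every `let!`, which stands in for the "side effects in the background".

```fsharp
// Hypothetical toy type: a delayed computation returning a value and a log.
type Logged<'T> = Logged of (unit -> 'T * string list)

type LoggedBuilder () =
    // Called for `return x`: wraps a plain value with an empty log.
    member __.Return (x : 'T) : Logged<'T> =
        Logged (fun () -> (x, []))
    // Called for `let!`: runs the first step, logs it, then runs the rest.
    member __.Bind (m : Logged<'T>, f : 'T -> Logged<'U>) : Logged<'U> =
        Logged (fun () ->
            let (Logged run) = m
            let (x, log1) = run ()
            let (Logged next) = f x
            let (y, log2) = next ()
            (y, log1 @ [ sprintf "bound %A" x ] @ log2))

// The block name `logged` is just the name of this builder instance.
let logged = LoggedBuilder ()

let runLogged (Logged run) = run ()

let workflow = logged {
    let! a = logged { return 20 }
    let! b = logged { return 22 }
    return a + b
}

let (result, entries) = runLogged workflow
```

The workflow body only adds two numbers; the logging happens entirely inside `Bind`, which is the same division of labour the `cloud { ... }` block relies on.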
let! lengths = Cloud.Parallel lengthsJob
static member Parallel (computations : seq<Cloud<'T>>) : Cloud<'T []> = cloud {
    let! runtime = Cloud.GetResource<IRuntimeProvider> ()
    let workflow = runtime.ScheduleParallel (computations |> Seq.map (fun c -> c, None))
    return! Cloud.WithAppendedStackTrace "Cloud.Parallel[T](seq<Cloud<T>> computations)" workflow
}
// don't worry, this code is already inside the framework
Cloud workflows:
MBrace
1. Machines, visible on the same network. Each node should have:
2. A client to send requests to the cluster machines. The client needs the MBrace runtime and any environment that supports F#, e.g. F# Interactive or Visual Studio.
With an Azure account you can:
With an Azure account you can also:
After configuring the cluster using one of the possible options, it's time to connect and boot the runtime:
1. With Azure or custom machines, specify the IP addresses or host names:
And the default store we are going to use:
or
let nodes =
    [
        MBraceNode.Connect("10.0.1.4", 2675)
        MBraceNode.Connect("10.0.1.5", 2675)
        MBraceNode.Connect("10.0.1.6", 2675)
    ]
MBraceSettings.DefaultStore <- AzureStore.Create(accountName = "your name", accountKey = "your key")
MBraceSettings.DefaultStore <- FileSystemStore.Create @"\\path\to\shared"
let nodes =
    [
        MBraceNode.Connect("hostname1", 2675)
        MBraceNode.Connect("hostname2", 2675)
        MBraceNode.Connect("hostname3", 2675)
    ]
2. With a Brisk cluster, create a configuration object with your connection details:
let config = Configuration(myStorageConnectionString, myServiceBusConnectionString)
The storage connection string and the service bus connection string can be obtained from Brisk.
For custom cluster:
For Azure cluster:
For Brisk cluster:
let runtime = MBrace.Boot nodes
let runtime = MBrace.Boot(nodes, store = azureStore)
let runtime = AzureCluster.Connect(config)
Store distributed data entities. Immutable.
let dataInTheCloud = localData |> CloudValue.New |> runtime.Run
Can be read anytime in the future:
cloud {
    let! dataFromCloudValue = CloudValue.Read dataInTheCloud
    // actions with data
}
CloudValue:
async {
    let value = ref 0
    // Run the increments in parallel; without do! they would never execute
    do! Async.Parallel [| for i in 1 .. 10000 -> async { incr value } |] |> Async.Ignore
    return !value
} |> Async.RunSynchronously
Example of race condition in async:
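On a single machine, one common way to remove this race is an atomic increment. The sketch below uses `System.Threading.Interlocked` from the .NET base library; the function name `countSafely` is made up for illustration. It only helps while all increments share one process memory, which is exactly what a cluster does not give you.

```fsharp
open System.Threading

// Hypothetical helper: increments a shared counter n times in parallel.
let countSafely (n : int) : int =
    let counter = ref 0
    [| for _ in 1 .. n ->
        // Interlocked.Increment performs the read-modify-write as one atomic step.
        async { Interlocked.Increment(counter) |> ignore } |]
    |> Async.Parallel
    |> Async.Ignore
    |> Async.RunSynchronously
    counter.Value

let safeTotal = countSafely 10000
```

Unlike the plain `incr value`, every increment is counted here, so `safeTotal` equals the number of jobs.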
cloud {
    let value = ref 0
    // Run the increments on the cluster; without do! they would never execute
    do! Cloud.Parallel [| for i in 1 .. 10000 -> cloud { incr value } |] |> Cloud.Ignore
    return !value
} |> runtime.Run
May give a result such as 9981 instead of 10000.
Will always return the initial value, which is 0 here, because each cloud job mutates its own copy of the ref cell.
The same example executed within the cloud workflow:
let atomicValue = CloudAtom.New(42) |> cluster.Run
cloud {
    do!
        [ for i in 1 .. n -> cloud { return! CloudAtom.Update (atomicValue, fun i -> i + 1) } ]
        |> Cloud.Parallel
        |> Cloud.Ignore
    return! CloudAtom.Read atomicValue
} |> runtime.Run
CloudAtoms - safe transactional cross-runtime operations!
This should result in n + 42.
Cloud atoms are stored as refs under the hood.
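As a rough single-machine analogy for the transactional update, the sketch below retries a compare-and-swap until no other thread has changed the value in between. It says nothing about how MBrace implements CloudAtom; `LocalIntAtom` and the job count `jobCount` are hypothetical names used only here.

```fsharp
open System.Threading

// Hypothetical local stand-in for an atom that holds an int.
type LocalIntAtom (initial : int) =
    let mutable current = initial
    member __.Value = Volatile.Read(&current)
    // Applies f atomically: retry if another thread wrote between read and swap.
    member __.Update (f : int -> int) =
        let mutable swapped = false
        while not swapped do
            let seen = Volatile.Read(&current)
            let previous = Interlocked.CompareExchange(&current, f seen, seen)
            swapped <- (previous = seen)

let jobCount = 1000
let atom = LocalIntAtom 42

[ for _ in 1 .. jobCount -> async { atom.Update (fun i -> i + 1) } ]
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously

let total = atom.Value
```

As with the CloudAtom example, the final value is the initial 42 plus one per job.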
CloudFlows let you run efficient functional data query pipelines over streams of data in the cloud.
let songNames =
    songsUrls
    |> CloudFlow.OfArray
    |> CloudFlow.map download
    |> CloudFlow.filter (fun s -> s.Length > 300)
    |> CloudFlow.map (fun s -> s.Substring(0, s.IndexOf("\n") - 1))
    |> CloudFlow.toArray
    |> runtime.Submit
Input
Predicate: filter
Comparison: sort
Function: map
Terminal: count, sum ...
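The same pipeline stages can be tried locally with the standard `Seq` module before moving to CloudFlow. This is only a sketch on in-memory data: the `songs` array is a made-up stand-in for downloaded texts, with the title on the first line.

```fsharp
// Hypothetical in-memory stand-in for downloaded song texts.
let songs =
    [| "Yesterday\nAll my troubles seemed so far away"
       "Help\nshort"
       "Imagine\nImagine there's no heaven, it's easy if you try" |]

let titles =
    songs
    |> Seq.ofArray                                          // input
    |> Seq.filter (fun s -> s.Length > 20)                  // predicate
    |> Seq.map (fun s -> s.Substring(0, s.IndexOf('\n')))   // function
    |> Seq.sort                                             // comparison
    |> Seq.toArray                                          // terminal

let titleCount = Array.length titles
```

Each stage takes a plain function, so the shape of the query stays the same when the `Seq` combinators are swapped for their CloudFlow counterparts.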
In action:
let songCloudFile (song : string) (dir : CloudDirectoryInfo) = cloud {
    // The first line of the text, stripped of quotes, becomes the file name
    let songName = song.Substring(0, song.IndexOf("\n") - 1).Trim().Replace("\"", "")
    let fileName = Path.Combine(dir.Path, songName)
    do! CloudFile.Delete fileName
    return! CloudFile.WriteAllText(path = fileName, text = song)
}
Store data in the cloud using a binary blob representation.
let songCloudFilesProcess =
    songFileNames
    |> Array.map (fun i -> cloud {
        let! songText = download i |> Cloud.OfAsync
        // songCloudFile also needs the target cloud directory
        return! songCloudFile songText dir
    })
    |> Cloud.Parallel |> cluster.Run
Creation of cloud file:
Run the function on the required input:
let displayLyrics (song : string) = cloud {
    let fileName = Path.Combine(dir.Path, song)
    return! CloudFile.ReadAllText fileName
}
let songLyrics = displayLyrics "Smells Like Teen Spirit" |> runtime.Run
Read from the cloud files easily:
do! CloudDirectory.Delete dir
Remove cloud directories or cloud files:
let! exists = CloudDirectory.Exists dir
let! cloudFiles = CloudFile.Enumerate dir
Existence:
Enumeration:
// Creating an anonymous cloud queue and sending a message
let queue = CloudQueue.New() |> cluster.Run
// Sending
CloudQueue.Enqueue (queue, "Monads") |> cluster.Run
// Receiving
let msg = CloudQueue.Dequeue(queue) |> cluster.Run
Send and receive messages with cloud workflows, reactive things!
let sendTask =
    cloud { for i in [ 0 .. 100 ] do
                do! queue.Enqueue (sprintf "FSharp%d" i) }
    |> cluster.Submit
sendTask.ShowInfo()
let receiveMessages = async {
    while true do
        let! result = channel.ReceiveAsync(receivePort2)
        printfn "Received: %A" result
}
Send messages from client:
// kill all jobs
receiveCancel.Cancel()
replicateMessage.Kill()
channel.Delete receivePort1
channel.Delete receivePort2
And the receiving job:
let receiveTask =
    cloud { let results = new ResizeArray<_>()
            for i in [ 0 .. 100 ] do
                let! msg = CloudQueue.Dequeue(queue)
                results.Add msg
            return results.ToArray() }
    |> cluster.Submit
Send some messages and see the results!
MBrace provides the ICloudDisposable type, which acts like IDisposable for a distributed environment.
/// Denotes handle to a distributable resource that can be disposed of.
type ICloudDisposable =
    /// Releases any storage resources used by this object.
    abstract Dispose : unit -> Local<unit>
We may apply use and use! with ICloudDisposable objects.
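For comparison, this is how `use` behaves with an ordinary `IDisposable` on the local machine. The sketch relies only on `System.IO`; the `TempDirectory` type and `writeAndForget` function are hypothetical and play the role that a disposable cloud directory plays in a cloud workflow.

```fsharp
open System
open System.IO

// Hypothetical helper: a temporary directory that is deleted on Dispose.
type TempDirectory () =
    let path = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N"))
    do Directory.CreateDirectory(path) |> ignore
    member __.Path = path
    interface IDisposable with
        member __.Dispose () =
            if Directory.Exists(path) then Directory.Delete(path, true)

let writeAndForget () =
    // `use` calls Dispose when `dir` goes out of scope, even on exceptions.
    use dir = new TempDirectory()
    File.WriteAllText(Path.Combine(dir.Path, "song.txt"), "lyrics")
    dir.Path

let leftoverPath = writeAndForget ()
let stillThere = Directory.Exists(leftoverPath)
```

After `writeAndForget` returns, the directory and the file inside it are gone without any explicit cleanup call.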
interface ICloudDisposable with
    member d.Dispose () = CloudDirectory.Delete(d, recursiveDelete = true)
Implementation of ICloudDisposable for CloudDirectory:
System.Net.WebException: The remote name could not be resolved: 'www.someunknownaddress'
at Microsoft.FSharp.Control.WebExtensions.AsyncGetResponse@2036-1.Invoke(Exception _arg1)
at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@803.Invoke(AsyncParams`1 args)
at Cloud.Parallel[T](seq<Cloud<T>> computations)
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.FSharp.Core.Operators.Raise[T](Exception exn)
at <StartupCode$MBrace-Azure-Client>.$Process.AwaitResultAsync@116-1.Invoke(Result`1 _arg2)
in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Process.fs:line 116
at Microsoft.FSharp.Control.AsyncBuilderImpl.args@797-1.Invoke(a a)
at MBrace.Continuation.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken)
in C:\dev\github-repositories\MBrace.Core\src\MBrace.Core\Continuation\ExceptionDispatchInfo.fs:line 134
at MBrace.Azure.Client.Process`1.AwaitResult()
in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Process.fs:line 110
at <StartupCode$MBrace-Azure-Client>.$Client.RunAsync@103-2.Invoke(Unit unitVar)
in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Client.fs:line 103
at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@813.Invoke(AsyncParams`1 args)
at MBrace.Continuation.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken)
in C:\dev\github-repositories\MBrace.Core\src\MBrace.Core\Continuation\ExceptionDispatchInfo.fs:line 134
at MBrace.Azure.Client.Runtime.Run[T](Cloud`1 workflow, FSharpOption`1 cancellationToken, FSharpOption`1 faultPolicy)
in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Client.fs:line 125
at <StartupCode$FSI_0010>.$FSI_0010.main@() in RestrictionsFilter.fsx:line 54
Stopped due to error
>
let maxContentLength = cloud {
    let lengthsJob =
        [| "https://github.com"; "https://www.microsoft.com"; "http://www.someunknownaddress" |]
        |> Array.map (getContentLenght >> Cloud.OfAsync)
    let! lengths = Cloud.Parallel lengthsJob
    return Array.max lengths
}
We get the concrete exception:
Executing the computation with the wrong link:
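The same propagation can be observed locally with plain `async`, which may help when reasoning about the cloud version. In this sketch `fakeContentLength` is a hypothetical stand-in for the real download and simply fails for one address; `Async.Catch` turns the exception into a value instead of stopping the script.

```fsharp
// Hypothetical stand-in for a download: fails for the unknown address.
let fakeContentLength (url : string) = async {
    if url.Contains("unknown") then failwithf "could not resolve %s" url
    return url.Length
}

let outcome =
    [| "https://github.com"; "http://www.someunknownaddress" |]
    |> Array.map fakeContentLength
    |> Async.Parallel        // one failing job fails the whole parallel workflow
    |> Async.Catch           // capture the exception as Choice2Of2
    |> Async.RunSynchronously

let message =
    match outcome with
    | Choice1Of2 lengths -> sprintf "max length %d" (Array.max lengths)
    | Choice2Of2 ex -> sprintf "failed: %s" ex.Message
```

One failing job is enough to fail the combined result, which matches what the stack trace above shows for `Cloud.Parallel`.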
// Lists all processes of the runtime with their statuses, execution time, etc.
runtime.ShowProcesses()
// Lists all cluster machines with the info for CPU, memory, network usage, etc.
runtime.ShowWorkers()
// Gets runtime logs as an array of LogRecords
runtime.GetSystemLogs()
// Attaches specified logger to the runtime
runtime.AttachLogger(...)
// Cancels the process with the given id
runtime.GetProcessById("someprocess").Cancel()
// Deletes runtime records for all the processes
runtime.ClearAllProcesses()
// Deletes and re-activates runtime state
runtime.Reset()
SUMMARY
Thank You!