F#, MBrace & Azure
Alena Dzenisenka
.NET Fringe 2015, Portland, Oregon
let maxContentLength = cloud {
    let lengthsJob =
        [| "https://github.com"; "http://www.microsoft.com/" |]
        |> Array.map (getContentLenght >> Cloud.OfAsync)
    let! lengths = Cloud.Parallel lengthsJob
    return Array.max lengths
}
You care only about your main code logic.
They handle side effects in the background.
let cloud = new CloudBuilder()
cloud { (* code logic *) }
... the name of the block is the name of the builder class instance ...
type CloudBuilder () =
    member __.Return ...
    member __.Bind ...
    member __.Using ...
    // other
Builder methods are triggered from the cloud block and take care of all the additional side-effect work behind the scenes!
... and operate on the Cloud<'T> type
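To make the builder mechanics concrete, here is a minimal sketch of a custom builder in plain F#. It is not MBrace code: `Traced`, `TraceBuilder`, `trace`, `calls` and `run` are hypothetical names chosen for illustration. The sketch only shows how `let!` and `return` inside a block turn into `Bind` and `Return` calls on the builder instance; `Using` is the method a `use` binding inside the block would call.

```fsharp
open System

// Hypothetical workflow type: a delayed computation producing 'T.
type Traced<'T> = Traced of (unit -> 'T)

// Records which builder methods ran, so the hidden calls become visible.
let calls = ResizeArray<string> ()

type TraceBuilder () =
    member __.Return (x : 'T) : Traced<'T> =
        Traced (fun () ->
            calls.Add "Return"
            x)

    member __.Bind (m : Traced<'T>, k : 'T -> Traced<'U>) : Traced<'U> =
        Traced (fun () ->
            calls.Add "Bind"
            let (Traced f) = m
            let (Traced g) = k (f ())
            g ())

    member __.Using<'R, 'U when 'R :> IDisposable> (resource : 'R, body : 'R -> Traced<'U>) : Traced<'U> =
        Traced (fun () ->
            calls.Add "Using"
            try
                let (Traced g) = body resource
                g ()
            finally
                resource.Dispose ())

// The block name `trace` is simply the name of the builder instance.
let trace = TraceBuilder ()

// Executes a delayed computation.
let run (Traced f) = f ()

let answer =
    trace {
        let! a = trace { return 20 }   // becomes trace.Bind (..., fun a -> ...)
        let! b = trace { return 22 }
        return a + b                   // becomes trace.Return (a + b)
    }
    |> run

printfn "%d" answer
```

After running, `calls` contains the `Bind` and `Return` entries that the block triggered without mentioning them explicitly.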
let! lengths = Cloud.Parallel lengthsJob
static member Parallel (computations : seq<Cloud<'T>>) : Cloud<'T []> = cloud {
    let! runtime = Cloud.GetResource<IRuntimeProvider> ()
    let workflow = runtime.ScheduleParallel (computations |> Seq.map (fun c -> c, None))
    return! Cloud.WithAppendedStackTrace "Cloud.Parallel[T](seq<Cloud<T>> computations)" workflow
}
// don't worry, this code is already inside the framework
Cloud workflows:
MBrace
1. Machines that are visible on the same network. Each node should have:
2. A client to send requests to the cluster machines. The client should have the MBrace runtime and any environment that supports F#, e.g. F# Interactive, Visual Studio, or another.
With an Azure subscription account you can:
With an Azure account you can also:
After configuring the cluster using one of the possible options, it's time to connect and boot the runtime:
1. With Azure or custom machines, you need to specify the IP addresses or host names:
let nodes =
    [
        MBraceNode.Connect("10.0.1.4", 2675)
        MBraceNode.Connect("10.0.1.5", 2675)
        MBraceNode.Connect("10.0.1.6", 2675)
    ]
or
let nodes =
    [
        MBraceNode.Connect("hostname1", 2675)
        MBraceNode.Connect("hostname2", 2675)
        MBraceNode.Connect("hostname3", 2675)
    ]
And the default store we are going to use:
MBraceSettings.DefaultStore <- AzureStore.Create(accountName = "your name", accountKey = "your key")
or
MBraceSettings.DefaultStore <- FileSystemStore.Create @"\\path\to\shared"
2. With a Brisk cluster, you need to create the configuration object with your connection details:
let config = Configuration(myStorageConnectionString, myServiceBusConnectionString)
The storage connection string and service bus connection string may be obtained from Brisk.
For custom cluster:
let runtime = MBrace.Boot nodes
For Azure cluster:
let runtime = MBrace.Boot(nodes, store = azureStore)
For Brisk cluster:
let runtime = AzureCluster.Connect(config)
CloudValue:
Store distributed data entities. Immutable.
let dataInTheCloud = localData |> CloudValue.New |> runtime.Run
Can be read anytime in the future:
cloud {
    let! dataFromCloudValue = CloudValue.Read dataInTheCloud
    // actions with data
}
Example of race condition in async:
async {
    let value = ref 0
    // run the increments in parallel and wait for all of them
    do! Async.Parallel [| for i in 1 .. 10000 -> async { incr value } |] |> Async.Ignore
    return !value
} |> Async.RunSynchronously
May give a result such as 9981.
The same example executed within the cloud workflow:
cloud {
    let value = ref 0
    do! Cloud.Parallel [| for i in 1 .. 10000 -> cloud { incr value } |] |> Cloud.Ignore
    return !value
} |> runtime.Run
Will always return the initial value, which is 0 here.
CloudAtoms - safe transactional cross-runtime operations!
let atomicValue = CloudAtom.New(42) |> cluster.Run
cloud {
    do!
        [ for i in 1 .. n -> cloud { return! CloudAtom.Update (atomicValue, fun i -> i + 1) } ]
        |> Cloud.Parallel
        |> Cloud.Ignore
    return! CloudAtom.Read atomicValue
} |> runtime.Run
This should result in n + 42.
Cloud atoms are stored as refs under the hood.
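The idea of a transactional update can be sketched on a single machine with compare-and-swap. This is only a local analogue under stated assumptions, not how MBrace implements cloud atoms: `LocalAtom` is a hypothetical type, and the retry loop uses `Interlocked.CompareExchange` from .NET.

```fsharp
open System.Threading

// Hypothetical local stand-in for a cloud atom: one integer cell updated with compare-and-swap.
type LocalAtom (initial : int) =
    let mutable value = initial

    member __.Value = Volatile.Read (&value)

    // Retries the update until no other thread changed the cell in between.
    member __.Update (f : int -> int) =
        let mutable finished = false
        while not finished do
            let current = Volatile.Read (&value)
            let updated = f current
            if Interlocked.CompareExchange (&value, updated, current) = current then
                finished <- true

let n = 1000
let atom = LocalAtom 42

// n parallel increments, none of them lost.
[ for _ in 1 .. n -> async { atom.Update (fun i -> i + 1) } ]
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously

printfn "%d" atom.Value   // prints 1042, i.e. n + 42
```

Unlike the plain `ref` increment, a failed swap is retried, so every update is applied exactly once.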
CloudFlows let you use efficient functional data query pipelines on streams of data in the cloud.
let songNames =
    songsUrls
    |> CloudFlow.OfArray
    |> CloudFlow.map download
    |> CloudFlow.filter (fun s -> s.Length > 300)
    |> CloudFlow.map (fun s -> s.Substring(0, s.IndexOf("\n") - 1))
    |> CloudFlow.toArray
    |> runtime.Submit
Input
Predicate: filter
Comparison: sort
Function: map
Terminal: count, sum ...
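The same stage shapes exist in the local `Seq` module, which makes for an easy way to try a pipeline before sending it to a cluster. The sketch below is local only and nothing in it is distributed; the `songs` data and the `firstLine` helper are hypothetical.

```fsharp
// Hypothetical in-memory stand-ins for downloaded song texts.
let songs =
    [| "Song B\nline one of the lyrics\nline two"
       "x\ny"
       "Song A\nanother set of lyrics\nmore" |]

// Takes everything before the first line break.
let firstLine (s : string) = s.Substring (0, s.IndexOf('\n'))

let titles =
    songs                                    // Input
    |> Seq.filter (fun s -> s.Length > 10)   // Predicate: filter
    |> Seq.map firstLine                     // Function: map
    |> Seq.sort                              // Comparison: sort
    |> Seq.toArray                           // Terminal

let titleCount = titles |> Seq.length        // Terminal: count

printfn "%A (%d)" titles titleCount
```

Here `titles` is `[| "Song A"; "Song B" |]` and `titleCount` is 2, because the two-character entry is dropped by the predicate.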
In action:
Store data in the cloud using the binary blob representation.
Creation of a cloud file:
let songCloudFile (song : string) (dir : CloudDirectoryInfo) = cloud {
    let songName = song.Substring(0, song.IndexOf("\n") - 1).Trim().Replace("\"", "")
    let fileName = Path.Combine(dir.Path, songName)
    do! CloudFile.Delete fileName
    return! CloudFile.WriteAllText(path = fileName, text = song)
}
Run the function for the necessary input:
let songCloudFilesProcess =
    songFileNames
    |> Array.map (fun i -> cloud {
        let! songText = download i |> Cloud.OfAsync
        // dir is the target CloudDirectoryInfo, assumed to be defined earlier
        return! songCloudFile songText dir
    })
    |> Cloud.Parallel
    |> cluster.Run
Read from the cloud files easily:
let displayLyrics (song : string) = cloud {
    let fileName = Path.Combine(dir.Path, song)
    return! CloudFile.ReadAllText fileName
}
let songLyrics = displayLyrics "Smells Like Teen Spirit" |> runtime.Run
Remove cloud directories or cloud files:
do! CloudDirectory.Delete dir
Existence:
let! exists = CloudDirectory.Exists dir
Enumeration:
let! cloudFiles = CloudFile.Enumerate dir
// Creating an anonymous cloud queue and sending a message
let queue = CloudQueue.New() |> cluster.Run
// Sending
CloudQueue.Enqueue (queue, "Monads") |> cluster.Run
// Receiving
let msg = CloudQueue.Dequeue(queue) |> cluster.Run
Send and receive messages with cloud workflows, reactive things!
Send messages from client:
let sendTask =
    cloud {
        for i in [ 0 .. 100 ] do
            do! queue.Enqueue (sprintf "FSharp%d" i)
    }
    |> cluster.Submit
sendTask.ShowInfo()
And the receiving job:
let receiveTask =
    cloud {
        let results = new ResizeArray<_>()
        for i in [ 0 .. 100 ] do
            let! msg = CloudQueue.Dequeue(queue)
            results.Add msg
        return results.ToArray()
    }
    |> cluster.Submit
Send some messages and see the results!
let receiveMessages = async {
    while true do
        let! result = channel.ReceiveAsync(receivePort2)
        printfn "Received: %A" result
}
// kill all jobs
receiveCancel.Cancel()
replicateMessage.Kill()
channel.Delete receivePort1
channel.Delete receivePort2
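The send/receive pattern can be tried locally before involving a cluster. The sketch below is a single-process analogue, assuming a .NET `BlockingCollection` in place of a cloud queue; `localQueue`, `send`, `receive` and `received` are hypothetical names.

```fsharp
open System.Collections.Concurrent

// Local stand-in for a cloud queue (hypothetical; no cluster involved).
let localQueue = new BlockingCollection<string> ()

// Producer: mirrors the sending job.
let send = async {
    for i in 0 .. 100 do
        localQueue.Add (sprintf "FSharp%d" i)
}

// Consumer: mirrors the receiving job; Take blocks until a message is available.
let receive = async {
    let results = ResizeArray<string> ()
    for _ in 0 .. 100 do
        results.Add (localQueue.Take ())
    return results.ToArray ()
}

let received =
    let consumer = Async.StartAsTask receive   // start listening first
    Async.Start send                           // then start sending
    consumer.Result                            // wait for all 101 messages

printfn "%d messages, first: %s, last: %s" received.Length received.[0] received.[100]
```

With one producer and one consumer the messages arrive in the order they were sent, from `FSharp0` to `FSharp100`.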
MBrace provides the ICloudDisposable type, which acts like IDisposable for a distributed environment.
/// Denotes handle to a distributable resource that can be disposed of.
type ICloudDisposable =
    /// Releases any storage resources used by this object.
    abstract Dispose : unit -> Local<unit>
We may apply use and use! with ICloudDisposable objects.
Implementation of ICloudDisposable for CloudDirectory:
interface ICloudDisposable with
    member d.Dispose () = CloudDirectory.Delete(d, recursiveDelete = true)
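For comparison, here is the ordinary local pattern that ICloudDisposable mirrors: a `use` binding with plain IDisposable. `TempDirectory` and `pathAfterScope` are hypothetical names; the directory is created under the system temp path and deleted recursively when the binding goes out of scope.

```fsharp
open System
open System.IO

// Hypothetical local analogue: a temporary directory that deletes itself on Dispose.
type TempDirectory () =
    let path = Path.Combine (Path.GetTempPath (), Guid.NewGuid().ToString("N"))
    do Directory.CreateDirectory path |> ignore
    member __.Path = path
    interface IDisposable with
        member __.Dispose () =
            if Directory.Exists path then Directory.Delete (path, true)

let pathAfterScope =
    use dir = new TempDirectory ()   // Dispose runs when this block ends
    File.WriteAllText (Path.Combine (dir.Path, "song.txt"), "lyrics")
    dir.Path

printfn "still exists: %b" (Directory.Exists pathAfterScope)   // prints "still exists: false"
```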
Executing the computation with the wrong link:
> let maxContentLength = cloud {
    let lengthsJob =
        [| "https://github.com"; "https://www.microsoft.com"; "http://www.someunknownaddress" |]
        |> Array.map (getContentLenght >> Cloud.OfAsync)
    let! lengths = Cloud.Parallel lengthsJob
    return Array.max lengths
}
We get the concrete exception:
System.Net.WebException: The remote name could not be resolved: 'www.someunknownaddress'
at Microsoft.FSharp.Control.WebExtensions.AsyncGetResponse@2036-1.Invoke(Exception _arg1)
at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@803.Invoke(AsyncParams`1 args)
at Cloud.Parallel[T](seq<Cloud<T>> computations)
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.FSharp.Core.Operators.Raise[T](Exception exn)
at <StartupCode$MBrace-Azure-Client>.$Process.AwaitResultAsync@116-1.Invoke(Result`1 _arg2)
in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Process.fs:line 116
at Microsoft.FSharp.Control.AsyncBuilderImpl.args@797-1.Invoke(a a)
at MBrace.Continuation.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken)
in C:\dev\github-repositories\MBrace.Core\src\MBrace.Core\Continuation\ExceptionDispatchInfo.fs:line 134
at MBrace.Azure.Client.Process`1.AwaitResult()
in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Process.fs:line 110
at <StartupCode$MBrace-Azure-Client>.$Client.RunAsync@103-2.Invoke(Unit unitVar)
in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Client.fs:line 103
at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@813.Invoke(AsyncParams`1 args)
at MBrace.Continuation.ExceptionDispatchInfoUtils.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken)
in C:\dev\github-repositories\MBrace.Core\src\MBrace.Core\Continuation\ExceptionDispatchInfo.fs:line 134
at MBrace.Azure.Client.Runtime.Run[T](Cloud`1 workflow, FSharpOption`1 cancellationToken, FSharpOption`1 faultPolicy)
in C:\Users\developer001\Source\Repos\MBrace.Azure\src\MBrace.Azure.Client\Client.fs:line 125
at <StartupCode$FSI_0010>.$FSI_0010.main@() in RestrictionsFilter.fsx:line 54
Stopped due to error
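The same failure mode can be reproduced locally with plain `async`, without a cluster or a network. In this sketch `fakeContentLength` and `maxLength` are hypothetical helpers: the first simulates a download that fails for one address, and `Async.Catch` turns the exception raised inside the parallel computation into a value the caller can inspect.

```fsharp
// Hypothetical stand-in for a download: fails for one address, no network involved.
let fakeContentLength (url : string) = async {
    if url.Contains("unknown") then
        failwithf "The remote name could not be resolved: '%s'" url
    return url.Length
}

// Runs all computations in parallel and captures the first exception, if any.
let maxLength (urls : string []) =
    urls
    |> Array.map fakeContentLength
    |> Async.Parallel
    |> Async.Catch
    |> Async.RunSynchronously

let outcome = maxLength [| "https://github.com"; "http://www.someunknownaddress" |]

match outcome with
| Choice1Of2 lengths -> printfn "max = %d" (Array.max lengths)
| Choice2Of2 exn -> printfn "failed: %s" exn.Message
```

The exception raised by one child computation becomes the result of the whole parallel block, which is the behaviour the cloud stack trace shows across machines.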
// Lists all processes of the runtime with their statuses, execution time, etc.
runtime.ShowProcesses()
// Lists all cluster machines with the info for CPU, memory, network usage, etc.
runtime.ShowWorkers()
// Gets runtime logs as an array of LogRecords
runtime.GetSystemLogs()
// Attaches specified logger to the runtime
runtime.AttachLogger(...)
// Cancels the process with the given id
runtime.GetProcessById("someprocess").Cancel()
// Deletes runtime records for all the processes
runtime.ClearAllProcesses()
// Deletes and re-activates runtime state
runtime.Reset()
SUMMARY
Thank You!