This page explains the plumbing inside the GoMachine.
GoMachine is an experimental feature held in the xvm package.
The API of the package and its name may change.
This document is based on commit 7538ab3.
The principle relies on the state of the nodes.
As explained in Lexical Scanning in Go:
As of today, the GoMachine expects a node to be in one of these possible states:
A node that carries an operator may have an extra state:
Later, a new state will be added when implementing automatic differentiation: computing the gradient.
This leads to this state graph of the possible states of a node:
The node is a private structure:
type node struct {
	// ...
}
We define a type stateFn that represents an action to perform on a *node in a specific context, and returns a new state. This type is a func:
type stateFn func(context.Context, *node) stateFn
Note: It is the responsibility of every state function to handle the context cancelation mechanism. This means that if a cancelation signal is received, the node should return the end state. For simplicity:
func mystate(ctx context.Context, n *node) stateFn {
	// ...
	select {
	// ...
	case <-ctx.Done():
		// Record why the machine stopped, then return the end state.
		n.err = ctx.Err()
		return nil
	}
}
We define four functions of type stateFn to implement the actions required by the node:
func defaultState(context.Context, *node) stateFn { ... }
func receiveInput(context.Context, *node) stateFn { ... }
func computeFwd(context.Context, *node) stateFn { ... }
func emitOutput(context.Context, *node) stateFn { ... }
Note: the end state is nil (the zero value of stateFn).
Each node is a state machine.
To run it, we define a method on *node that takes a context as an argument:
func (n *node) Compute(ctx context.Context) error {
	for state := defaultState; state != nil; {
		state = state(ctx, n)
	}
	return n.err
}
Note: the *node stores an error that should be set by the stateFn that broke the state machine; it indicates the reason (for example, if an error occurs during the computation, this error contains the reason).
Then every *node is triggered in its own goroutine by the machine.
We use the paradigm of reactive programming to switch from a state to another.
A change in the *node structure triggers an action that induces the change of state.
For example, let's take a simple calculator that computes $a+b$.
When we send a value to $a$, $+$ is notified of this event ($a$ owns a value); it receives and stores the value internally.
Then we send a value to $b$; $+$ is notified and receives the value. Its state then changes to compute.
Once computed, the $+$ sends the result to whoever is interested in using it.
In Go, sending and receiving values, and event-driven programming in general, are implemented with channels.
The node structure owns two channels, one to receive the input (inputC), and one to emit the output (outputC):
type node struct {
	outputC chan gorgonia.Value
	inputC  chan ioValue
	err     error
	// ...
}
Note: the ioValue structure is explained later in this doc; for now, consider ioValue = gorgonia.Value.
Now that all nodes are running in goroutines, we need to wire them together to actually compute formulae.
For example, in: $ a\times x+b$, we need to send the result of $a\times x$ into the node carrying the addition operator.
This is roughly:
aTimesX := &node{op: mul}
aTimesXPlusB := &node{op: sum}
var a, b, x gorgonia.Value
// Feed both operands to the multiplication.
aTimesX.inputC <- a
aTimesX.inputC <- x
// Forward the product, then b, to the addition.
aTimesXPlusB.inputC <- <-aTimesX.outputC
aTimesXPlusB.inputC <- b
The problem is that a channel is not a “topic”, and it does not handle subscriptions natively. The first consumer takes a value and drains the channel.
Therefore if we take this equation $(a + b) \times c + (a + b) \times d$, the implementation would not work:
This results in a deadlock: the first consumer empties aPlusB.outputC, so the second consumer never receives a value.
The solution is to use temporary channels and a broadcast mechanism as described in the article Go Concurrency Patterns: Pipelines and cancellation.
A node publishes content to subscribers. A node also subscribes to content sent by publishers.
We set up two structures:
type publisher struct {
	id          int64
	publisher   <-chan gorgonia.Value
	subscribers []chan<- gorgonia.Value
}
type subscriber struct {
	id         int64
	publishers []<-chan gorgonia.Value
	subscriber chan<- ioValue
}
Each node providing output via its outputC is a publisher, and all the nodes in the graph reaching this node are its subscribers. This defines a publisher object. The ID of the object is the ID of the node providing its output.
Each node expecting inputs via its inputC is a subscriber. Its publishers are the nodes reached by this node in the *ExprGraph.
Publishers broadcast their data to the subscribers by calling:
func broadcast(ctx context.Context, globalWG *sync.WaitGroup, ch <-chan gorgonia.Value, cs ...chan<- gorgonia.Value) { ... }
Subscribers merge the results from the publishers by calling:
func merge(ctx context.Context, globalWG *sync.WaitGroup, out chan<- ioValue, cs ...<-chan gorgonia.Value) { ... }
Note: both functions handle context cancelation.
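The bodies of these two functions are elided above. The sketch below shows one possible way to write them, following the fan-out and fan-in patterns from the pipelines article; it is an assumption, not the package's implementation. `float64` replaces `gorgonia.Value`, the `ioValue` fields are a simplified guess (a position plus a value), and `fanOutIn` is a hypothetical helper used only to exercise the wiring.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ioValue tags a value with the position of the input it arrived on.
type ioValue struct {
	pos int
	v   float64 // float64 stands in for gorgonia.Value
}

// broadcast copies every value read from ch to each channel in cs.
func broadcast(ctx context.Context, wg *sync.WaitGroup, ch <-chan float64, cs ...chan<- float64) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case v := <-ch:
				for _, c := range cs {
					select {
					case c <- v:
					case <-ctx.Done():
						return
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

// merge forwards the values of every channel in cs to out, tagged by position.
func merge(ctx context.Context, wg *sync.WaitGroup, out chan<- ioValue, cs ...<-chan float64) {
	for i, c := range cs {
		wg.Add(1)
		go func(pos int, c <-chan float64) {
			defer wg.Done()
			for {
				select {
				case v := <-c:
					select {
					case out <- ioValue{pos: pos, v: v}:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}(i, c)
	}
}

// fanOutIn wires one publisher to two subscribers and merges them back.
func fanOutIn(v float64) map[int]float64 {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	src, left, right := make(chan float64), make(chan float64), make(chan float64)
	out := make(chan ioValue)
	broadcast(ctx, &wg, src, left, right)
	merge(ctx, &wg, out, left, right)
	src <- v
	res := map[int]float64{}
	for i := 0; i < 2; i++ {
		tagged := <-out
		res[tagged.pos] = tagged.v
	}
	cancel()  // stop the helper goroutines
	wg.Wait() // every goroutine has returned
	return res
}

func main() {
	fmt.Println(fanOutIn(3)) // prints map[0:3 1:3]
}
```

Both subscribers receive the single published value, and the `cancel` followed by `wg.Wait()` shows how the shared wait group confirms that no goroutine is left behind.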
To wire all the publishers and subscribers, we use a top-level structure, pubsub:
type pubsub struct {
	publishers  []*publisher
	subscribers []*subscriber
}
pubsub is in charge of setting up the network of channels.
Then a run(context.Context) method triggers the broadcast and merge for all elements:
func (p *pubsub) run(ctx context.Context) (context.CancelFunc, *sync.WaitGroup) { ... }
This method returns a context.CancelFunc and a sync.WaitGroup whose counter drops to zero once all pubsubs have settled after a cancelation.
ioValue
The subscriber has a single input channel; the input values can be sent in any order. The subscriber’s merge function tracks the order of the subscribers, wraps the value into the ioValue structure, and adds the position of the operator emitting the value:
type ioValue struct {
	pos int
	v   gorgonia.Value
}
The Machine is the only exported structure of the package.
It holds the nodes and the pubsub.
type Machine struct {
	nodes  []*node
	pubsub *pubsub
}
A machine is created from an *ExprGraph by calling:
func NewMachine(g *gorgonia.ExprGraph) *Machine { ... }
Under the hood, it parses the graph and generates a *node for each *gorgonia.Node.
If a node carries an Op (= an object that implements a Do(... Value) Value method), a pointer to the Op is added to the structure.
For transitioning, the package declares a Doer interface.
This interface is fulfilled by the *gorgonia.Node structure.
Two individual cases are handled:
- *ExprGraph have outputC = nil
- *ExprGraph have inputC = nil
Then NewMachine calls the createNetwork methods to create the *pubsub elements.
A call to the Run method of the Machine triggers the computation.
The call to this function is blocking.
It stops the process and returns (with an error, if any) when:
- all the nodes have reached their final states
- one node’s execution state returns an error
In case of error, a cancel signal is automatically sent to the *pubsub infrastructure to avoid leakage.
After the computation, it is safe to call Close to avoid a memory leak.
Close() closes all the channels held by the *node and the *pubsub.
It is important to notice that the machine is independent of the *ExprGraph. Therefore the values held by the *gorgonia.Node are not updated.
To access the data, you must call the GetResult method of the machine. This method takes a node ID as input (*node and *gorgonia.Node have the same IDs).
Example:
add, err := gorgonia.Add(a, b)
fmt.Println(machine.GetResult(add.ID()))
This is a trivial example that adds two float32 values:
import (
	"context"
	"fmt"
	"log"
	"time"

	"gorgonia.org/gorgonia"
)

func main() {
	g := gorgonia.NewGraph()
	forty := gorgonia.F32(40.0)
	two := gorgonia.F32(2.0)
	n1 := gorgonia.NewScalar(g, gorgonia.Float32, gorgonia.WithValue(&forty), gorgonia.WithName("n1"))
	n2 := gorgonia.NewScalar(g, gorgonia.Float32, gorgonia.WithValue(&two), gorgonia.WithName("n2"))
	added, err := gorgonia.Add(n1, n2)
	if err != nil {
		log.Fatal(err)
	}
	machine := NewMachine(g)
	// Bound the whole computation to one second.
	ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
	defer cancel()
	defer machine.Close()
	err = machine.Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(machine.GetResult(added.ID()))
}
prints
42