This page explains the plumbing inside the GoMachine.
GoMachine is an experimental feature held in the `xvm` package.
The API of the package and its name may change.
This document is based on commit 7538ab3.
The principle relies on the state of the nodes.
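As explained in Lexical Scanning in Go, a state represents where we are, an action represents what we do, and actions result in a new state.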
As of today, the GoMachine expects a node to be in one of these states:

- receiving input
- emitting output

A node carrying an operator may have an extra state:

- computing

A new state, computing the gradient, will eventually be added when automatic differentiation is implemented.
This leads to this state graph of the possible states of a node:
The `node` is a private structure:

```go
type node struct {
	// ...
}
```
We define a type `stateFn` that represents an action to perform on a `*node` in a specific context, and returns a new state. This type is a `func`:

```go
type stateFn func(context.Context, *node) stateFn
```
Note: every state function is responsible for handling the context cancelation mechanism. If a cancelation signal is received, the state function should record the reason in the node and return the end state. Schematically:

```go
func mystate(ctx context.Context, n *node) stateFn {
	// ...
	select {
	// ...
	case <-ctx.Done():
		n.err = ctx.Err() // record why the state machine stopped
		return nil        // the end state
	}
}
```
We define four functions of type `stateFn` to implement the actions required by the node:

```go
func defaultState(context.Context, *node) stateFn { ... }
func receiveInput(context.Context, *node) stateFn { ... }
func computeFwd(context.Context, *node) stateFn { ... }
func emitOutput(context.Context, *node) stateFn { ... }
```
Note: the end state is `nil` (the zero value of `stateFn`).
Each node is a state machine.
To run it, we define a `Compute` method that takes a context as an argument:

```go
func (n *node) Compute(ctx context.Context) error {
	// Run the state machine until a state function returns the end state (nil).
	for state := defaultState; state != nil; {
		state = state(ctx, n)
	}
	return n.err
}
```
Note: the `*node` stores an error, set by a `stateFn`, that indicates why the state machine stopped (for example, if an error occurs during the computation, this error contains the reason).

The machine then triggers every `*node` in its own goroutine.
We use the paradigm of reactive programming to switch from one state to another.
A change in the `*node` structure triggers an action that induces the change of state.

For example, let’s take a simple calculator that computes $a+b$.
When we send a value to $a$, $+$ is notified of this event ($a$ owns a value); it receives the value and stores it internally.
Then we send a value to $b$: $+$ is notified and receives the value. Its state then changes to *compute*.
Once the result is computed, $+$ sends it to whoever is interested in using it.

In Go, sending and receiving values and event-driven programming are implemented with channels.
The `node` structure owns two channels, one to receive the input (`inputC`) and one to emit the output (`outputC`):

```go
type node struct {
	outputC chan gorgonia.Value
	inputC  chan ioValue
	err     error
	// ...
}
```
Note: the `ioValue` structure is explained later in this document; for now, consider `ioValue` = `gorgonia.Value`.
Now that all nodes are running in goroutines, we need to wire them together to actually compute formulae.
For example, in $a\times x+b$, we need to send the result of $a\times x$ to the node carrying the addition operator, which is roughly:

```go
aTimesX := &node{op: mul}      // mul: the multiplication operator
aTimesXPlusB := &node{op: sum} // sum: the addition operator
var a, b, x gorgonia.Value
aTimesX.inputC <- a
aTimesX.inputC <- x
aTimesXPlusB.inputC <- <-aTimesX.outputC
aTimesXPlusB.inputC <- b
```
The problem is that a channel is not a “topic”: it does not handle subscriptions natively. The first consumer takes the value and drains the channel, so no other consumer receives it.

Therefore, with the equation $(a + b) \times c + (a + b) \times d$, this implementation does not work: the result of $a + b$ must be sent to both multiplications, but `aPlusB.outputC` is emptied by the first node that reads it, so the second one never receives a value and the program deadlocks.
The solution is to use temporary channels and a broadcast mechanism as described in the article Go Concurrency Patterns: Pipelines and cancellation.
A node publishes content to its subscribers, and also subscribes to content sent by its publishers.
We set up two structures:

```go
type publisher struct {
	id          int64
	publisher   <-chan gorgonia.Value
	subscribers []chan<- gorgonia.Value
}

type subscriber struct {
	id         int64
	publishers []<-chan gorgonia.Value
	subscriber chan<- ioValue
}
```
Each node providing output via its `outputC` is a publisher, and all the nodes in the graph that reach this node are its subscribers. This defines a `publisher` object whose ID is the ID of the node providing the output.

Each node expecting inputs via its `inputC` is a subscriber. Its publishers are the nodes reached by this node in the `*ExprGraph`.
Publishers broadcast their data to their subscribers by calling:

```go
func broadcast(ctx context.Context, globalWG *sync.WaitGroup, ch <-chan gorgonia.Value, cs ...chan<- gorgonia.Value) { ... }
```

Subscribers merge the results from their publishers by calling:

```go
func merge(ctx context.Context, globalWG *sync.WaitGroup, out chan<- ioValue, cs ...<-chan gorgonia.Value) { ... }
```

Note: both functions handle context cancelation.
To wire all the publishers and subscribers, we use a top-level structure, `pubsub`:

```go
type pubsub struct {
	publishers  []*publisher
	subscribers []*subscriber
}
```
`pubsub` is in charge of setting up the network of channels.
A `run(context.Context)` method then triggers `broadcast` and `merge` for all elements:

```go
func (p *pubsub) run(ctx context.Context) (context.CancelFunc, *sync.WaitGroup) { ... }
```

This method returns a `context.CancelFunc` and a `*sync.WaitGroup` whose counter drops to zero once all the pubsub goroutines have settled after a cancelation.
ioValue
The subscriber has a single input channel, so the input values can arrive in any order. The subscriber’s `merge` function tracks the order of the publishers, wraps each value into the `ioValue` structure, and adds the position of the operator emitting the value:

```go
type ioValue struct {
	pos int
	v   gorgonia.Value
}
```
The `Machine` is the only exported structure of the package.
It holds the nodes and the `pubsub`:

```go
type Machine struct {
	nodes  []*node
	pubsub *pubsub
}
```
A machine is created from an `*ExprGraph` by calling:

```go
func NewMachine(g *gorgonia.ExprGraph) *Machine { ... }
```

Under the hood, it parses the graph and generates a `*node` for each `*gorgonia.Node`.
If a node carries an Op (that is, an object that implements a `Do(...Value) (Value, error)` method), a pointer to the Op is added to the structure.
To ease the transition, the package declares a `Doer` interface.
This interface is fulfilled by the `*gorgonia.Node` structure.
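Two individual cases are handled:

- nodes whose value is not consumed by any other node (the outputs of the `*ExprGraph`) have `outputC = nil`;
- nodes without children (the inputs of the `*ExprGraph`) have `inputC = nil`.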
Then `NewMachine` calls the `createNetwork` method to create the `*pubsub` elements.
A call to the `Run` method of the `Machine` triggers the computation.
This call is blocking.
It stops the process and returns when:

- all the nodes have reached their final states, or
- one node’s execution state returns an error, in which case `Run` returns that error.

In case of error, a cancelation signal is automatically sent to the `*pubsub` infrastructure to avoid goroutine leaks.
After the computation, it is safe to call `Close`, and you should do so to avoid a memory leak: `Close()` closes all the channels held by the `*node` elements and the `*pubsub`.
It is important to notice that the machine is independent of the `*ExprGraph`; therefore, the values held by the `*gorgonia.Node` elements are not updated.
To access the data, you must call the `GetResult` method of the machine. This method takes a node ID as input (a `*node` and its `*gorgonia.Node` have the same ID).
For example:
```go
add, err := gorgonia.Add(a, b)
if err != nil {
	log.Fatal(err)
}
// ... create the machine from the graph and run it, then:
fmt.Println(machine.GetResult(add.ID()))
```
This trivial example adds two `float32` scalars:

```go
// This example assumes it is compiled alongside the xvm package,
// where NewMachine is defined.
import (
	"context"
	"fmt"
	"log"
	"time"

	"gorgonia.org/gorgonia"
)

func main() {
	g := gorgonia.NewGraph()
	forty := gorgonia.F32(40.0)
	two := gorgonia.F32(2.0)
	n1 := gorgonia.NewScalar(g, gorgonia.Float32, gorgonia.WithValue(&forty), gorgonia.WithName("n1"))
	n2 := gorgonia.NewScalar(g, gorgonia.Float32, gorgonia.WithValue(&two), gorgonia.WithName("n2"))

	added, err := gorgonia.Add(n1, n2)
	if err != nil {
		log.Fatal(err)
	}
	machine := NewMachine(g)
	ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
	defer cancel()
	defer machine.Close() // release the channels once done
	err = machine.Run(ctx) // blocks until the computation ends or fails
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(machine.GetResult(added.ID()))
}
```

It prints:

```
42
```