Cette page explique la tuyauterie à l’intérieur de la Go Machine.
GoMachine est une fonctionnalité expérimentale contenue dans le package xvm
.
L’API du package et son nom pourraient changer.
Ce document s’appuie sur le commit 7538ab3
Le principe repose sur les états des noeuds.
Comme expliqué dans la vidéo Lexical Scanning in Go:
A ce jour, la GoMachine attend d’un noeud d’être dans ces divers états:
Si un noeud contient un opérateur, il peut y avoir un nouvel état:
Ultérieurement, un nouvel état va être ajouté quand la différenciation automatique sera implémentée
Le noeud (node
) est une structure privée:
type node struct {
// ...
}
On définit un type stateFn
qui représente une action pour éxécuter un noeud (*node
) dans un contexte spécifique (context
) et entraine un nouvel état. L’objet stateFn
est de typefunc
:
type stateFn func(context.Context, *node) stateFn
Note: C’est la responsabilité de chaque fonction d’état de maintenir le mécanisme d’annulation du contexte. Cela signifie que si un signal d’annulation est reçu, le noeud devrait renvoyer à l’état final. pour faire simple:
func mystate(ctx context.Context, *node) stateFn {
// ...
select {
// ...
case <- ctx.Done():
n.err = ctx.Error()
return nil
}
}
on définit 4 fonctions de type stateFn
qui vont implémenter les actions requises par les opérations portées par les noeuds:
func defaultState(context.Context, *node) stateFn { ... }
func receiveInput(context.Context, *node) stateFn { ... }
func computeFwd(context.Context, *node) stateFn { ... }
func emitOutput(context.Context, *node) stateFn { ... }
Note: Le statut final est nil
(la valeur nulle de stateFn
)
Chaque noeud est une machine d’état.
Pour l’éxécuter, on fixe une méthode run
qui utilise le contexte comme argument.
func (n *node) Compute(ctx context.Context) error {
for state := defaultState; state != nil; {
state = state(ctx, n)
}
return n.err
}
Note: le noeud (*node
) stocke une erreur qui devrait être écrite par une stateFn
. Cette fonction d’état indique la raison pour laquelle la machine d’état a été cassée (par exemple, si une erreur survient durant le calcul, cette erreur contient la raison.)
Puis chaque noeud (*node
) est déclenché dans sa propre Goroutine par la machine.
On utilise le paradigme de la programmation réactive pour passer d’un état à un autre.
Un changement dans la stucture du noeud (*node
) déclenche une action qui va induire un changement d’état.
Par exemple, prenons un simple calculateur qui calcule a+b
.
Quand on envoie une valeur à $a$, $+$ est notifié de l’événement ($a$ possède sa propre valeur); il reçoit et stocke en interne la valeur
Quand on envoie une valeur $b$, $+$ est informé, et reçoit la valeur. Son état change alors en compute
.
Une fois compilé, le $+$ envoie le résultat à quiconque est intéressé par son usage.
En Go, envoyer et recevoir des valeurs, et programmer des événements nécessitent d’être implémentés avec des canaux (channels).
La structure du noyeau possède 2 canaux, un pour recevoir les entrées (inputC
), et un pour émettre les sorties (outputC
):
type node struct {
outputC chan gorgonia.Value
inputC chan ioValue
err error
// ...
}
Note: La structure ioValue
est expliquée plus loin dans ce document; pour le moment, considérons que ioValue
= gorgonia.Value
Désormais, tous les noeuds tournent dans des goroutines; on doit les cabler entre elles pour calculer une formule.
Par exemple, dans: $ a\times x+b$, on doit envoyer le résultat de $a\times x$ au noeud qui porte l’opération addition.
ce qui donne à peu près:
var aTimesX *node{op: mul}
var aTimesXPlusB *node{op: sum}
var a,b,c gorgonia.Value
aTimesX.inputC <- a
aTimesX.inputC <- x
aTimesXPlusB.inputC <- <- aTimesX.outputC
aTimesXPlusB.inputC <- <- b
Le problème est que le canal n’est pas un “topic” et il ne gère pas les abonnements de manière native. Le premier consommateur prend une valeur, et vide le canal.
Donc si on prend l’équation $(a + b) \times c + (a + b) \times d$, l’implémentation ne devrait pas fonctionner:
|
|
Ceci devrait provoquer une impasse car aPlusB.outputC
est vide à la ligne 9 et donc la ligne 12 ne recevra plus jamais de valeur.
La solution est d’utiliser des canaux temporaires et un mécanisme diffusé comme décrit dans l’article Go Concurrency Patterns: Pipelines and cancellation.
Un noeud publie du contenu pour des abonnés. Le noeud inscrit aussi du contenu pour des producteurs.
On associe 2 structures:
type publisher struct {
id int64
publisher <-chan gorgonia.Value
subscribers []chan<- gorgonia.Value
}
type subscriber struct {
id int64
publishers []<-chan gorgonia.Value
subscriber chan<- ioValue
}
Chaque noeud qui fournit une sortie via outputC
est un producteur, et tous les noeuds du graphique qui rejoignent ce premier noeud sont ses abonnés. Ceci définit un objet producteur. L’identifiant de l’objet est l’identifiant du noeud qui envoie sa sortie (output).
Chaque noeud qui attend une entrée via soninputC
est un abonné. Les producteurs sont les noeuds atteints par ce premier noeud dans le *ExprGraph
Les producteurs diffusent leurs données à l’abonné en appelant la fonction broadcast
.
func broadcast(ctx context.Context, globalWG *sync.WaitGroup, ch <-chan gorgonia.Value, cs ...chan<- gorgonia.Value) { ... }
Les abonnés fusionnent les résultats issus des producteurs par appel à la fonction merge
func merge(ctx context.Context, globalWG *sync.WaitGroup, out chan<- ioValue, cs ...<-chan gorgonia.Value) { ... }
Note:les 2 fonctions gèrent l’annulation du contexte
Pour cabler les producteurs et les abonnés, on utilise la structure de plus haut niveau: pubsub
type pubsub struct {
publishers []*publisher
subscribers []*subscriber
}
pubsub
est chargé de mettre en place le réseau de canaux.
Une méthode run(context.Context)
déclenche la diffusion ( broadcast
) et fusion (merge
) de tous les éléments:
func (p *pubsub) run(ctx context.Context) (context.CancelFunc, *sync.WaitGroup) { ... }
Cett méthode retourne un context.CancelFunc
et un sync.WaitGroup
qui vont tomber à 0 quand tous les pubsubs sont colonisés après une annulation.
ioValue
L’abonné a un seul canal d’entrée; la valeur de sortie peut être envoyée dans n’importe quel ordre.
La fonction merge
de l’abonné traque l’ordre des abonnés, inclut la valeur dans la structure ioValue, et ajoute la position de l’opérateur qui a émis cette valeur:
type ioValue struct {
pos int
v gorgonia.Value
}
La Machine
est la seule structure exportée du package.
C’est un support pour les noeuds et pubsub.
type Machine struct {
nodes []*node
pubsub *pubsub
}
Une machine est créée à partir de *ExprGraph
en appelant la fonction:
func NewMachine(g *gorgonia.ExprGraph) *Machine { ... }
De manière sous-jascente, il analyse le graphique et génère un noeud (*node
) pour chaque noeud gorgonia (*gorgonia.Node
).
Si un noeud porte une opération Op
(= un objet qui implémente une méthode Do(... Value) Value
), un pointeur sur l’opération est ajouté à la structure.
Pour faire la transition, le package déclare une interface Doer
.
Cette interface est validée par la strucure *gorgonia.Node
.
Deux cas particuliers sont pris en charge:
*ExprGraph
contientoutputC = nil
*ExprGraph
présentent inputC = nil
puis la nouvelle machine(NewMachine
) fait appel aux méthodes de création de réseau pour créer les éléments *pubsub
.
Un appel à la méthodeRun
de la machine déclenche le calcul.
L’appel à cette fonction est bloqué.
Il renvoie une erreur et stoppe le process si:
En cas d’erreur, un signal d’annulation est automatiquement envoyé à l’infrastructure *pubsub
pour éviter les fuites mémoire.
Après le calcul, il est sécuritaire d’appeler Close
pour éviter une fuite mémoire.
Close()
ferme tous les canaux tenus par le noeud *node
et le *pubsub
Il est important de remarquer que la machine est indépendante du *ExprGraph
. Donc les valeurs contenues par le *gorgonia.Node
ne sont pas mises à jour.
Pour accéder aux données, on doit appeler la méthode GetResult
de la machine. cette méthode utilise l’identifiant d’un noeud comme entrée ( le noeud (*node
) et noeud gorgonia ( *gorgonia.Node
) ont les mêmes identifiants)
Ex:
var add, err := gorgonia.Add(a,b)
fmt.Println(machine.GetResult(add.ID()))
Voici un exemple trivial qui effectue un calcul sur des float32
func main(){
g := gorgonia.NewGraph()
forty := gorgonia.F32(40.0)
two := gorgonia.F32(2.0)
n1 := gorgonia.NewScalar(g, gorgonia.Float32, gorgonia.WithValue(&forty), gorgonia.WithName("n1"))
n2 := gorgonia.NewScalar(g, gorgonia.Float32, gorgonia.WithValue(&two), gorgonia.WithName("n2"))
added, err := gorgonia.Add(n1, n2)
if err != nil {
log.Fatal(err)
}
machine := NewMachine(g)
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()
defer machine.Close()
err = machine.Run(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(machine.GetResult(added.ID()))
}
affiche
42