Go Machine

Cette page explique la tuyauterie à l’intérieur de la Go Machine.

GoMachine est une fonctionnalité expérimentale contenue dans le package xvm. L’API du package et son nom pourraient changer. Ce document s’appuie sur le commit 7538ab3

Les états des noeuds

Le principe repose sur les états des noeuds.

Comme expliqué dans la vidéo Lexical Scanning in Go:

  • un état représente où nous sommes
  • une action représente ce que nous faisons
  • les actions activent un nouvel état

A ce jour, la GoMachine attend d’un noeud d’être dans ces divers états:

  • waiting for input
  • emitting output

Si un noeud contient un opérateur, il peut y avoir un nouvel état:

  • computing

Ultérieurement, un nouvel état va être ajouté quand la différenciation automatique sera implémentée

Ceci amène à ce graphique des différents états d’un noeud:

graph TB; A(Initial Stage) --> BB{input is an op} BB -->|no| D[Emit output] BB -->|yes| B[Waiting for input] B --> C{inputs == arity} C -->|no| B C -->|yes| Computing Computing --> E{Has error} E -->|no| D E -->|yes| F D --> F(end)

Implémentation

Le noeud (node) est une structure privée:

type node struct {
    // ...
}

On définit un type stateFn qui représente une action pour éxécuter un noeud (*node) dans un contexte spécifique (context) et entraine un nouvel état. L’objet stateFn est de typefunc:

type stateFn func(context.Context, *node) stateFn

Note: C’est la responsabilité de chaque fonction d’état de maintenir le mécanisme d’annulation du contexte. Cela signifie que si un signal d’annulation est reçu, le noeud devrait renvoyer à l’état final. pour faire simple:

func mystate(ctx context.Context, *node) stateFn { 
    // ...
    select {
        // ...
        case <- ctx.Done():
            n.err = ctx.Error()
            return nil
    }
}

on définit 4 fonctions de type stateFn qui vont implémenter les actions requises par les opérations portées par les noeuds:

func defaultState(context.Context, *node) stateFn { ... }

func receiveInput(context.Context, *node) stateFn { ... }

func computeFwd(context.Context, *node) stateFn { ... }

func emitOutput(context.Context, *node) stateFn { ... }

Note: Le statut final est nil (la valeur nulle de stateFn)

Exécuter la machine d’état

Chaque noeud est une machine d’état. Pour l’éxécuter, on fixe une méthode run qui utilise le contexte comme argument.

func (n *node) Compute(ctx context.Context) error {
	for state := defaultState; state != nil; {
		state = state(ctx, n)
	}
	return n.err
}

Note: le noeud (*node) stocke une erreur qui devrait être écrite par une stateFn. Cette fonction d’état indique la raison pour laquelle la machine d’état a été cassée (par exemple, si une erreur survient durant le calcul, cette erreur contient la raison.)

Puis chaque noeud (*node) est déclenché dans sa propre Goroutine par la machine.

Modification d’état dans un événement

On utilise le paradigme de la programmation réactive pour passer d’un état à un autre.

Un changement dans la stucture du noeud (*node) déclenche une action qui va induire un changement d’état.

Par exemple, prenons un simple calculateur qui calcule a+b.

  • $+$ attend 2 valeurs d’entrée pour faire la somme de $a$ et $b$
  • $a$ attend une valeur
  • $b$ attend une valeur

Quand on envoie une valeur à $a$, $+$ est notifié de l’événement ($a$ possède sa propre valeur); il reçoit et stocke en interne la valeur

Quand on envoie une valeur $b$, $+$ est informé, et reçoit la valeur. Son état change alors en compute.

Une fois compilé, le $+$ envoie le résultat à quiconque est intéressé par son usage.

En Go, envoyer et recevoir des valeurs, et programmer des événements nécessitent d’être implémentés avec des canaux (channels).

La structure du noyeau possède 2 canaux, un pour recevoir les entrées (inputC), et un pour émettre les sorties (outputC):

type node struct {
	outputC        chan gorgonia.Value
	inputC         chan ioValue
    err            error
    // ...
}

Note: La structure ioValue est expliquée plus loin dans ce document; pour le moment, considérons que ioValue = gorgonia.Value

HUB de communication

Désormais, tous les noeuds tournent dans des goroutines; on doit les cabler entre elles pour calculer une formule.

Par exemple, dans: $ a\times x+b$, on doit envoyer le résultat de $a\times x$ au noeud qui porte l’opération addition.

ce qui donne à peu près:

var aTimesX *node{op: mul}
var aTimesXPlusB *node{op: sum}

var a,b,c gorgonia.Value

aTimesX.inputC <- a
aTimesX.inputC <- x
aTimesXPlusB.inputC <- <- aTimesX.outputC 
aTimesXPlusB.inputC <- <- b

Le problème est que le canal n’est pas un “topic” et il ne gère pas les abonnements de manière native. Le premier consommateur prend une valeur, et vide le canal.

Donc si on prend l’équation $(a + b) \times c + (a + b) \times d$, l’implémentation ne devrait pas fonctionner:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var aPlusB *node{op: add}
var aPlusBTimesC *node{op: mul}
var aPlusBTimesCPlusAPlusB *node{op: add}

var a,b,c gorgonia.Value

aPlusB.inputC <- a
aPlusB.inputC <- b
aPlusBTimesC.inputC <- <- aPlusB.outputC
aPlusBTimesC.inputC <- c
aPlusBTimesCPlusAPlusB <- <- aPlusBTimesC.outputC
aPlusBTimesCPlusAPlusB <- <- aPlusB.outputC // Deadlock

Ceci devrait provoquer une impasse car aPlusB.outputC est vide à la ligne 9 et donc la ligne 12 ne recevra plus jamais de valeur.

La solution est d’utiliser des canaux temporaires et un mécanisme diffusé comme décrit dans l’article Go Concurrency Patterns: Pipelines and cancellation.

Publier / souscrire

Un noeud publie du contenu pour des abonnés. Le noeud inscrit aussi du contenu pour des producteurs.

On associe 2 structures:

type publisher struct {
	id          int64
	publisher   <-chan gorgonia.Value
	subscribers []chan<- gorgonia.Value
}

type subscriber struct {
	id         int64
	publishers []<-chan gorgonia.Value
	subscriber chan<- ioValue
}

Chaque noeud qui fournit une sortie via outputC est un producteur, et tous les noeuds du graphique qui rejoignent ce premier noeud sont ses abonnés. Ceci définit un objet producteur. L’identifiant de l’objet est l’identifiant du noeud qui envoie sa sortie (output).

Chaque noeud qui attend une entrée via soninputC est un abonné. Les producteurs sont les noeuds atteints par ce premier noeud dans le *ExprGraph

Fusionner et diffuser

Les producteurs diffusent leurs données à l’abonné en appelant la fonction broadcast.

func broadcast(ctx context.Context, globalWG *sync.WaitGroup, ch <-chan gorgonia.Value, cs ...chan<- gorgonia.Value) { ... } 

Les abonnés fusionnent les résultats issus des producteurs par appel à la fonction merge

func merge(ctx context.Context, globalWG *sync.WaitGroup, out chan<- ioValue, cs ...<-chan gorgonia.Value) { ... }

Note:les 2 fonctions gèrent l’annulation du contexte

pubsub

Pour cabler les producteurs et les abonnés, on utilise la structure de plus haut niveau: pubsub

type pubsub struct {
	publishers  []*publisher
	subscribers []*subscriber
}

pubsub est chargé de mettre en place le réseau de canaux.

Une méthode run(context.Context) déclenche la diffusion ( broadcast) et fusion (merge) de tous les éléments:

func (p *pubsub) run(ctx context.Context) (context.CancelFunc, *sync.WaitGroup) { ... }

Cett méthode retourne un context.CancelFunc et un sync.WaitGroup qui vont tomber à 0 quand tous les pubsubs sont colonisés après une annulation.

A propos de ioValue

L’abonné a un seul canal d’entrée; la valeur de sortie peut être envoyée dans n’importe quel ordre. La fonction merge de l’abonné traque l’ordre des abonnés, inclut la valeur dans la structure ioValue, et ajoute la position de l’opérateur qui a émis cette valeur:

type ioValue struct {
	pos int
	v   gorgonia.Value
}

La machine

La Machine est la seule structure exportée du package.

C’est un support pour les noeuds et pubsub.

type Machine struct {
	nodes  []*node
	pubsub *pubsub
}

Création de la machine

Une machine est créée à partir de *ExprGraph en appelant la fonction:

func NewMachine(g *gorgonia.ExprGraph) *Machine { ... }

De manière sous-jascente, il analyse le graphique et génère un noeud (*node) pour chaque noeud gorgonia (*gorgonia.Node). Si un noeud porte une opération Op (= un objet qui implémente une méthode Do(... Value) Value ), un pointeur sur l’opération est ajouté à la structure.

Pour faire la transition, le package déclare une interface Doer. Cette interface est validée par la strucure *gorgonia.Node.

Deux cas particuliers sont pris en charge:

  • Le noeud de plus haut niveau du graphe *ExprGraph contientoutputC = nil
  • les derniers noeuds du *ExprGraph présentent inputC = nil

puis la nouvelle machine(NewMachine) fait appel aux méthodes de création de réseau pour créer les éléments *pubsub.

Exécuter la machine

Un appel à la méthodeRun de la machine déclenche le calcul. L’appel à cette fonction est bloqué. Il renvoie une erreur et stoppe le process si:

  • tous les noeuds ont atteint leur état final
  • ou l’état d’éxécution d’un noeud renvoie une erreur

En cas d’erreur, un signal d’annulation est automatiquement envoyé à l’infrastructure *pubsub pour éviter les fuites mémoire.

Fermer la machine

Après le calcul, il est sécuritaire d’appeler Close pour éviter une fuite mémoire. Close() ferme tous les canaux tenus par le noeud *node et le *pubsub

Divers

Il est important de remarquer que la machine est indépendante du *ExprGraph. Donc les valeurs contenues par le *gorgonia.Node ne sont pas mises à jour.

Pour accéder aux données, on doit appeler la méthode GetResult de la machine. cette méthode utilise l’identifiant d’un noeud comme entrée ( le noeud (*node) et noeud gorgonia ( *gorgonia.Node) ont les mêmes identifiants)

Ex:

var add, err := gorgonia.Add(a,b)
fmt.Println(machine.GetResult(add.ID()))

Exemple

Voici un exemple trivial qui effectue un calcul sur des float32

func main(){
    g := gorgonia.NewGraph()
    forty := gorgonia.F32(40.0)
    two := gorgonia.F32(2.0)
    n1 := gorgonia.NewScalar(g, gorgonia.Float32, gorgonia.WithValue(&forty), gorgonia.WithName("n1"))
    n2 := gorgonia.NewScalar(g, gorgonia.Float32, gorgonia.WithValue(&two), gorgonia.WithName("n2"))

    added, err := gorgonia.Add(n1, n2)
    if err != nil {
        log.Fatal(err)
    }
    machine := NewMachine(g)
    ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
    defer cancel()
    defer machine.Close()
    err = machine.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(machine.GetResult(added.ID()))
}

affiche

42