(*********************************************************************)
(* Title:	Threads of Control - Implementation		     *)
(* LastEdit:	"Wed Oct 31 16:58:05 1984" by Mick Jordan	     *)
(* Author: 	Mick Jordan					     *)
(* 		Copyright (C) 1984 by Acorn Research Centre	     *)
(*********************************************************************)

(* An implementation in terms of coroutines *)

IMPLEMENTATION MODULE Threads;
FROM SYSTEM IMPORT NEWPROCESS, PROCESS, WORD, ADDRESS, SIZE, TRANSFER;

FROM Storage IMPORT ALLOCATE, DEALLOCATE;

TYPE
    (* Saved coroutine context, as produced by NEWPROCESS and exchanged
       by TRANSFER. *)
    CoRoutine = PROCESS;
    (* Workspace of 1024 WORDs; Fork allocates one per forked thread and
       hands it to NEWPROCESS. *)
    Stack = POINTER TO ARRAY [0..1023] OF WORD;

    (* Alive is set by Fork and by the module initialisation; Finished is
       set by RunForkee once the forkee has returned.  Dead is not
       assigned anywhere in this module. *)
    ThreadStatus = (Alive, Finished, Dead);
    Thread = POINTER TO ThreadState;
    ThreadState = RECORD
            link: Thread;            (* next thread in the circular list
                                        maintained by module Queue *)
            joinQueue: Thread;       (* queue of threads blocked in Join
                                        on this thread; NIL when empty *)
            state: CoRoutine;        (* context saved/resumed by the
                                        TRANSFER in RunNext *)
            realForkee: Forkee;      (* user supplied procedure *)
            status: ThreadStatus;
            stack: Stack;            (* coroutine workspace; not set for
                                        the anonymous main thread *)
            argOrResult: WORD;       (* argument given to Fork until the
                                        forkee has run, then its result *)
        END (* record *);

    Mutex = POINTER TO RECORD
            isLocked: BOOLEAN;       (* TRUE while some thread holds it *)
            queue: Thread;           (* threads blocked in Acquire *)
        END (* record *);

    Condition = POINTER TO RECORD
            wakeupWaiting: BOOLEAN;  (* set FALSE by InitCondition; not
                                        otherwise used in this module *)
            queue: Thread;           (* threads blocked in Wait *)
        END (* record *);

(* readyQueue:    queue of runnable threads.  Like every queue here it
                  designates the TAIL of a circular list (see
                  Queue.InsertAtTail), or is NIL when empty.
   currentThread: the thread that is executing right now; it is never a
                  member of readyQueue while it runs. *)
VAR readyQueue, currentThread: Thread;

PROCEDURE InitMutex(VAR mutex: Mutex);
(* Allocate a fresh mutex: unlocked, with nobody waiting for it. *)
    BEGIN
        NEW(mutex);
        mutex^.isLocked := FALSE;
        mutex^.queue := NIL;
    END InitMutex;

PROCEDURE InitCondition(VAR cv: Condition);
(* Allocate a fresh condition variable: no pending wakeup, no waiters. *)
    BEGIN
        NEW(cv);
        cv^.wakeupWaiting := FALSE;
        cv^.queue := NIL;
    END InitCondition;

PROCEDURE Acquire(VAR mutex: Mutex);
(* Take 'mutex'.  If it is free the caller claims it and merely yields
   (it goes to the back of the ready queue); if it is held the caller
   parks on the mutex's own queue until Release or Wait hands it over.
   Either way the processor is given up via RunNext. *)
    BEGIN
        IF NOT mutex^.isLocked THEN
            mutex^.isLocked := TRUE;
            Queue.InsertAtTail(readyQueue, currentThread);
        ELSE
            Queue.InsertAtTail(mutex^.queue, currentThread);
        END;
        RunNext;
    END Acquire;

PROCEDURE Release(VAR mutex: Mutex);
(* Give up 'mutex' and yield.  With no waiters the mutex simply becomes
   unlocked.  Otherwise ownership passes straight to the longest waiter:
   isLocked stays TRUE and that thread is made runnable. *)
    VAR
        successor: Thread;
    BEGIN
        IF mutex^.queue = NIL THEN
            mutex^.isLocked := FALSE;
        ELSE
            successor := Queue.RemoveFromHead(mutex^.queue);
            Queue.InsertAtTail(readyQueue, successor);
        END;
        (* the releasing thread stays runnable, behind the successor *)
        Queue.InsertAtTail(readyQueue, currentThread);
        RunNext;
    END Release;

PROCEDURE Wait(VAR mutex: Mutex; VAR condition: Condition);
(* Park the caller on 'condition', give up 'mutex' exactly as Release
   would (but without re-queueing the caller as runnable), and switch
   away.  Once Signal or Broadcast has made the caller runnable again it
   re-acquires 'mutex' before returning. *)
    VAR
        successor: Thread;
    BEGIN
        Queue.InsertAtTail(condition^.queue, currentThread);
        IF mutex^.queue = NIL THEN
            mutex^.isLocked := FALSE;
        ELSE
            (* hand the mutex directly to its longest waiter *)
            successor := Queue.RemoveFromHead(mutex^.queue);
            Queue.InsertAtTail(readyQueue, successor);
        END;
        RunNext;
        Acquire(mutex);
    END Wait;

PROCEDURE Signal(VAR cv: Condition);
(* Make the longest waiter on 'cv' (if there is one) runnable, then
   yield: the caller queues itself behind it and switches away. *)
    BEGIN
        IF cv^.queue # NIL THEN
            Queue.InsertAtTail(readyQueue, Queue.RemoveFromHead(cv^.queue));
        END;
        Queue.InsertAtTail(readyQueue, currentThread);
        RunNext;
    END Signal;

PROCEDURE Broadcast(VAR cv: Condition);
(* Make every waiter on 'cv' runnable, in the order they arrived, then
   yield: the caller queues itself behind them and switches away. *)
    VAR
        waiter: Thread;
    BEGIN
        (* RemoveFromHead sets the queue to NIL when it takes the last
           member, so this drains the queue completely. *)
        WHILE cv^.queue # NIL DO
            waiter := Queue.RemoveFromHead(cv^.queue);
            Queue.InsertAtTail(readyQueue, waiter);
        END;
        Queue.InsertAtTail(readyQueue, currentThread);
        RunNext;
    END Broadcast;

PROCEDURE Fork(forkee: Forkee; forkeeArg: ForkeeArg): Thread;
(* Create a new thread that will run 'forkee' with 'forkeeArg'.
   A ThreadState and a stack are allocated, and a coroutine whose body
   is RunForkee is set up on that stack.  The new thread is queued as
   runnable ahead of the caller, and the caller then yields, so the
   result is returned only after the caller has been rescheduled. *)
    VAR
        child: Thread;
    BEGIN
        NEW(child);
        NEW(child^.stack);
        NEWPROCESS(RunForkee, ADDRESS(child^.stack),
                   SIZE(child^.stack^), child^.state);
        child^.argOrResult := forkeeArg;   (* read later by RunForkee *)
        child^.realForkee := forkee;
        child^.joinQueue := NIL;
        child^.status := Alive;

        Queue.InsertAtTail(readyQueue, child);
        Queue.InsertAtTail(readyQueue, currentThread);
        RunNext;
        RETURN child
    END Fork;

PROCEDURE RunForkee;
(* Body of every forked coroutine: this is where the first TRANSFER to a
   new thread arrives.  It calls the user's 'forkee' with the argument
   stored by Fork, keeps the result in argOrResult, releases every
   thread blocked in Join on this thread, marks the thread Finished and
   switches away for good.  It must never return, since it is the
   outermost procedure of its coroutine.
*)
  VAR
    joiner: Thread;
  BEGIN
    WITH currentThread^ DO
      argOrResult := realForkee(argOrResult);
      (* Other threads may have tried to Join while the forkee ran.
         Wake each of them and hand it the result: a thread that blocked
         in Join returns its OWN argOrResult when it resumes, so the
         result has to be copied there or Join would return stale data.
      *)
      WHILE joinQueue # NIL DO
        joiner := Queue.RemoveFromHead(joinQueue);
        joiner^.argOrResult := argOrResult;
        Queue.InsertAtTail(readyQueue, joiner);
      END;
      (* The ThreadState itself cannot be disposed here because RunNext
         still needs it to do the transfer (and a later Join may still
         read status/argOrResult from it).
         NOTE(review): the stack is disposed although this coroutine
         keeps executing on it until the TRANSFER inside RunNext; this
         relies on Storage not reusing or overwriting the block before
         then - confirm against the Storage implementation.
      *)
      status := Finished;
      DISPOSE(stack);
      (* DISPOSE(currentThread); *)
    END;
    RunNext;
  END RunForkee;

PROCEDURE Join(thread: Thread): ForkeeReturn;
(* Wait until 'thread' has finished and return the result of its forkee.
   If the thread is still Alive the caller parks on its joinQueue and
   switches away; it is made runnable again when the thread finishes.
   If the thread has already finished, Join returns at once.
*)
  BEGIN
    IF thread^.status = Alive THEN
        Queue.InsertAtTail(thread^.joinQueue, currentThread);
        RunNext;
    END;
    (* When control gets here 'thread' has completed and its result is
       in thread^.argOrResult.  It is read from the joined thread's own
       record rather than from currentThread^, so the value is right on
       both paths.  The record is still valid: this module never
       disposes a ThreadState.
    *)
    RETURN thread^.argOrResult
  END Join;

PROCEDURE RunNext;
(* Scheduler: switch from the calling thread to the thread at the head
   of the ready queue.  The caller must already have queued itself
   wherever it wants to be found again (ready queue, mutex queue,
   condition queue, join queue) - or nowhere, if it has finished.
   Halts the program when nothing at all is runnable.
*)
    VAR
        oldThread: Thread;
    BEGIN
        oldThread := currentThread;
        currentThread := Queue.RemoveFromHead(readyQueue);
        IF currentThread = NIL THEN
            HALT    (* deadlock: no thread is runnable *)
        ELSIF currentThread # oldThread THEN
            TRANSFER(oldThread^.state, currentThread^.state);
        END (* if *);
        (* When the caller was the only runnable thread it has just
           dequeued itself.  TRANSFER must be skipped then: with both
           parameters designating the same variable, the destination is
           identified before the current context is saved, so the call
           would resume a stale context (or, for the main thread, one
           that was never initialised) instead of simply carrying on.
        *)
    END RunNext;

(* ----------------------------------------------------------------------- *)

MODULE Queue;
(* FIFO queues of threads, linked through ThreadState.link.
   A queue is a circular singly linked list represented by a pointer to
   its TAIL (so tail^.link is the head); an empty queue is NIL.  Both
   operations take constant time.
*)

IMPORT Thread;

EXPORT QUALIFIED InsertAtTail, RemoveFromHead;

PROCEDURE InsertAtTail(VAR q: Thread; p: Thread);
(* Append 'p' to queue 'q'; 'p' becomes the new tail. *)
    BEGIN
        IF q # NIL THEN
            p^.link := q^.link; (* new tail points at the head *)
            q^.link := p;       (* old tail points at the new tail *)
        ELSE
            p^.link := p;       (* sole member: it is its own head *)
        END;
        q := p;
    END InsertAtTail;

PROCEDURE RemoveFromHead(VAR q: Thread): Thread;
(* Detach and return the oldest member of 'q', or NIL if 'q' is empty.
   'q' becomes NIL when its last member is taken. *)
    VAR
        first: Thread;
    BEGIN
        IF q = NIL THEN
            RETURN NIL;
        END;
        first := q^.link;
        IF first = q THEN
            q := NIL;                  (* that was the only member *)
        ELSE
            q^.link := first^.link;    (* tail now points at the next head *)
        END;
        RETURN first;
    END RemoveFromHead;

END Queue;

BEGIN
    (* Module initialisation: nothing is runnable yet, and the code that
       is executing becomes an anonymous main thread.  It has no forkee
       and no stack of its own; its coroutine state is first filled in
       when RunNext transfers away from it. *)
    readyQueue := NIL;
    NEW(currentThread);
    currentThread^.joinQueue := NIL;
    currentThread^.status := Alive;
END Threads.
