Chapter 5: Processes¶
5.1 The Basic Concept¶
A process in DSSim is a unit of simulation behavior that suspends at wait points and resumes when the right event arrives. From the outside, a process is just a Python object registered in the simulation. From the inside, it is a generator or coroutine that pauses at yield / await and resumes when the scheduler delivers a matching event.
There are several ways to write process logic in DSSim. The table below shows the trade-offs:
| Style | How it looks | Profile | Use case |
|---|---|---|---|
| Plain Python generator | def proc(): yield ... |
Both | One-off, low overhead, no condition logic |
@DSSchedulable function |
@DSSchedulable def proc(): return ... |
Both | Adapts a plain function to the scheduling protocol |
DSProcess (recommended) |
sim.process(gen) |
PubSubLayer2 | Full lifecycle: conditions, abort, join, timeout |
DSAgent subclass |
class W(DSAgent): async def process(self): ... |
PubSubLayer2 | High-level component with queue/resource helpers |
Recommendation: Use DSProcess (or DSAgent) in new code. Raw generators work in both profiles but lack condition filtering, abort support, and the join interface.
5.2 Creating and Scheduling a Process¶
A process wraps an already-instantiated generator or coroutine. You must pass either a generator object or a coroutine object — not a generator function:
from dssim import DSSimulation
sim = DSSimulation()
def my_gen():
print("started")
yield # suspend
print("resumed")
p = sim.process(my_gen()) # wrap the generator instance
p.schedule(0) # start at time 0
sim.run(until=10)
schedule(time) places the process in the time queue so the simulation kicks it off at time. Calling schedule() is usually the last step before sim.run().
Shorthand:
p = sim.process(my_gen()).schedule(0)
5.3 Waiting Inside a Process¶
The simulation only makes time advance by returning control to the scheduler at a wait point. There are three wait primitives:
5.3.1 sleep / gsleep — pure time delay¶
async def ticker():
while True:
await sim.sleep(1) # advance time by 1
print(f"tick at {sim.time}")
sleep blocks for the given duration, ignoring all events. It is available in both profiles.
5.3.2 wait / gwait — wait with a condition¶
PubSubLayer2
The cond parameter and the no-spurious-wakeup guarantee are PubSubLayer2-specific. In LiteLayer2, use sleep for timed delays; event-driven wake-up is handled through the queue/resource sentinel mechanism rather than a general condition.
# async style (coroutine)
async def producer():
while True:
event = await sim.wait(timeout=5, cond=lambda e: isinstance(e, MyEvent))
if event is None:
print("timed out")
else:
print("got event:", event)
# generator style
def producer():
while True:
event = yield from sim.gwait(timeout=5, cond=lambda e: isinstance(e, MyEvent))
...
timeout— maximum time to wait. If the timeout expires before any matching event arrives, the wait returnsNone.cond— callable or value that must match. Only events that pass the condition wake the process. Events that do not match are ignored without waking the caller.
This is the no spurious wakeup guarantee: user code only resumes when the condition is actually satisfied, or when the timeout expires.
5.3.3 check_and_wait — pre-check before blocking¶
PubSubLayer2
check_and_wait relies on condition filtering and is available in PubSubLayer2 only.
async def consumer(queue):
# Check if already non-empty; if yes, return without blocking
event = await sim.check_and_wait(timeout=10, cond=lambda _: len(queue) > 0)
check_and_wait evaluates the condition once immediately. If satisfied, it returns without entering the scheduler. If not satisfied, it behaves like wait.
5.4 @DSSchedulable — wrapping plain functions¶
Some frameworks expect a generator-compatible callable. @DSSchedulable turns a plain function into one:
from dssim import DSSimulation, DSSchedulable
@DSSchedulable
def one_shot_task():
return "done"
sim = DSSimulation()
p = sim.process(one_shot_task()).schedule(0)
sim.run()
print(p.value) # "done"
The decorator is mainly useful for adapting non-generator code for use inside DSProcess or sim.schedule(). For new code, writing a generator or coroutine directly is usually clearer.
5.5 Process Lifecycle¶
PubSubLayer2
The full lifecycle — including condition stacking, join(), and DSFuture integration — is a DSProcess (PubSubLayer2) feature. In LiteLayer2, processes are lightweight wrappers with scheduled/running/finished states and abort(), but no join interface.
A DSProcess transitions through three stages:
stateDiagram-v2
[*] --> scheduled: schedule(time)
scheduled --> started: time queue delivers startup event
started --> finished: generator returns / StopIteration
started --> finished: abort() / exception
finished --> [*]
- scheduled — the process sits in the time queue, waiting for
timeto arrive. No generator code runs yet. - started — the generator has received its first kick. Wait points are active.
- finished — the generator has returned (or failed).
p.valueholds the return value;p.excholds the exception if it failed.
Check state at any time:
p.started() # True after first kick
p.finished() # True after StopIteration or abort
p.value # return value (set on each yield in generator style)
5.6 Joining Another Process¶
PubSubLayer2
Joining relies on DSFuture, which is a PubSubLayer2 primitive.
You can wait for a process to complete from inside another process:
async def master():
worker = sim.process(worker_gen()).schedule(0)
result = await sim.wait(cond=worker) # wait until worker finishes
print(worker.value)
When cond is a DSFuture (which DSProcess is a subtype of), DSSim automatically subscribes to the finish endpoint and resumes only when the future completes.
5.7 Aborting a Process¶
PubSubLayer2
abort() and DSAbortException are part of the DSProcess lifecycle and are available in PubSubLayer2 only.
p.abort() # sends DSAbortException into the process
p.abort(MyException("cancelled")) # send a custom exception
Inside the process, DSAbortException behaves like any other exception — it can be caught or allowed to propagate:
async def cancellable():
try:
await sim.sleep(100)
except DSAbortException:
print("I was cancelled, cleaning up")
If abort() is called before the process starts (still in scheduled state), the process is immediately marked as failed without running any generator code.
5.8 Subscribing to a Publisher Endpoint¶
PubSubLayer2
Publisher subscription context managers use delivery tiers and are specific to PubSubLayer2. In LiteLayer2, use DSLitePub.add_subscriber directly; there are no tier-based context managers.
A process can register as a subscriber on a DSPub endpoint for the duration of a wait. DSSim provides context managers for this:
async def listener(publisher):
with sim.consume(publisher): # subscribe to CONSUME phase
event = await sim.wait(timeout=5)
# automatically unsubscribed here
Available context managers:
| Method | Phase | Meaning |
|---|---|---|
sim.observe_pre(pub) |
PRE |
See all events before consumers |
sim.consume(pub) |
CONSUME |
Compete to consume events |
sim.observe_consumed(pub) |
POST_HIT |
Notified after a consumer accepts |
sim.observe_unconsumed(pub) |
POST_MISS |
Notified when all consumers reject |
These contexts can be combined:
with sim.observe_pre(pub1) + sim.consume(pub2):
event = await sim.wait(timeout=10)
A DSProcess used as a subscriber has its own condition stack. Events delivered to the process are checked against the current cond before the generator is resumed.
A minimal full example — a producer sends events, a consumer process listens:
from dssim import DSSimulation, DSPub
sim = DSSimulation()
source = sim.publisher(name="source")
async def producer():
for i in range(3):
await sim.sleep(1)
source.signal(i)
async def consumer():
with sim.consume(source):
while True:
val = await sim.wait(timeout=5)
if val is None:
break
print(f"t={sim.time}: got {val}")
sim.process(producer()).schedule(0)
sim.process(consumer()).schedule(0)
sim.run(until=10)
Advanced: Non-consuming subscription via val=False¶
PubSubLayer2
val=False is a PubSubLayer2-only feature. LiteLayer2 has no delivery tiers and no return-value mechanism — every event is delivered to all subscribers unconditionally.
By default, a process subscribed in the CONSUME tier claims the event — its send() returns True, stopping delivery to any subsequent subscribers in that tier. Sometimes you want a process to receive the event but deliberately not claim it, so the next subscriber still gets a chance.
Pass val=False to sim.wait():
async def observer_proc(source):
with sim.consume(source):
while True:
event = await sim.wait(val=False) # receive event, but do not consume it
if event is None:
break
print(f"t={sim.time}: observed {event}")
The val is the return value that the process hands back to the publisher's dispatch loop when it is woken. True (the default) means "consumed — stop here". False means "seen but not consumed — continue to the next subscriber".
The key distinction from PRE:
PREtier is intended for pure observers — loggers and probes that watch the stream with no role in processing. PRE always runs before anyCONSUMEdecision.CONSUMEwithval=Falseis intended for pipeline participants — components that genuinely process the event (update state, trigger logic) but must not block other components from also receiving it. Because PRE is guaranteed to have finished beforeCONSUMEbegins, usingCONSUMEwithval=Falseplaces the component clearly inside the processing tier, after all passive observers have already run.
This PRE / CONSUME separation is a convention, not an enforced rule. DSSim does not prevent a PRE subscriber from having side effects, nor a CONSUME subscriber from using val=False. The recommendation exists to keep intent readable: readers of the wiring code can tell at a glance which subscribers are observers and which are active participants.
val |
Effect on delivery |
|---|---|
True (default) |
Event is consumed; no later subscriber in CONSUME sees it |
False |
Event passes through; next subscriber in CONSUME still gets it |
5.9 Advanced: Interruptible Context¶
PubSubLayer2
sim.interruptible() uses condition filtering internally and is available in PubSubLayer2 only.
Sometimes a long sequence of waits should be cleanly interrupted when an external condition fires. The sim.interruptible() context manager handles this without scattering try/except through every wait:
async def long_task(cancel_signal):
with sim.interruptible(cond=cancel_signal) as ctx:
# any wait inside this block can be interrupted
await sim.sleep(10)
event = await sim.wait(timeout=5, cond=lambda e: e == "data")
await sim.sleep(3)
# after the block:
if ctx.interrupted():
print(f"interrupted with value: {ctx.value}")
else:
print("completed normally")
When an event matches cancel_signal during any wait inside the with block, a DSInterruptibleContextError is raised internally and caught by the context manager. Execution continues after the with block. The ctx.interrupted() flag and ctx.value tell you what happened.
5.10 Advanced: Timeout Context¶
PubSubLayer2
sim.timeout() is a DSProcess context manager available in PubSubLayer2 only.
sim.timeout() lets you set a deadline around a code block:
async def bounded_task():
with sim.timeout(time=5) as ctx:
await sim.sleep(3)
await some_long_operation()
if ctx.interrupted():
print("did not finish in time")
If the deadline fires before the with block exits, a DSTimeoutContextError is raised and caught. The ctx.interrupted() flag reflects whether the timeout fired.
You can also reschedule the timeout deadline:
with sim.timeout(time=2) as ctx:
result = await quick_attempt()
ctx.reschedule(5) # give more time if first phase succeeded
result2 = await slow_attempt()
5.11 DSAgent — Process-Centric Components¶
PubSubLayer2
DSAgent is built on DSProcess and is available in PubSubLayer2 only.
DSAgent is a base class that bundles a DSProcess with queue and resource helper methods. It is the recommended pattern when a component drives its own behavior:
from dssim import DSSimulation, DSAgent
class Worker(DSAgent):
def __init__(self, work_queue, **kwargs):
self.work_queue = work_queue
super().__init__(**kwargs)
async def process(self):
while True:
item = await self.pop(self.work_queue, timeout=float('inf'))
await self.sim.sleep(1) # model processing time
print(f"{self.name}: processed {item}")
sim = DSSimulation()
q = sim.queue(capacity=10)
worker = Worker(q, name="worker", sim=sim)
for i in range(5):
q.put_nowait(f"task-{i}")
sim.run(until=20)
DSAgent auto-starts its own process at time 0. The process() method can be a coroutine (async def) or a generator function (def ... yield). Queue and resource helpers (enter, pop, get, put, etc.) are available as methods on the agent.
See Chapter 6 for details on DSQueue, DSContainer, and DSResource.
5.12 Key Takeaways¶
- Plain generators and coroutines plus
sleepare available in both profiles. sim.wait(timeout, cond)andcheck_and_waitare PubSubLayer2 — condition filtering with no spurious wakeups.DSProcessfull lifecycle (abort, join viaDSFuture) is PubSubLayer2.- Publisher subscription context managers (
sim.consume,sim.observe_pre, etc.) are PubSubLayer2 — they rely on delivery tiers. sim.interruptible()andsim.timeout()are PubSubLayer2 context managers for bounded cancellation.DSAgentis the recommended self-driving component pattern and is PubSubLayer2.