Pipeline
Typed pipeline framework with composable stages.
Stages are composable transformations executed with reader (immutable environment), error (collecting validation errors), and acc (non-fatal warnings) effects. The run function wires up all handlers and returns { value, errors, warnings, typeErrors }.
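let
  # A stage that copies `config` from the reader environment into the data.
  stage1 = pipeline.mkStage {
    name = "discover";
    transform = data:
      bind (pipeline.asks (env: env.config)) (cfg:
        pure (data // { config = cfg; }));
  };
  # run returns { value, errors, warnings, typeErrors }.
  result = pipeline.run { config = "prod"; } [ stage1 ];
in result.value  # => { config = "prod"; }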
compose
Chain stages into a single computation.
compose : [Stage] -> Data -> Computation Data
Each stage's transform receives the output of the previous stage and returns a computation producing the next stage's input. Initial data seeds the pipeline.
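The chaining rule can be illustrated with a toy model. The sketch below is not the library's implementation: it defines its own reader-only pure, bind, asks, and compose (a computation is modeled as a plain function from the environment to a value) and leaves out the error, acc, and typeCheck effects entirely. The stage names and fields are made up for illustration.

```nix
let
  # Toy reader-only model: a computation is a function `env -> value`.
  pure = value: _env: value;
  bind = m: f: env: f (m env) env;
  asks = f: env: f env;

  # Fold the stages left to right; each transform receives the previous
  # stage's output, and `initial` seeds the first stage.
  compose = stages: initial:
    builtins.foldl' (acc: stage: bind acc stage.transform) (pure initial) stages;

  discover = {
    name = "discover";
    transform = data:
      bind (asks (env: env.config)) (cfg:
        pure (data // { config = cfg; }));
  };

  tag = {
    name = "tag";
    # Reads the field that the previous stage added.
    transform = data: pure (data // { tagged = "${data.config}-v1"; });
  };

  # Running a toy computation just means applying it to an environment.
  computation = compose [ discover tag ] { };
in computation { config = "prod"; }
# => { config = "prod"; tagged = "prod-v1"; }
```

Saved to a file, the expression can be checked with nix-instantiate --eval --strict. Swapping the order of the two stages makes tag fail on the missing config field, which is the ordering dependency the description above implies.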
mkStage
Create a named pipeline stage.
mkStage : { name, description?, transform, inputType?, outputType? } -> Stage
transform : Data -> Computation Data
Takes the current pipeline data, may use effects (ask, raise, warn), and returns a computation producing the updated pipeline data.
inputType/outputType : optional type schemas for validation at stage boundaries (checked only when provided). Validation uses fx.types.validate, which sends typeCheck effects.
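As a sketch of the full argument set, the stage below fills in the optional fields. It is written as a function so that nothing about the surrounding library is guessed: pipeline and pure are expected to be the same values used in the example at the top, while inputSchema and outputSchema are hypothetical placeholders for schema values that fx.types.validate accepts (how to construct them is not covered here). The stage name, description, and port field are illustrative only.

```nix
# Sketch only: the caller supplies `pipeline` and `pure` from the library,
# plus two schema values (hypothetical placeholders, construction not shown).
{ pipeline, pure, inputSchema, outputSchema }:

pipeline.mkStage {
  name = "defaults";
  description = "Fill in a default port when none is set.";

  # Optional; validated at the stage boundaries because they are provided.
  inputType = inputSchema;
  outputType = outputSchema;

  # transform : Data -> Computation Data
  transform = data:
    pure (data // { port = data.port or 8080; });
}
```

Leaving out inputType and outputType should give a stage with no boundary validation, per the "checked when provided" note above.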
run
Execute a pipeline with effect handling.
run : args -> [Stage] -> { value, errors, warnings, typeErrors }
args : { ... }
Becomes the reader environment -- stages access it via ask/asks.
stages : [Stage]
Ordered list of stages to execute.
Returns:
  value      -- final pipeline data from the last stage
  errors     -- list of { message, context } from validation failures
  warnings   -- list of non-fatal warning items
  typeErrors -- list of type validation errors
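One way to consume this result is to check errors and typeErrors before trusting value. The helper below is a hypothetical sketch, not part of the library: summarize is a made-up name, and sample is a hand-written attribute set with the documented shape, standing in for a real run result.

```nix
let
  # Hypothetical helper: reduce a run-shaped result to a small report.
  summarize = result:
    if result.errors != [ ] || result.typeErrors != [ ] then {
      ok = false;
      # Each error is documented as { message, context }.
      messages = map (e: e.message) result.errors;
      typeErrorCount = builtins.length result.typeErrors;
    } else {
      ok = true;
      inherit (result) value warnings;
    };

  # Hand-written stand-in for the attribute set that run returns.
  sample = {
    value = { config = "prod"; };
    errors = [ ];
    warnings = [ ];
    typeErrors = [ ];
  };
in summarize sample
# => { ok = true; value = { config = "prod"; }; warnings = [ ]; }
```

Warnings are passed through rather than treated as failures, matching their description as non-fatal.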