Pipeline
Typed pipeline framework with composable stages.
Stages are composable transformations executed with reader (immutable environment), error (collecting validation errors), and acc (non-fatal warnings) effects. The run function wires up all handlers and returns { value, errors, warnings, typeErrors }.
let
  stage1 = pipeline.mkStage {
    name = "discover";
    # Read config from the reader environment and merge it into the data.
    transform = data:
      pipeline.bind (pipeline.asks (env: env.config)) (cfg:
        pipeline.pure (data // { config = cfg; }));
  };
  result = pipeline.run { config = "prod"; } [ stage1 ];
in result.value  # => { config = "prod"; }
ask
ask: reader-effect helper returning the full pipeline environment.
ask : Computation Env
Convenience re-export of fx.effects.reader.ask.
asks
asks: reader-effect helper applying a projection to the pipeline environment.
asks : (Env -> a) -> Computation a
Convenience re-export of fx.effects.reader.asks.
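A minimal sketch of how ask and asks relate, assuming pipeline is this module passed in as a function argument. Under the usual reader semantics, both attributes below should describe the same computation once run, but that equivalence is an expectation rather than something the source states.

```nix
{ pipeline }:
{
  # Project a field out of the environment directly.
  viaAsks = pipeline.asks (env: env.config);

  # The same projection, spelled out with ask, bind, and pure.
  viaAsk = pipeline.bind pipeline.ask (env: pipeline.pure env.config);
}
```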
bind
bind: re-export of fx.kernel.bind for pipeline stage implementations.
bind : Computation a -> (a -> Computation b) -> Computation b
Re-export of fx.kernel.bind.
compose
compose: chain a list of stages into a single computation; each stage's transform receives the previous output and returns the next stage's input wrapped in a computation.
compose : [Stage] -> Data -> Computation Data
Chain stages into a single computation.
Each stage's transform receives the output of the previous stage and returns a computation producing the next stage's input. Initial data seeds the pipeline.
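The chaining described above can be modelled as a left fold. The sketch below is a toy model in plain Nix, not the library's implementation: pure and bind here are hypothetical stand-ins that treat a computation as a plain value, so no effects are involved.

```nix
let
  # Toy stand-ins (assumption): a "computation" is just its value.
  pure = x: x;
  bind = m: f: f m;

  # Feed each stage's output into the next stage's transform.
  compose = stages: data:
    builtins.foldl' (acc: stage: bind acc stage.transform) (pure data) stages;

  stages = [
    { name = "a"; transform = d: pure (d // { a = 1; }); }
    { name = "b"; transform = d: pure (d // { b = d.a + 1; }); }
  ];
in
compose stages { }
# evaluates to { a = 1; b = 2; }
```

The second stage can read d.a only because the first stage's output seeds its input, which is the ordering guarantee compose is documented to provide.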
map
map: re-export of fx.kernel.map for pipeline stage implementations.
map : (a -> b) -> Computation a -> Computation b
Re-export of fx.kernel.map.
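When a stage only transforms the result of a single effect, map can replace an explicit bind/pure pair. A small sketch, assuming pipeline is passed in as an argument and that the environment carries a hypothetical region attribute:

```nix
{ pipeline }:
pipeline.mkStage {
  name = "attach-region";
  # env.region is a hypothetical field used only for illustration.
  transform = data:
    pipeline.map
      (region: data // { inherit region; })
      (pipeline.asks (env: env.region));
}
```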
mkStage
mkStage: build a named pipeline stage carrying a transform, optional inputType/outputType schemas, and a description; stages chain through compose.
mkStage : { name, description ? "", transform, inputType ? null, outputType ? null } -> Stage
Create a named pipeline stage.
transform : Data -> Computation Data
Takes the current pipeline data, may use effects (ask, raise, warn), and returns a computation producing the updated pipeline data.
inputType/outputType : optional type schemas validated at stage boundaries when provided. Validation uses fx.types.validate, which sends typeCheck effects.
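A sketch of a stage that fills in a default and records a warning, assuming pipeline is passed in as an argument. The port attribute and the default value are hypothetical; inputType and outputType are omitted because the source does not show how schemas are constructed.

```nix
{ pipeline }:
pipeline.mkStage {
  name = "default-port";
  description = "Fill in a default port when none is set.";
  transform = data:
    if data ? port
    then pipeline.pure data
    else
      # warn yields null, so the continuation ignores its argument.
      pipeline.bind (pipeline.warn "no port set; defaulting to 8080") (_:
        pipeline.pure (data // { port = 8080; }));
}
```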
pure
pure: re-export of fx.kernel.pure for pipeline stage implementations.
pure : a -> Computation a
Re-export of fx.kernel.pure.
raise
raise: collecting-error helper for pipeline stages.
raise : String -> Computation a
Convenience re-export of fx.effects.error.raise.
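A sketch of a validating stage that uses raise, assuming pipeline is passed in as an argument. The name attribute is hypothetical, and the example makes no claim about what value the stage yields after the error is collected.

```nix
{ pipeline }:
pipeline.mkStage {
  name = "require-name";
  transform = data:
    if data ? name
    then pipeline.pure data
    # Report the problem through the error effect instead of throwing.
    else pipeline.raise "missing required attribute: name";
}
```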
raiseWith
raiseWith: collecting-error helper with context for pipeline stages.
raiseWith : Context -> String -> Computation a
Convenience re-export of fx.effects.error.raiseWith.
run
run: execute a pipeline with reader/error/acc/typecheck handlers wired up; returns { value, errors, warnings, typeErrors } from the final state.
run : Args -> [Stage] -> { value : Data, errors : [Err], warnings : [Warn], typeErrors : [Err] }
Execute a pipeline with effect handling.
args : { ... }
Becomes the reader environment -- stages access via ask/asks.
stages : [Stage]
Ordered list of stages to execute.
Returns:
value -- final pipeline data from the last stage
errors -- list of { message, context } from validation failures
warnings -- list of non-fatal warning items
typeErrors -- list of type validation errors
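One way a caller might consume the result is to treat any collected error as fatal and let warnings pass. This is a sketch of one possible policy, not behaviour built into run; pipeline and stages are assumed to be supplied by the caller, and the environment shown is illustrative.

```nix
{ pipeline, stages }:
let
  result = pipeline.run { config = "prod"; } stages;
  # Treat both validation errors and type errors as failures.
  failures = result.errors ++ result.typeErrors;
in
if failures == [ ]
then result.value
else throw "pipeline failed with ${toString (builtins.length failures)} error(s)"
```

Warnings are deliberately left out of the failure check, matching their description as non-fatal.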
warn
warn: accumulator helper for non-fatal pipeline warnings.
warn : a -> Computation null
Convenience re-export of fx.effects.acc.emit.