Pipeline

Typed pipeline framework with composable stages.

Stages are composable transformations executed with reader (immutable environment), error (collecting validation errors), and acc (non-fatal warnings) effects. The run function wires up all handlers and returns { value, errors, warnings, typeErrors }.

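let
  # bind and pure are re-exported by the pipeline module.
  inherit (pipeline) bind pure;
  stage1 = pipeline.mkStage {
    name = "discover";
    transform = data:
      # Read config from the reader environment and merge it into the data.
      bind (pipeline.asks (env: env.config)) (cfg:
        pure (data // { config = cfg; }));
  };
  result = pipeline.run { config = "prod"; } [ stage1 ];
in result.value  # => { config = "prod"; }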

ask

ask: reader-effect helper returning the full pipeline environment.

ask : Computation Env

Convenience re-export of fx.effects.reader.ask.

asks

asks: reader-effect helper applying a projection to the pipeline environment.

asks : (Env -> a) -> Computation a

Convenience re-export of fx.effects.reader.asks.
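The relationship between ask and asks can be sketched as follows. This assumes `pipeline` is this module, passed in by the caller, and that asks is the usual projection over ask, so that `asks f` and `map f ask` read the same value. The stage name and the `sameRead` field are illustrative only.

```nix
# Assumption: `pipeline` is this module, supplied by the caller.
{ pipeline }:
let
  inherit (pipeline) ask asks bind map pure;
  # Two ways to read env.config; expected to be interchangeable.
  viaAsks = asks (env: env.config);
  viaAsk = map (env: env.config) ask;
in
pipeline.mkStage {
  name = "read-env";
  transform = data:
    bind viaAsks (a:
      bind viaAsk (b:
        # The environment is immutable, so both reads should agree.
        pure (data // { config = a; sameRead = a == b; })));
}
```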

bind

bind: re-export of fx.kernel.bind for pipeline stage implementations.

bind : Computation a -> (a -> Computation b) -> Computation b

Re-export of fx.kernel.bind.

compose

compose: chain a list of stages into a single computation; each stage's transform receives the previous output and returns the next stage's input wrapped in a computation.

compose : [Stage] -> Data -> Computation Data

Chain stages into a single computation.

Each stage's transform receives the output of the previous stage and returns a computation producing the next stage's input. Initial data seeds the pipeline.
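A minimal sketch of chaining two stages, assuming `pipeline` is this module and using `{ }` as the seed data. The stage names and fields are hypothetical. The result is an unhandled computation, so it still needs the handlers that run installs before a value comes out.

```nix
# Assumption: `pipeline` is this module, supplied by the caller.
{ pipeline }:
let
  inherit (pipeline) compose mkStage pure;
  addA = mkStage {
    name = "add-a";
    transform = data: pure (data // { a = 1; });
  };
  addB = mkStage {
    name = "add-b";
    # Receives the output of add-a, so data.a is available here.
    transform = data: pure (data // { b = data.a + 1; });
  };
in
# Data -> Computation Data, applied to the seed `{ }`.
compose [ addA addB ] { }
```

Under the documented semantics, handling this computation should yield data containing both `a` and `b`, with `b` derived from the value set by the first stage.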

map

map: re-export of fx.kernel.map for pipeline stage implementations.

map : (a -> b) -> Computation a -> Computation b

Re-export of fx.kernel.map.
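When a stage only transforms the result of one effect and performs no further effects, map can stand in for a bind followed by pure. The sketch below rewrites the introductory "discover" stage that way, assuming `pipeline` is this module.

```nix
# Assumption: `pipeline` is this module, supplied by the caller.
{ pipeline }:
let
  inherit (pipeline) asks map;
in
pipeline.mkStage {
  name = "discover";
  transform = data:
    # Same shape as: bind (asks ...) (cfg: pure (data // { config = cfg; }))
    map (cfg: data // { config = cfg; }) (asks (env: env.config));
}
```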

mkStage

mkStage: build a named pipeline stage carrying a transform, optional inputType/outputType schemas, and a description; stages chain through compose.

mkStage : { name, description ? "", transform, inputType ? null, outputType ? null } -> Stage

Create a named pipeline stage.

transform : Data -> Computation Data

Takes the current pipeline data, may use effects (ask, raise, warn), and returns a computation producing the updated pipeline data.

inputType/outputType : optional type schemas

Validated at stage boundaries when provided. Validation uses fx.types.validate, which sends typeCheck effects.
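A small sketch of a stage definition, assuming `pipeline` is this module. The stage name, description, and `tags` field are hypothetical. inputType and outputType are left at their null defaults because the schema constructors are not shown in this section.

```nix
# Assumption: `pipeline` is this module, supplied by the caller.
{ pipeline }:
pipeline.mkStage {
  name = "normalize";
  description = "Ensures data.tags exists and marks the data as normalized.";
  # inputType / outputType omitted: they default to null, so no
  # boundary validation is requested for this stage.
  transform = data:
    pipeline.pure (data // {
      tags = data.tags or [ ];
      normalized = true;
    });
}
```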

pure

pure: re-export of fx.kernel.pure for pipeline stage implementations.

pure : a -> Computation a

Re-export of fx.kernel.pure.

raise

raise: collecting-error helper for pipeline stages.

raise : String -> Computation a

Convenience re-export of fx.effects.error.raise.

raiseWith

raiseWith: collecting-error helper with context for pipeline stages.

raiseWith : Context -> String -> Computation a

Convenience re-export of fx.effects.error.raiseWith.
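The sketch below reports two independent validation problems from one stage. It assumes `pipeline` is this module and that the collecting handler resumes after each raise, so later checks still run. The context attribute set passed to raiseWith is a hypothetical shape; the source does not specify what Context contains.

```nix
# Assumption: `pipeline` is this module, supplied by the caller.
{ pipeline }:
let
  inherit (pipeline) bind pure raise raiseWith;
in
pipeline.mkStage {
  name = "validate-port";
  transform = data:
    let
      port = data.port or null;
      # Plain message when the field is missing.
      checkPresent =
        if port == null then raise "port is required" else pure null;
      # Message plus context (hypothetical shape) when out of range.
      checkRange =
        if port != null && (port < 1 || port > 65535)
        then raiseWith { field = "port"; } "port out of range"
        else pure null;
    in
    # Run both checks in order, then pass the data through unchanged.
    bind checkPresent (_: bind checkRange (_: pure data));
}
```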

run

run: execute a pipeline with reader/error/acc/typecheck handlers wired up; returns { value, errors, warnings, typeErrors } from the final state.

run : Args -> [Stage] -> { value : Data, errors : [Err], warnings : [Warn], typeErrors : [Err] }

Execute a pipeline with effect handling.

args : { ... }

Becomes the reader environment; stages access it via ask/asks.

stages : [Stage]

Ordered list of stages to execute.

Returns:

  value -- final pipeline data from the last stage
  errors -- list of { message, context } from validation failures
  warnings -- list of non-fatal warning items
  typeErrors -- list of type validation errors
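One way to consume the result is to check the error lists before using the value. This is a sketch assuming `pipeline` is this module; the `strict` environment field, the stage, and the `ok` summary flag are hypothetical.

```nix
# Assumption: `pipeline` is this module, supplied by the caller.
{ pipeline }:
let
  inherit (pipeline) asks bind pure raise warn;
  check = pipeline.mkStage {
    name = "check";
    transform = data:
      bind (asks (env: env.strict)) (strict:
        if strict
        then bind (raise "strict mode: value rejected") (_: pure data)
        else bind (warn "lenient mode: value accepted") (_: pure data));
  };
  result = pipeline.run { strict = false; } [ check ];
in
{
  # Treat the run as successful only if both error lists are empty.
  ok = result.errors == [ ] && result.typeErrors == [ ];
  inherit (result) value warnings;
}
```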

warn

warn: accumulator helper for non-fatal pipeline warnings.

warn : a -> Computation null

Convenience re-export of fx.effects.acc.emit.
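Because warn accepts any value, a warning item can be a string or a structured record. The sketch below emits a record while migrating a deprecated field, assuming `pipeline` is this module. The `legacyName` field and the warning's shape are illustrative.

```nix
# Assumption: `pipeline` is this module, supplied by the caller.
{ pipeline }:
let
  inherit (pipeline) bind pure warn;
in
pipeline.mkStage {
  name = "deprecations";
  transform = data:
    if data ? legacyName
    then
      # Emit a non-fatal warning, then continue with migrated data.
      bind (warn { stage = "deprecations"; message = "legacyName is deprecated"; })
        (_: pure (removeAttrs data [ "legacyName" ] // { name = data.legacyName; }))
    else pure data;
}
```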