The Initializer is a component responsible for the initialization of a pipeline's "payload".
Payloads are used for two things:
- they are a piece of data inferred from the input; as such, they suit patterns where your pipelines work on a domain entity which has to be resolved from external input
- they are taken as input by the Indexer functions in order to determine the object set, which influences how Step functions are executed
It is important to note that payloads are not always relevant to a pipeline: most of the time you won't need them, and avoiding them arguably results in lower complexity.
By default, if you don't specify an Initializer (and thus, an Indexer) the Pipeline will use a placeholder payload and run all steps on it.
If that is your case, you can skip directly to the next section.
The Indexer functions are responsible for picking what objects the pipeline steps will be executed on.
Each Step will run once for each indexed "object". As a result, the Indexer can be considered the place that determines the step execution flow.
In the example below, if the Indexer A indexes a single object, all defined steps will be executed on it once:
flowchart LR
subgraph INIT_PHASE[Initialization]
direction LR
INITIALIZER(Initializer):::init
INDEXER(Indexer):::init
OBJECT_A[Object A]:::init
end
subgraph STEP_PHASE[Step Phase]
direction LR
STEP_A1(Step 1A):::step
STEP_A2(Step 2A):::step
STEP_A3(Step 3A):::step
end
INITIALIZER --> INDEXER
INDEXER -- index --> OBJECT_A --> STEP_A1 --> STEP_A2 --> STEP_A3
classDef init stroke:yellow;
classDef step stroke:#0f0;
If the Indexer A returns a collection of objects, or if several Indexer functions index different objects, steps will be executed once on each of them:
flowchart LR
subgraph INIT_PHASE[Initialization]
direction LR
INITIALIZER(Initializer):::init
INDEXER(Indexer):::init
OBJECT_A[Object A]:::init
OBJECT_B[Object B]:::init
end
subgraph STEP_PHASE[Step Phase]
direction LR
STEP_A1(Step 1A):::step
STEP_A2(Step 2A):::step
STEP_A3(Step 3A):::step
STEP_B1(Step 1B):::step
STEP_B2(Step 2B):::step
STEP_B3(Step 3B):::step
end
INITIALIZER --> INDEXER
INDEXER -- index --> OBJECT_A --> STEP_A1 --> STEP_A2 --> STEP_A3
INDEXER -- index --> OBJECT_B --> STEP_B1 --> STEP_B2 --> STEP_B3
classDef init stroke:yellow;
classDef step stroke:#0f0;
Each Step sequence has its own control flow. For instance, if you apply a ResultEvaluator on a Step and trigger a DISCARD_AND_CONTINUE strategy, you can block that Step branch while the others carry on:
flowchart LR
subgraph INIT_PHASE[Initialization]
direction LR
INITIALIZER(Initializer):::init
INDEXER(Indexer):::init
OBJECT_A[Object A]:::init
OBJECT_B[Object B]:::init
end
subgraph STEP_PHASE[Step Phase]
direction LR
STEP_A1(Step 1A):::step
STEP_A2(Step 2A):::step
STEP_A3(Step 3A):::step
STEP_B1(Step 1B):::step
STEP_B2(Step 2B):::disabled
STEP_B3(Step 3B):::disabled
end
INITIALIZER --> INDEXER
INDEXER -- index --> OBJECT_A --> STEP_A1 -- CONTINUE --> STEP_A2 -- CONTINUE --> STEP_A3
INDEXER -- index --> OBJECT_B --> STEP_B1 -- DISCARD_AND_CONTINUE --x STEP_B2 -.- STEP_B3:::disabled
classDef init stroke:yellow;
classDef step stroke:#0f0;
classDef disabled stroke-dasharray: 3 3;
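The per-object control flow pictured above can be sketched in plain Java. This is a minimal illustration that does not use the library: `Strategy`, `BranchSketch` and the string-based objects are hypothetical stand-ins, and running one branch to completion before starting the next is an assumption of the sketch, not a statement about the actual scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins, not the library's actual types.
enum Strategy { CONTINUE, DISCARD_AND_CONTINUE }

final class BranchSketch {
    /** Runs the steps once per indexed object; a discarded branch stops, the others carry on. */
    static List<String> run(List<String> objects, List<Function<String, Strategy>> steps) {
        List<String> trace = new ArrayList<>();
        for (String object : objects) {
            for (int i = 0; i < steps.size(); i++) {
                Strategy outcome = steps.get(i).apply(object);
                trace.add("step" + (i + 1) + ":" + object);
                if (outcome == Strategy.DISCARD_AND_CONTINUE) {
                    break; // only this object's branch is cut short
                }
            }
        }
        return trace;
    }

    /** Two indexed objects, three steps; the first step discards the branch of object "B". */
    static List<String> demo() {
        Function<String, Strategy> step1 =
            o -> o.equals("B") ? Strategy.DISCARD_AND_CONTINUE : Strategy.CONTINUE;
        Function<String, Strategy> step2 = o -> Strategy.CONTINUE;
        Function<String, Strategy> step3 = o -> Strategy.CONTINUE;
        return run(List.of("A", "B"), List.of(step1, step2, step3));
    }
}
```

Calling `BranchSketch.demo()` records all three steps for object A and only the first step for object B, which mirrors the dashed branch in the diagram.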
🚨 Currently, an Indexer can only return Indexable objects, i.e. with a declared uid() method. The Pipeline will consider the indexable's uid and only index a given value once.
💡 If no Indexer is specified, the default indexing strategy is SingleIndexer.auto() which will attempt to index the payload as a whole (as a result, the payload is expected to be Indexable).
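The uid-based deduplication mentioned above can be pictured with a small standalone sketch. `SketchIndexable`, `Part` and `UidDedupSketch` are hypothetical names introduced for illustration, and keeping the first object seen for a given uid is an assumption; the library may resolve duplicates differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the library's Indexable contract.
interface SketchIndexable { String uid(); }

// The record accessor uid() satisfies the interface.
record Part(String uid, String label) implements SketchIndexable {}

final class UidDedupSketch {
    /** Keeps the first object seen for each uid, preserving encounter order. */
    static List<SketchIndexable> index(List<? extends SketchIndexable> candidates) {
        Map<String, SketchIndexable> byUid = new LinkedHashMap<>();
        for (SketchIndexable candidate : candidates) {
            byUid.putIfAbsent(candidate.uid(), candidate);
        }
        return new ArrayList<>(byUid.values());
    }
}
```

With three candidates of which two share the uid "a", only two objects end up indexed, so steps would run on two branches rather than three.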
An Initializer is a function that takes the pipeline input and context as arguments, and returns the pipeline's payload.
For the sake of the following example, we'll consider a hypothetical scenario where we get a variety of inputs pertaining to vehicles that need to be analyzed.
For each pipeline input we'll get a set of data that allows us to reconstruct our Vehicle entity.
public record Vehicle(
    String uid, // we'll use the vehicle's own uid as the Indexable identifier
    Weight weight,
    Metadata metadata
) implements Indexable {}
A corresponding Initializer could look something like the following:
public class MyInitializer implements Initializer<MyInput>
{
    @Override
    public Vehicle initialize(MyInput input, Context context, UIDGenerator generator)
    {
        return new Vehicle(
            // the constructor arguments are for illustration only
            input.getVehicleUid(),
            computeWeight(input),
            extractMetadata(context)
        );
    }
}
You will need at least one Indexable for the pipeline to run. By default, the Pipeline will attempt to index the payload itself, which is possible here given we made the Vehicle an Indexable.
From there, you can register it the following way:
Pipeline<MyInput> pipeline = Pipeline.of("my-pipeline", new MyInitializer())
/* ...and others */
.build()
;
The steps registered in this pipeline can then access the payload via the @Payload annotation:
@StepConfig
public MyResult doStuff(@Payload Vehicle vehicle) { /**/ }
One more scenario we'll consider is having a payload containing multiple indexables. In essence, we can represent our target structure like this:
flowchart LR
subgraph INIT_PHASE[Initialization]
direction LR
INITIALIZER(Initializer):::init
INDEXER(Vehicle Indexer):::init
OBJECT_A[Vehicle A]:::init
OBJECT_B[Vehicle B]:::init
OBJECT_N[Vehicle n]:::init
end
subgraph STEP_PHASE[Step Phase]
direction LR
STEP_A1(Step 1A):::step
STEP_A2(Step 2A):::step
STEP_A3(Step 3A):::step
STEP_B1(Step 1B):::step
STEP_B2(Step 2B):::step
STEP_B3(Step 3B):::step
STEP_N1(Step 1n):::step
STEP_N2(Step 2n):::step
STEP_N3(Step 3n):::step
end
INITIALIZER --> INDEXER
INDEXER -- index --> OBJECT_A --> STEP_A1 --> STEP_A2 --> STEP_A3
INDEXER -- index --> OBJECT_B --> STEP_B1 --> STEP_B2 --> STEP_B3
INDEXER -. index .-> OBJECT_N:::optional -.-> STEP_N1:::optional -.-> STEP_N2:::optional -.-> STEP_N3:::optional
classDef init stroke:yellow;
classDef step stroke:#0f0;
classDef optional stroke-dasharray: 6 6;
For each vehicle in the payload, we'll perform a series of operations. To start, we'll write a new Initializer with a new payload structure:
public record VehicleGroup(
    List<Vehicle> vehicles
) {}
public class MyInitializer implements Initializer<MyInput>
{
    @Override
    public VehicleGroup initialize(MyInput input, Context context, UIDGenerator generator)
    {
        // the code below is an illustration only
        return new VehicleGroup(input.someData().stream()
            .map(data -> new Vehicle(
                data.getVehicleUid(),
                computeWeight(data),
                extractMetadata(context)
            ))
            .toList()
        );
    }
}
Then we'll use a MultiIndexer on it:
public static class MyIndexer implements MultiIndexer<VehicleGroup>
{
    @Override
    public Collection<? extends Indexable> resolve(VehicleGroup payload)
    {
        return payload.vehicles();
    }
}
Then build our pipeline:
Pipeline<MyInput> pipeline = Pipeline.of("my-pipeline", new MyInitializer())
.registerIndexer(new MyIndexer())
/* ...and others */
.build()
;
In this pipeline, we can have our steps accessing their respective Vehicle by leveraging the @Object annotation, and just like before we can also access the @Payload:
@StepConfig
public MyResult doStuff(@Object Vehicle vehicle, @Payload VehicleGroup group) { /**/ }
Initializer functions can be supplied to a pipeline builder "as-is", by passing them directly to the of builder method:
Pipeline<MyInput> pipeline = Pipeline.of("my-pipeline", new MyInitializer())
/* ...and others */
.build()
;
This is fine for simple setups and already gets you some of the data-pipeline feature-set (component architecture, various integrations, pipeline-level default behaviours, etc.).
But at some point you will want finer-grained configuration at the component level, and this is where the InitializerAssembler steps in.
The InitializerAssembler accepts an Initializer (the one you would be providing directly to the pipeline) and offers you a way to plug in local tweaks:
Pipeline<MyInput> pipeline = Pipeline.of("my-pipeline", builder -> builder
.initializer(new MyInitializer())
.withId("special-initializer")
.withErrorHandler(mySpecialErrorHandler)
)
/* ...and others */
.build()
;
In the snippet above, we added an error handler on the initializer. This is the kind of thing you could be doing if your component has to interact with an unreliable database or external API, for instance.
As we will show in the following sections, some of these options can be set through the @InitializerConfig annotation. It should be noted that the InitializerAssembler options take precedence over the @InitializerConfig ones, so the latter is a good place to put your initializer defaults.
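The precedence rule can be summed up in a few lines. This is a sketch of the general "explicit override, else default" pattern only; `PrecedenceSketch` and `resolveId` are hypothetical names and say nothing about how the library resolves its options internally.

```java
import java.util.Optional;

final class PrecedenceSketch {
    /** Returns the assembler-level value when one was set, otherwise the annotation-level default. */
    static String resolveId(Optional<String> assemblerId, String annotationId) {
        return assemblerId.orElse(annotationId);
    }
}
```

Under that reading, an initializer annotated with an id of "my-initializer" and assembled with an id of "special-initializer" would end up with the latter, while leaving the assembler option unset would keep the annotation value.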
The InitializerAssembler accepts a variety of function modifiers which will alter how the Initializer is executed.
All of these are optional, but can be very useful in implementing more sophisticated patterns.
The InitializerErrorHandler is a wrapper whose role is to act on exceptions thrown by the Initializer it is applied to.
These handlers are useful:
- as exception wrappers: their contract gives access to the original Initializer exception, so you can wrap it in order to standardize its signature, or introduce an exception type that can encapsulate metadata
- as error recovery procedures: they can be leveraged for running fallback code
Error handlers also have a dedicated documentation section.
Pipeline<MyInput> pipeline = Pipeline.of("my-pipeline", builder -> builder
        .initializer(new MyInitializer())
        .withErrorHandler((ex, ctx, gen) -> {
            logger.error("Error: {}", ex.getMessage());
            // the InitializerErrorHandler can choose between re-throwing/wrapping the exception, or returning a backup payload
            return produceRecoveryVehicleGroup();
        })
    )
    .build()
;
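To make the two roles listed earlier (exception wrapping and error recovery) more concrete, here is a standalone sketch. `SketchInitializer`, `SketchErrorHandler` and `ErrorHandlerSketch` are hypothetical, simplified stand-ins: the library's own interfaces take more arguments, as the lambda above shows.

```java
// Hypothetical, simplified stand-ins for the library's contracts.
interface SketchInitializer<I, P> { P initialize(I input) throws Exception; }
interface SketchErrorHandler<P> { P handle(Exception error); }

final class ErrorHandlerSketch {
    /** Runs the initializer and delegates any exception to the handler. */
    static <I, P> P run(SketchInitializer<I, P> initializer, SketchErrorHandler<P> handler, I input) {
        try {
            return initializer.initialize(input);
        } catch (Exception e) {
            return handler.handle(e);
        }
    }

    /** Wrapper role: rethrow under a standardized exception type that keeps the original cause. */
    static <P> SketchErrorHandler<P> wrapping() {
        return error -> { throw new IllegalStateException("initializer failed", error); };
    }

    /** Recovery role: swallow the error and return a backup payload. */
    static <P> SketchErrorHandler<P> recovering(P fallback) {
        return error -> fallback;
    }
}
```

With a failing initializer, `recovering("backup")` yields the backup payload, whereas `wrapping()` surfaces an `IllegalStateException` whose cause is the original exception.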
An error handler can be applied via the @InitializerConfig annotation if the InitializerErrorHandler has a default constructor:
@InitializerConfig(errorHandler = MyErrorHandler.class)
public MyPayload doStuff() { /**/ }
Initializer functions accept a variety of inputs, which can be combined as needed.
Some will be mapped by type directly as they are related to Pipeline internals (e.g. Context), others will need additional semantics via annotations.
When an argument is annotated with @Input, the pipeline will attempt to map it to its own input:
@InitializerConfig
public MyPayload doStuff(@Input SomeType in) { /**/ }
If the requested input type does not match the pipeline's input type, an IllegalArgumentException will be thrown at execution time.
The PipelineTag can be passed as an argument; it is mapped by type, so no specific annotation is required.
Pipeline tags are generated at the very start of the pipeline and contain the following properties:
- a uid as generated by the UIDGenerator
- a pipeline name as defined by its configuration
- an author name as extracted by the AuthorResolver
@InitializerConfig(id = "my-initializer")
public MyPayload doStuff(PipelineTag tag)
{
    /*
     * The following would print something along the lines of:
     * PipelineTag[uid=2zongloalw6vwnbrs7joqgf0cxh, pipeline=my-pipeline, author=anonymous]
     */
    System.out.println(tag);
    //return ...
}
The ComponentTag can be passed as an argument; it is mapped by type, so no specific annotation is required.
Component tags are generated at the start of each component run and contain the following properties:
- a uid as generated by the UIDGenerator
- an id name as defined by its configuration
- a family name depending on the type of component (INITIALIZER, STEP or SINK)
- a pipelineTag reference to the current pipeline's PipelineTag
@InitializerConfig(id = "my-initializer")
public MyPayload doStuff(ComponentTag tag)
{
    /*
     * The following would print something along the lines of:
     * ComponentTag[uid=2zongm8nvgu7jrlc5tl0tbgcexk, pipelineTag=PipelineTag[uid=2zongloalw6vwnbrs7joqgf0cxh, pipeline=my-pipeline, author=anonymous], id=my-initializer, family=INITIALIZER]
     */
    System.out.println(tag);
    //return ...
}
The Context can be mapped by type; it gives you access to the pipeline's context:
@InitializerConfig
public MyPayload doStuff(Context context)
{
    //context.get("my_metadata_key", SomeType.class).orElseThrow();
}
More information on the context is available in the pipeline's section.
Single entries in the context can be passed via the @Context annotation:
@InitializerConfig
public MyPayload doStuff(@Context("some_entry") String someEntry) { /**/ }
/* The argument can be an `Optional`, it will be empty if no match can be found */
@InitializerConfig
public MyPayload doStuff(@Context("some_entry") Optional<String> someEntry) { /**/ }
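The Optional-returning lookup used in the snippets above can be pictured with a small keyed store. `SketchContext` is a hypothetical stand-in and not the library's Context; in particular, returning an empty Optional on a type mismatch (rather than throwing) is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in, not the library's Context implementation.
final class SketchContext {
    private final Map<String, Object> entries = new HashMap<>();

    /** Stores an entry and returns this context to allow chaining. */
    SketchContext put(String key, Object value) {
        entries.put(key, value);
        return this;
    }

    /** Returns the entry when it exists and has the requested type, otherwise an empty Optional. */
    <T> Optional<T> get(String key, Class<T> type) {
        return Optional.ofNullable(entries.get(key))
            .filter(type::isInstance)
            .map(type::cast);
    }
}
```

A caller that cannot proceed without the entry would chain `orElseThrow()`, while an optional argument simply receives the empty value when no match is found.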
You can access the UIDGenerator currently in use by the pipeline by requesting it as an argument:
@InitializerConfig
public MyPayload doStuff(UIDGenerator generator)
{
    //generator.generate();
}
This can be useful if you want to harmonize the generation of UIDs between data-pipeline managed data and more business-centric data.
Typically, if data-lineage is a concern, you might want to persist the PipelineTag or ComponentTag in some data store.
It may then be relevant to use the same UID generation strategy for other data models, as these UIDs can have properties such as being time-stamped or lexicographically sortable.
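To illustrate what "time-stamped and lexicographically sortable" means in practice, here is a minimal sketch of such an identifier scheme. It is not the algorithm behind the library's UIDGenerator implementations; `SortableUidSketch` is a hypothetical name, and the layout (9 base-36 characters of timestamp followed by 13 of randomness) is an arbitrary choice that assumes a non-negative epoch timestamp in milliseconds.

```java
import java.security.SecureRandom;

final class SortableUidSketch {
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Builds an identifier whose fixed-width timestamp prefix makes string order follow time order. */
    static String generate(long epochMillis) {
        // Zero-padding keeps the prefix width fixed, so comparing strings compares timestamps.
        String time = pad(Long.toString(epochMillis, 36), 9);
        // A non-negative long fits in at most 13 base-36 characters.
        String entropy = pad(Long.toString(RANDOM.nextLong() >>> 1, 36), 13);
        return time + entropy;
    }

    private static String pad(String value, int width) {
        return "0".repeat(width - value.length()) + value;
    }
}
```

Two identifiers generated from different milliseconds compare in chronological order as plain strings, which is the property that makes such UIDs convenient as keys in a data store.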
The LogMarker is a component that can produce a LabelMarker out of the TagResolver currently in use by the pipeline.
Its use is encouraged for annotating your own logs with contextual information (see the relevant TagResolver section).
@InitializerConfig
public MyPayload doStuff(LogMarker marker)
{
    logger.info(marker.mark(), "This is my log: {}", 123);
    logger.info(marker.mark("my_local_key", "my_value"), "This is my log: {}", 234);
    //return ...
}