Component
Components are the building blocks of our pipeline system. There are three ways to create a Component:
Class-based Component (Component.category: “class” or “static”) Inherit from the
Componentclass ingpt_graph.core.component:class CustomComponent(Component): step_type = "node_to_list" input_schema = {"source_url": {"type": str}} cache_schema = {} output_schema = {"video_data": {"type": "list"}} output_format = "node_like"
Decorator-based Static Component (Component.category: “static”) Use the
@componentdecorator fromgpt_graph.decorators.component:@component(step_type="list_to_node") def f5(x): return np.sum(x)
Method-based Component (Component.category: “method”) Define the component as a method inside a Pipeline class, still using the
@componentdecorator.
Component Categories
The Component.category determines how the function will be executed when wrapped inside a Step:
“class”: The Step initiates the component as a class and calls
cp.run().“static”: The Step directly calls the static function inside the Component.
“method”: The Step plugs in
selfas the current running Pipeline, so the method runs as if bound to the Pipeline.
Pipeline Execution
When Pipeline.run() is called:
Components in the pipeline are checked against their criteria (bindings) or the last running step’s criteria (linkings).
If criteria are met, Steps are created by the Component. Then Steps will be insert into Pipeline’s priority queue Pipeline.sub_steps_q
If Component.category is
static. That means Component.run is a static method, it will be run during Step.run
class. During Step.run, a new Component will be created and its run method will be called. important, so Component create Step and then Step will also create several Components using Component.clone() and eventually run them.
method. This Component’s self is bound to the outside Pipeline, therefore it can get access to all the Pipeline’s methods and data, including Pipeline.route_to.
Steps run according to their priorities, adding nodes to
Pipeline.sub_node_graph(a wrapper of a networkx graph).
Step type
There are several possible values: “node_to_list”, “list_to_node”, “node_to_node” and “list_to_list” The main idea is, each previous Step will produce a list of nodes. How should the next Step process it? It can use “group” mode or “apply” mode. e.g.
sum should use “list_to_node”
split should use “node_to_list”
if you want to multiply each node content by 2, use “node_to_node”
filter should use “list_to_list”
The step_type of a Component directly influences the dim (dimension) key in its input_schema. This relationship can be summarized as follows:
If step_type is “node_to_x” (where x can be node or list), then dim is typically ≥ 0.
If step_type is “list_to_x” (where x can be node or list), then dim is typically -1.
Input Schema
The Component.input_schema defines how input data is processed. For example:
input_schema = {
"y": {
"type": Any,
"field": "extra.haha",
"dim": 0,
"group": g,
"filter_cri": {"step_name": {"$regex": "f6", "$order": -1}}
}
}
“y” here is the parameter name of the Component.run. In Step.run, parameters and previous Step’s nodes will be processed and plugged into Component.run.
Key concepts:
dim
Determines how input nodes are processed.
-1: All nodes from the previous step are input as a list.≥0: The Component runs multiple times, once for each node.
Note: When multiple inputs have the same dimension, they should have the same number of elements. If they do not have the same number of elements, they will be pruned/duplicated until the numbers are the same.
Example
Consider a Component with the following input schema:
input_schema = {
"x": {"dim": 0, "field": "content", "filter_cri": {"step_name": {"$regex": "step_x", "$order": -1}}},
"y": {"dim": 0, "field": "content", "filter_cri": {"step_name": {"$regex": "step_y", "$order": -1}}},
}
After applying the filter_cri for each parameter, let’s assume we have the following filtered nodes:
# Filtered nodes for x
x_nodes = [
{"node_id": "001", "content": 1, "step_name": "step_x"},
{"node_id": "002", "content": 2, "step_name": "step_x"},
{"node_id": "003", "content": 3, "step_name": "step_x"},
{"node_id": "004", "content": 4, "step_name": "step_x"}
]
# Filtered nodes for y
y_nodes = [
{"node_id": "101", "content": 10, "step_name": "step_y", "parent_ids": ["001", "003"]},
{"node_id": "102", "content": 20, "step_name": "step_y", "parent_ids": ["002"]}
]
Here’s how the inputs would be processed:
First,
xandyare aligned based on their relationships:x: [ {"node_id": "001", "content": 1, "step_name": "step_x"}, {"node_id": "003", "content": 3, "step_name": "step_x"}, {"node_id": "002", "content": 2, "step_name": "step_x"} ] y: [ {"node_id": "101", "content": 10, "step_name": "step_y", "parent_ids": ["001", "003"]}, {"node_id": "101", "content": 10, "step_name": "step_y", "parent_ids": ["001", "003"]}, {"node_id": "102", "content": 20, "step_name": "step_y", "parent_ids": ["002"]} ]
Note that:
xnodes are reordered to match the parent relationships iny.The node with
node_id: "004"fromxis discarded as it has no correspondingynode.The node with
node_id: "101"fromyis duplicated to match its two parent nodes inx.
Then, the
fieldspecified in theinput_schemais used to extract the actual input values:x_values = [1, 3, 2] y_values = [10, 10, 20]
The Component will run three times with the following inputs. The results are will be stored in both the Step and Pipeline.sub_node_graph as nodes(dict)
Run 1: f(x=1, y=10) Run 2: f(x=3, y=10) Run 3: f(x=2, y=20)
group
Groups, created by Group(), are used to plug values into function parameters:
g = Group(
filter_cri={"step_name": {"$regex": "f6", "$order": -1}},
parent_filter_cri={"step_name": {"$regex": "f4", "$order": -1}},
)
s.p6 = s.f4 | s.f6 | s.f5.prepend(g)
In this example:
For the Group g:
Parent Node Selection:
Selects nodes with
step_namematching the regex pattern “f4”."$order": -1selects the last (most recent) matching node if multiple nodes match.
Child Node Filtering:
Filters for nodes with
step_namematching the regex pattern “f6”."$order": -1selects the most recent matching node.
Expected Output: When the Group’s
run()method is called, it produces a structured output:A list of Group objects (stored in
self.contains).Each Group contains “f6” nodes that are children of a specific “f4” node.
Example structure:
[ sub-group1 containing [f6_1, f6_2], # Children of f4_1 sub-group2 containing [f6_3, f6_4, f6_5] # Children of f4_2 ]
Pipeline Execution:
gis prepended to Components.f5in Pipelines.p6.When
s.p6.run()is called, after runningp6;f4.0, a new Stepp6;f5.0is created and callsg.greturns a list of lists of nodes (e.g., 6 lists of 5 nodes each).f5is copied and run 6 times, each with a list of 5 nodes as input to f5.run method.
type
Purpose and Behavior
Currently, only matters if it is set as “node”
Format:
{'x':{'type': 'node' or anything else}}If type is “node”:
Previous Step’s nodes (a special dict) will be used as the input
If type is not “node”:
By default, the ‘content’ value of the nodes will be used. If “field” is defined in input_schema, the value corresponding to the field will be used
Example Node Structure
s.p6.sub_node_graph.nodes[1026]
Out:
{'node_id': uuid_ex(1026),
'content': 10,
'type': str,
'name': 'input',
'level': 0,
'step_name': 'p6;InputInitializer:sp0',
'step_id': 0,
'extra': {'x': 15},
'parent_ids': [],
'if_output': True,
'cp_name': 'p6;InputInitializer'}
Behavior based on step_type
If type is “node” and step_type is “node_to_node”:
Each of the above nodes will be used as an input for the next Component
If type is not “node”:
By default, the content (10 in this example) will be used as the input for the next Component
if “field” is “step_id” for example, 0 will be used.
field
Purpose and Behavior
The input of a Component.run can be nodes (a special dict) or anything inside the nodes
For example (check Example Node Structure of the previous section), Component.input_schema= {“y”: {“type”: str, “field”: “extra.x”}}
First, get all the nodes(dict) from the previous Step, you have a list of nodes.
As the type is not “node”, 15 (from ‘extra.x’) will be used as the input for the next Component
filter_cri
Purpose and Behavior
Uses MongoDB query language with some extensions
By default, all nodes from the previous one step will be processed
If defined, the specified filter_cri will be used instead as the filtering criteria of nodes.
Example
filter_cri={"step_name": {"$regex": "f6", "$order": -1}}
This means:
$regex:”f6” means that step_name should contain “f6”.
$order:-1 means that
If many nodes confirm with this criteria, they will be grouped by step_name as list of list of nodes
The last list of nodes will be selected, as their order is -1
cache_schema
Purpose and Behavior
The Cache Schema is a mechanism used in Pipelines and their sub-Components and Steps to manage shared caching of values. It provides a flexible way to store, retrieve, and initialize cached values across different parts of the pipeline.
The cache is shared across the pipeline, allowing for efficient data sharing between components.
Structure and Example
The cache schema is defined as a dictionary:
cache_schema = {
"z": {
"key": "[cp_or_pp.name]",
"initializer": lambda: z(),
},
"<SELF>": {"key": "haha"},
"a": {"key": "<TEMP>", "initializer": "[node_graph]"}
}
Key Components
param_name: The name of the parameter in the component (e.g., “z”, “
”, “a”). key: Specifies the cache key stored in cache.
initializer: A function or expression to initialize the value if not found in cache.
Special param_name
<SELF>: A special parameter that refers to the component itself.
Special Keys and Behaviors
Anything enclosed in []: A dynamic key that uses the name of the current component or pipeline. (e.g.,
"[cp_or_pp.name]"means the key stored in cache isself.cp_or_pp.name)<TEMP>: Indicates a temporary value that should be reinitialized each time and not stored in cache.Use
<TEMP>for values that should not persist in the cache between runs.
Caching Process
For each parameter in the cache schema:
Generate the cache key using the specified “key” (e.g., k).
Check if the value exists in the cache.
If not found, use the initializer to create the value.
Store the value in the cache (except for
<TEMP>keys). (e.g., cache = {k:v})Update the parameter with the cached or initialized value.
Special handling for
<SELF>:If found in cache, use it directly as the current component in
self.cp_run_func.
Initializer Types
Callable: A function that can accept arguments based on available parameters and cache values.
String Expression: Enclosed in square brackets, refers to an attribute path in the current object.
Explanation of the Example
Parameter “z”:
"key": "[cp_or_pp.name]": Cache key is dynamically generated using the current component or pipeline name."initializer": lambda: z(): If not in cache, initialized by callingz().
Parameter “
”: Refers to the component itself.
"key": "haha": Cache key for storing the entire component instance.No initializer provided as it refers to the existing component instance.
Parameter “a”:
"key": "<TEMP>": Recomputed each time, not stored in persistent cache."initializer": "[node_graph]": Initial value obtained by evaluatingself.node_graph.
Resulting cache:
cache = {self.cp_or_pp.name: z(), "haha": self} # <TEMP> is ignored
output_schema
Purpose and Behavior
The ‘type’ and ‘name’ attributes will be passed into the nodes the Step created
output_format
As mentioned previously, when Step.run, Components will be cloned and the result of Component.run will be stored in Pipeline.sub_node_graph(and Step.node_graph). The output_format variable determines how the result of a Component.run is processed and added to the node graph. It supports several formats:
1. “node_like” or “node”
The output of the Component should be a node-like dict.
New nodes are created based on the output dict of the Component, especially “content” and “extra” attributes of the dict.
if the output nodes of the Component contains “parent_nodes” attributes, then it will be used to identify parent nodes.
2. “graph”
Expects the result to be a networkx graph.
Adds missing information (step_id, step_name, cp_name) to nodes in the result graph.
Combines the result graph with the existing node graph.
Identifies newly added nodes and appends them to
new_nodes.
3. “dict”
For each key-value pair in the result:
Creates a new node with the value as content.
if output is {x:y, a:b}, then 2 nodes will be created with node’s name x and a respectively
4. “none”
Does not process the result or add any nodes.
5. Default (if not “node”)
Creates a new node with the entire result as content.
Bindings and Linkings
Overview
Bindings and linkings determine the execution order of Components within a Pipeline.
Bindings
Bindings define conditions for Component execution after a Step completes.
Key Concepts:
After each Step, the Pipeline checks if Components should be triggered based on their bindings.
Components use
if_trigger_bindingsto determine execution.If bindings are satisfied, a new Step is created and added to the Pipeline’s priority queue.
Linkings
Linkings define direct connections between Components for specific execution order.
Key Concepts:
Linkings create direct relationships between Components.
After a Step completes, the Pipeline checks linkings to determine the next Component to execute.
The previous Component checks which other Components it can link to.
Example
s = Session()
s.f4, s.f6, s.f5 = f4(), f6(), f5()
s.p6 = s.f4 | s.f6 | s.f5
[c.full_name for c in s.p6.contains]
# ['p6;InputInitializer', 'p6;f4.0', 'p6;f6.0', 'p6;f5.0']
[c.uuid for c in s.p6.contains]
# [uuid_ex(1057), uuid_ex(1064), uuid_ex(1069), uuid_ex(1074)]
[c.bindings for c in s.p6.contains]
# [
# [],
# [{'cp_or_pp.uuid': {'$eq': uuid_ex(1057)}}],
# [{'cp_or_pp.uuid': {'$eq': uuid_ex(1064)}}],
# [{'cp_or_pp.uuid': {'$eq': uuid_ex(1069)}}]
# ]
This example demonstrates how Components are linked and their bindings are set up in a Pipeline.
Execution Process
The Pipeline’s run method manages Step execution:
Execute the next Step from the priority queue (Pipeline.sub_steps_q).
Check linkings to determine the next Component.
Check all Components’ bindings for potential triggering.
Create new Steps for Components with satisfied bindings.
This process ensures correct execution order based on linkings and bindings.