Streaming
Get output streams from your pipelines
Building a streaming pipeline
To return a streaming output from a pipeline you need to use the Stream
object:
from pipeline.objects.graph import Stream
This object should be initialised and returned from a function as shown in the below example:
import time
from pipeline import Pipeline, pipe
from pipeline.objects.graph import Stream, Variable
@pipe
def streamer(input_data: str) -> Stream:
def iter_output(data):
for word in data.split():
time.sleep(0.1)
yield word + " "
return Stream(iter_output(input_data))
with Pipeline() as builder:
in_1 = Variable(str)
result = streamer(in_1)
builder.output(result)
my_pipeline = builder.get_pipeline()
The Stream
object takes in an Iterator object. Be careful not to pass in an Iterable, which is very different. For example:
# This is an iterator
my_iterator = i in range(10)
my_stream = Stream(my_iterator)
# This is an iterable and will not work
my_array = [1, 2, 3, 4, 5]
my_stream = Stream(my_array) # This will not work
Performing a run
Performing a streaming run is similar to a regular run, except there is a different endpoint: /v4/runs/stream
. The input schema is identical, however while the output schema is the same, you will actually receive back a series of run results as they become available, each one separated by a newline.
Errors should be returned by the API in a similar way to the regular run endpoint.
Using the Python SDK
There is a stream_pipeline
generator function that can be used to perform a streaming run with the Python SDK. To use it you can do something similar to the example in the pipeline repo:
from pipeline.cloud.pipelines import stream_pipeline
pipeline = "pipeline_id_or_pointer"
inputs = ["example input string"]
print("Streaming pipeline:\n")
for result in stream_pipeline(pipeline, *inputs):
if result.error:
print(result)
else:
print(result.outputs_formatted()[0], flush=True)
Calling the API directly
Here is an example using CURL (note the different endpoint and the use of the -N
option, which disables buffering of the output stream, i.e. returns data as it becomes available):
curl -X POST 'https://www.mystic.ai/v4/runs/stream' \
--header 'Authorization: Bearer YOUR_TOKEN' \
--header 'Content-Type: application/json' \
--data '{
"pipeline": "my_user/my_pipeline:v1",
"inputs":
[
{"type":"string","value":"my_input_string"},
{"type":"integer","value":5}
]
}
' \
-N
Updated 5 months ago