streamsx.objectstorage package
Object Storage integration for IBM Streams
For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:
Overview
IBM® Cloud Object Storage (COS) makes it possible to store practically limitless amounts of data, simply and cost effectively. It is commonly used for data archiving and backup, web and mobile applications, and as scalable, persistent storage for analytics.
Cloud Object Storage or any other S3 compatible object storage can be used.
This module allows a Streams application to create objects in parquet format with WriteParquet or to write string messages with Write from a stream of tuples. Objects can be listed with Scan and read with Read.
Credentials
Select one of the following options to define your Cloud Object Storage credentials:
- Streams application configuration
- Setting the Cloud Object Storage service credentials JSON as a dict to the credentials parameter of the functions
By default, an application configuration named cos is used. A different configuration name can be specified using the credentials parameter of Write, WriteParquet, Scan or Read.
In addition to IAM token-based authentication, it is also possible to authenticate using a signature created from a pair of access and secret keys.
Provide the HMAC keys with the credentials parameter as a dictionary, for example:
credentials = {}
credentials['access_key_id'] = '7exampledonotusea6440da12685eee02'
credentials['secret_access_key'] = '8not8ed850cddbece407exampledonotuse43r2d2586'
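As a minimal sketch, the HMAC dictionary could also be assembled from environment variables instead of being hard-coded. The helper hmac_credentials_from_env and the variable names COS_ACCESS_KEY_ID and COS_SECRET_ACCESS_KEY are assumptions made for this illustration and are not part of the package; only the two dictionary keys come from the example above.

```python
import os

# Keys expected in the HMAC credentials dict (taken from the example above).
HMAC_KEYS = ('access_key_id', 'secret_access_key')


def hmac_credentials_from_env(env=None):
    """Build an HMAC credentials dict from environment variables.

    The variable names COS_ACCESS_KEY_ID and COS_SECRET_ACCESS_KEY are
    hypothetical; choose names that suit your deployment.
    """
    env = os.environ if env is None else env
    credentials = {key: env.get('COS_' + key.upper()) for key in HMAC_KEYS}
    missing = [key for key, value in credentials.items() if not value]
    if missing:
        raise ValueError('missing HMAC settings: ' + ', '.join(missing))
    return credentials


# Usage with an explicit mapping instead of the real environment
credentials = hmac_credentials_from_env({
    'COS_ACCESS_KEY_ID': '7exampledonotusea6440da12685eee02',
    'COS_SECRET_ACCESS_KEY': '8not8ed850cddbece407exampledonotuse43r2d2586',
})
```

The resulting dict has the same shape as the hand-written one and can be passed as the credentials parameter.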
Endpoints
You must create a bucket before launching an application that uses this module.
When running the application in a Streaming Analytics service instance, it is recommended, for best performance, to create a bucket with:
- Resiliency: regional
- Location: near your Streaming Analytics service, for example us-south
- Storage class: Standard
With the settings above, it is recommended to use the private endpoint for the US-South region:
endpoint='s3.private.us-south.cloud-object-storage.appdomain.cloud'
Note:
Use public endpoints for applications that are hosted outside of the IBM Cloud.
Use cross-region endpoints for buckets created with cross-region resiliency.
Set the URL of the object storage service with the endpoint parameter.
Find the list of endpoints and the endpoint descriptions here: IBM® Cloud Object Storage Endpoints
To access any other Amazon S3 compatible object storage server, you need to set the endpoint parameter. For example, the MinIO server running at https://play.min.io:9000 needs the following value for the endpoint parameter:
endpoint='play.min.io:9000'
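The MinIO example suggests that the endpoint value is the host and port of the server URL without the scheme. A minimal sketch of that conversion, assuming a hypothetical helper endpoint_from_url that is not part of the package, could look like this. It also derives a flag that might be used for the ssl_enabled property documented for the classes below.

```python
from urllib.parse import urlparse


def endpoint_from_url(url):
    """Split a server URL into (endpoint, ssl_enabled).

    endpoint is host[:port] without the scheme; ssl_enabled is False only
    for plain http URLs. Hypothetical helper for illustration.
    """
    parsed = urlparse(url)
    if parsed.scheme not in ('http', 'https') or not parsed.netloc:
        raise ValueError('expected an http(s) URL, got: ' + repr(url))
    return parsed.netloc, parsed.scheme == 'https'


endpoint, ssl_enabled = endpoint_from_url('https://play.min.io:9000')
```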
Sample
A simple hello world example of a Streams application that writes string messages to an object, scans for the created object and reads its content:
import json
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit
import streamsx.objectstorage as cos
topo = Topology('ObjectStorageHelloWorld')
to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()
# sample bucket with resiliency "regional" and location "us-south"
bucket = 'streamsx-py-sample'
# US-South region private endpoint
endpoint='s3.private.us-south.cloud-object-storage.appdomain.cloud'
# provide COS credentials as dict; your_cos_json is a placeholder
# for the service credentials JSON string
credentials = json.loads(your_cos_json)
# Write a stream to COS
to_cos.for_each(cos.Write(bucket=bucket, endpoint=endpoint, object='/sample/hw%OBJECTNUM.txt', credentials=credentials))
scanned = topo.source(cos.Scan(bucket=bucket, endpoint=endpoint, directory='/sample', credentials=credentials))
# read text file line by line
r = scanned.map(cos.Read(bucket=bucket, endpoint=endpoint, credentials=credentials))
# print each line (tuple)
r.print()
submit('STREAMING_ANALYTICS_SERVICE', topo)
# Use for IBM Streams including IBM Cloud Pak for Data
# submit ('DISTRIBUTED', topo)
-
class streamsx.objectstorage.Scan(bucket, endpoint, pattern='.*', directory='/', credentials=None, protocol='cos', **options)
Bases: streamsx.topology.composite.Source
Scan a directory in a bucket for object names.
Scans an object storage directory and emits the names of new or modified objects that are found in the directory.
Example scanning a directory /sample for objects matching the pattern:
import streamsx.objectstorage as cos
scans = topo.source(cos.Scan(bucket='your-bucket-name', endpoint=endpoint, directory='/sample', pattern=r'SAMPLE_[0-9]*\.ascii\.text$'))
New in version 1.5.
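To preview which object names a pattern would select, the regular expression from the example above can be tried locally with Python's re module. This is only a sketch: the operator evaluates the pattern itself, so small differences between regular expression dialects are possible, and the object names used here are made up for illustration.

```python
import re

# Pattern taken from the Scan example; written as a raw string so the
# backslashes reach the regular expression engine unchanged.
pattern = r'SAMPLE_[0-9]*\.ascii\.text$'

# Hypothetical object names as they might appear in the scanned directory
names = [
    'SAMPLE_1.ascii.text',
    'SAMPLE_42.ascii.text',
    'SAMPLE_1.ascii.text.tmp',
    'OTHER_1.ascii.text',
]

# re.match anchors at the start of the name, the trailing $ at its end
matching = [name for name in names if re.match(pattern, name)]
print(matching)
```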
-
bucket
Bucket name. The bucket must have been created in your Cloud Object Storage service before using this class.
- Type
str
-
endpoint
Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
- Type
str
-
pattern
Limits the object names that are listed to the names that match the specified regular expression.
- Type
str
-
directory
Specifies the name of the directory to be scanned. Subdirectories are not scanned.
- Type
str
-
credentials
Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
- Type
str|dict
-
protocol
Protocol used by the S3 client, either cos (IAM and HMAC authentication supported) or s3a (requires HMAC authentication). Protocol s3a supports multipart upload. See Protocol selection.
- Type
str
-
options
The additional optional parameters as variable keyword arguments.
- Type
kwargs
-
Returns
Stream: Object names stream with schema CommonSchema.String.
-
populate(topology, name, **options)
Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
topo = Topology()
source_stream = topo.source(mySourceComposite)
- Parameters
topology – Topology containing the source.
name – Name passed into source.
**options – Future options passed to source.
- Returns
Single stream representing the source.
- Return type
Stream
-
property ssl_enabled
Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
- Type
bool
-
property vm_arg
Arbitrary JVM arguments can be passed. For example, increase the JVM’s maximum heap size with '-Xmx 8192m'.
- Type
str
-
class streamsx.objectstorage.Read(bucket, endpoint, credentials=None, protocol='cos', **options)
Bases: streamsx.topology.composite.Map
Read an object in a bucket.
Reads the object specified in the input stream and emits the content of the object. Expects CommonSchema.String in the input stream.
Example of reading objects with the object names from the scanned stream:
import streamsx.objectstorage as cos
r = scanned.map(cos.Read(bucket=bucket, endpoint=endpoint))
New in version 1.5.
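The protocol parameter described below ties the protocol to the authentication method: cos works with IAM and HMAC credentials, while s3a requires HMAC keys. A small pre-flight check along these lines can catch a mismatch before the application is submitted. This is a sketch only; check_protocol is a hypothetical helper and not part of the package.

```python
# Keys of the HMAC credentials dict as shown in the Credentials section
HMAC_KEYS = {'access_key_id', 'secret_access_key'}


def has_hmac_keys(credentials):
    """Return True if credentials is a dict containing both HMAC keys."""
    return isinstance(credentials, dict) and HMAC_KEYS <= credentials.keys()


def check_protocol(protocol, credentials):
    """Raise ValueError if the protocol cannot work with these credentials.

    Only dict credentials can be inspected here. A str names an application
    configuration and None selects the default one, so both pass unchecked.
    """
    if protocol not in ('cos', 's3a'):
        raise ValueError('unknown protocol: ' + repr(protocol))
    if (protocol == 's3a' and isinstance(credentials, dict)
            and not has_hmac_keys(credentials)):
        raise ValueError('protocol s3a requires HMAC keys in the credentials')
    return protocol


# HMAC keys satisfy both protocols
check_protocol('s3a', {'access_key_id': 'a', 'secret_access_key': 'b'})
```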
-
bucket
Bucket name. The bucket must have been created in your Cloud Object Storage service before using this class.
- Type
str
-
endpoint
Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
- Type
str
-
credentials
Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
- Type
str|dict
-
protocol
Protocol used by the S3 client, either cos (IAM and HMAC authentication supported) or s3a (requires HMAC authentication). Protocol s3a supports multipart upload. See Protocol selection.
- Type
str
-
options
The additional optional parameters as variable keyword arguments.
- Type
kwargs
-
Returns
streamsx.topology.topology.Stream: Object content line by line with schema CommonSchema.String.
-
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
transformed_stream = input_stream.map(myMapComposite)
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
schema – Schema passed into map.
name – Name passed into map.
**options – Future options passed to map.
- Returns
Single stream representing the transformation of stream.
- Return type
Stream
-
property ssl_enabled
Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
- Type
bool
-
property vm_arg
Arbitrary JVM arguments can be passed. For example, increase the JVM’s maximum heap size with '-Xmx 8192m'.
- Type
str
-
class streamsx.objectstorage.Write(bucket, endpoint, object, time_per_object=10.0, credentials=None, protocol='cos', **options)
Bases: streamsx.topology.composite.ForEach
Write strings to an object.
Adds a COS-Writer where each tuple on the stream is written into an object.
Expects CommonSchema.String in the input stream.
Example of creating an object with two lines:
import streamsx.objectstorage as cos
to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()
to_cos.for_each(cos.Write(bucket, endpoint, '/sample/hw%OBJECTNUM.txt'))
New in version 1.5.
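The object name in the example contains the %OBJECTNUM placeholder, which is described below as an object number that starts at 0 and is incremented for each newly opened object. The following sketch only illustrates the resulting naming scheme; the real substitution happens inside the operator, and expand_object_name is a hypothetical helper.

```python
def expand_object_name(template, object_num):
    """Replace the %OBJECTNUM placeholder with a concrete object number."""
    return template.replace('%OBJECTNUM', str(object_num))


# Names of the first three objects for the template used in the example
names = [expand_object_name('/sample/hw%OBJECTNUM.txt', n) for n in range(3)]
print(names)
```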
-
bucket
Bucket name. The bucket must have been created in your Cloud Object Storage service before using this class.
- Type
str
-
endpoint
Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
- Type
str
-
object
Name of the object to be created in your bucket, for example SAMPLE_%OBJECTNUM.text. %OBJECTNUM is an object number, starting at 0. When a new object is opened for writing, the number is incremented.
- Type
str
-
time_per_object
Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.
- Type
int|float|datetime.timedelta
-
credentials
Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
- Type
str|dict
-
protocol
Protocol used by the S3 client, either cos (IAM and HMAC authentication supported) or s3a (requires HMAC authentication). Protocol s3a supports multipart upload. See Protocol selection.
- Type
str
-
options
The additional optional parameters as variable keyword arguments.
- Type
kwargs
-
Returns
streamsx.topology.topology.Sink: Stream termination.
-
property header
Specify the content of the header row. This header is added as the first line in the object. Use this parameter when writing strings in CSV format and you want to query the objects with the IBM SQL Query service. By default, no header row is generated.
- Type
str
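When the stream carries CSV lines, the header string and the lines should use the same column order and quoting. As a sketch under that assumption, Python's csv module can produce both; the column names and rows are made up, and to_csv_line is a hypothetical helper. The resulting header string is the kind of value that could be assigned to the header property, with the lines written as the string tuples of the stream.

```python
import csv
import io


def to_csv_line(values):
    """Format one row as a single CSV line without a trailing newline."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator='').writerow(values)
    return buffer.getvalue()


# Hypothetical columns and rows for illustration
columns = ['id', 'name']
rows = [(1, 'Hello'), (2, 'World, again')]

header = to_csv_line(columns)
lines = [to_csv_line(row) for row in rows]
```

Fields that contain the delimiter are quoted by the csv module, which keeps the column count stable for later queries.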
-
populate(topology, stream, name, **options) → streamsx.topology.topology.Sink
Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
sink = input_stream.for_each(myForEachComposite)
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
name – Name passed into for_each.
**options – Future options passed to for_each.
- Returns
Termination for this composite transformation of stream.
- Return type
streamsx.topology.topology.Sink
-
property ssl_enabled
Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
- Type
bool
-
property vm_arg
Arbitrary JVM arguments can be passed. For example, increase the JVM’s maximum heap size with '-Xmx 8192m'.
- Type
str
-
class streamsx.objectstorage.WriteParquet(bucket, endpoint, object, time_per_object=10.0, credentials=None, protocol='cos', **options)
Bases: streamsx.topology.composite.ForEach
Create objects in parquet format.
Adds a COS-Writer where each tuple on the stream is written into an object in parquet format.
Example of creating objects in parquet format from a stream named ‘js’ in JSON format:
import streamsx.objectstorage as cos
...
# JSON to tuple
to_cos = js.map(schema='tuple<rstring a, int32 b>')
to_cos.for_each(cos.WriteParquet(bucket=bucket, endpoint=endpoint, object='/parquet/sample/hw%OBJECTNUM.parquet'))
New in version 1.5.
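The time_per_object parameter described below accepts an int, a float or a datetime.timedelta. The following sketch shows one way to normalize such a value to seconds before passing it on. How the package converts the value internally is not shown in this document, and the check for a positive interval is an assumption of this illustration; time_per_object_seconds is a hypothetical helper.

```python
import datetime


def time_per_object_seconds(value):
    """Return the object close interval in seconds as a float."""
    if isinstance(value, datetime.timedelta):
        seconds = value.total_seconds()
    elif isinstance(value, bool) or not isinstance(value, (int, float)):
        # bool is excluded explicitly because it is a subclass of int
        raise TypeError('expected int, float or datetime.timedelta')
    else:
        seconds = float(value)
    if seconds <= 0:
        raise ValueError('time_per_object must be positive')
    return seconds


# Five minutes expressed as a timedelta
interval = time_per_object_seconds(datetime.timedelta(minutes=5))
```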
-
bucket
Bucket name. The bucket must have been created in your Cloud Object Storage service before using this class.
- Type
str
-
endpoint
Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
- Type
str
-
object
Name of the object to be created in your bucket, for example SAMPLE_%OBJECTNUM.text. %OBJECTNUM is an object number, starting at 0. When a new object is opened for writing, the number is incremented.
- Type
str
-
time_per_object
Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.
- Type
int|float|datetime.timedelta
-
credentials
Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
- Type
str|dict
-
protocol
Protocol used by the S3 client, either cos (IAM and HMAC authentication supported) or s3a (requires HMAC authentication). Protocol s3a supports multipart upload. See Protocol selection.
- Type
str
-
options
The additional optional parameters as variable keyword arguments.
- Type
kwargs
-
Returns
streamsx.topology.topology.Sink: Stream termination.
-
populate(topology, stream, name, **options) → streamsx.topology.topology.Sink
Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
sink = input_stream.for_each(myForEachComposite)
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
name – Name passed into for_each.
**options – Future options passed to for_each.
- Returns
Termination for this composite transformation of stream.
- Return type
streamsx.topology.topology.Sink
-
property ssl_enabled
Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
- Type
bool
-
property vm_arg
Arbitrary JVM arguments can be passed. For example, increase the JVM’s maximum heap size with '-Xmx 8192m'.
- Type
str
-
streamsx.objectstorage.download_toolkit(url=None, target_dir=None)
Downloads the latest Objectstorage toolkit from GitHub.
Example for updating the Objectstorage toolkit for your topology with the latest toolkit from GitHub:
import streamsx.spl.toolkit
import streamsx.objectstorage as objectstorage
# download toolkit from GitHub
objectstorage_toolkit_location = objectstorage.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, objectstorage_toolkit_location)
Example for updating the topology with a specific version of the Objectstorage toolkit using a URL:
import streamsx.spl.toolkit
import streamsx.objectstorage as objectstorage
url1100 = 'https://github.com/IBMStreams/streamsx.objectstorage/releases/download/v1.10.0/streamsx.objectstorage.toolkits-1.10.0-20190730-1132.tgz'
objectstorage_toolkit_location = objectstorage.download_toolkit(url=url1100)
streamsx.spl.toolkit.add_toolkit(topology, objectstorage_toolkit_location)
- Parameters
url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
target_dir (str) – The directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
- Returns
the location of the downloaded Objectstorage toolkit
- Return type
str
Note
This function requires an outgoing Internet connection
New in version 1.3.
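The rule for relative target_dir values can be pictured with the standard library. This sketch mirrors the documented behavior and is not the package's own code; resolve_target_dir is a hypothetical helper, and the case target_dir=None is left out because the chosen location is not specified here.

```python
import os
import tempfile


def resolve_target_dir(target_dir):
    """Return the directory a toolkit would be unpacked to.

    Absolute paths are used as given; relative paths are appended to the
    system temporary directory, as described for target_dir above.
    """
    if os.path.isabs(target_dir):
        return target_dir
    return os.path.join(tempfile.gettempdir(), target_dir)


# A relative path ends up below the system temporary directory
resolved = resolve_target_dir('toolkits/objectstorage')
```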
-
streamsx.objectstorage.configure_connection(instance, name='cos', credentials=None)
Configures IBM Streams for a certain connection.
Creates an application configuration object containing the required properties with connection information.
Example for creating a configuration for a Streams instance with connection details:
from icpd_core import icpd_util
from streamsx.rest_primitives import Instance
import streamsx.topology.context
import streamsx.objectstorage as cos
cfg = icpd_util.get_service_instance_details(name='your-streams-instance', instance_type='streams')
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = cos.configure_connection(instance, credentials='my_credentials_json')
- Parameters
instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
name (str) – Name of the application configuration, default name is ‘cos’.
credentials (str|dict) – The service credentials for IBM Cloud Object Storage.
- Returns
Name of the application configuration.
Warning
The function can be used only in IBM Cloud Pak for Data
New in version 1.1.
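Because credentials may be given as str or dict, it can help to validate the value before creating the application configuration. The sketch below assumes that a str holds the service credentials as JSON text, which this document does not state explicitly; normalize_credentials is a hypothetical helper and not part of the package.

```python
import json


def normalize_credentials(credentials):
    """Return the credentials as a dict, parsing JSON text if needed.

    Assumes a str argument contains a JSON object; raises ValueError for
    malformed JSON or for JSON that is not an object.
    """
    if isinstance(credentials, dict):
        return dict(credentials)
    if isinstance(credentials, str):
        parsed = json.loads(credentials)  # JSONDecodeError is a ValueError
        if not isinstance(parsed, dict):
            raise ValueError('credentials JSON must be an object')
        return parsed
    raise TypeError('credentials must be str or dict')


# Placeholder JSON with the HMAC keys from the Credentials section
creds = normalize_credentials(
    '{"access_key_id": "example", "secret_access_key": "example"}')
```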
-
streamsx.objectstorage.scan(topology, bucket, endpoint, pattern='.*', directory='/', credentials=None, ssl_enabled=None, vm_arg=None, name=None)
Scan a directory in a bucket for object names.
Scans an object storage directory and emits the names of new or modified objects that are found in the directory.
Example scanning a directory /sample for objects matching the pattern:
import streamsx.objectstorage as cos
scans = cos.scan(topo, bucket='your-bucket-name', endpoint=endpoint, directory='/sample', pattern=r'SAMPLE_[0-9]*\.ascii\.text$')
- Parameters
topology (Topology) – Topology to contain the returned stream.
bucket (str) – Bucket name. The bucket must have been created in your Cloud Object Storage service before using this function.
endpoint (str) – Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
pattern (str) – Limits the object names that are listed to the names that match the specified regular expression.
directory (str) – Specifies the name of the directory to be scanned. Subdirectories are not scanned.
credentials (str|dict) – Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM’s maximum heap size with '-Xmx 8192m'.
name (str) – Sink name in the Streams context, defaults to a generated name.
- Returns
Object names stream with schema CommonSchema.String.
- Return type
Stream
Deprecated since version 1.5.0: Use the Scan class instead.
-
streamsx.objectstorage.read(stream, bucket, endpoint, credentials=None, ssl_enabled=None, vm_arg=None, name=None)
Read an object in a bucket.
Reads the object specified in the input stream and emits the content of the object.
Example of reading objects with the object names from the scanned stream:
import streamsx.objectstorage as cos
r = cos.read(scanned, bucket=bucket, endpoint=endpoint)
- Parameters
stream (streamsx.topology.topology.Stream) – Stream of tuples with object names to be read. Expects CommonSchema.String in the input stream.
bucket (str) – Bucket name. The bucket must have been created in your Cloud Object Storage service before using this function.
endpoint (str) – Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
credentials (str|dict) – Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM’s maximum heap size with '-Xmx 8192m'.
name (str) – Sink name in the Streams context, defaults to a generated name.
- Returns
Object content line by line with schema CommonSchema.String.
- Return type
streamsx.topology.topology.Stream
Deprecated since version 1.5.0: Use the Read class instead.
-
streamsx.objectstorage.write(stream, bucket, endpoint, object, time_per_object=10.0, header=None, credentials=None, ssl_enabled=None, vm_arg=None, name=None)
Write strings to an object.
Adds a COS-Writer where each tuple on the stream is written into an object.
Example of creating an object with two lines:
import streamsx.objectstorage as cos
to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()
cos.write(to_cos, bucket, endpoint, '/sample/hw%OBJECTNUM.txt')
- Parameters
stream (streamsx.topology.topology.Stream) – Stream of tuples to be written to an object. Expects CommonSchema.String in the input stream.
bucket (str) – Bucket name. The bucket must have been created in your Cloud Object Storage service before using this function.
endpoint (str) – Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
object (str) – Name of the object to be created in your bucket, for example SAMPLE_%OBJECTNUM.text. %OBJECTNUM is an object number, starting at 0. When a new object is opened for writing, the number is incremented.
time_per_object (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.
header (str) – Specify the content of the header row. This header is added as the first line in the object. Use this parameter when writing strings in CSV format and you want to query the objects with the IBM SQL Query service. By default, no header row is generated.
credentials (str|dict) – Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM’s maximum heap size with '-Xmx 8192m'.
name (str) – Sink name in the Streams context, defaults to a generated name.
- Returns
Stream termination.
- Return type
streamsx.topology.topology.Sink
Deprecated since version 1.5.0: Use the Write class instead.
-
streamsx.objectstorage.write_parquet(stream, bucket, endpoint, object, time_per_object=10.0, credentials=None, ssl_enabled=None, vm_arg=None, name=None)
Create objects in parquet format.
Adds a COS-Writer where each tuple on the stream is written into an object in parquet format.
Example of creating objects in parquet format from a stream named ‘js’ in JSON format:
import streamsx.objectstorage as cos
...
# JSON to tuple
to_cos = js.map(schema='tuple<rstring a, int32 b>')
cos.write_parquet(to_cos, bucket=bucket, endpoint=endpoint, object='/parquet/sample/hw%OBJECTNUM.parquet')
- Parameters
stream (streamsx.topology.topology.Stream) – Stream of tuples to be written to an object. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) as input. Attributes are mapped to parquet columns.
bucket (str) – Bucket name. The bucket must have been created in your Cloud Object Storage service before using this function.
endpoint (str) – Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
object (str) – Name of the object to be created in your bucket, for example SAMPLE_%OBJECTNUM.parquet. %OBJECTNUM is an object number, starting at 0. When a new object is opened for writing, the number is incremented.
time_per_object (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.
credentials (str|dict) – Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM’s maximum heap size with '-Xmx 8192m'.
name (str) – Sink name in the Streams context, defaults to a generated name.
- Returns
Stream termination.
- Return type
streamsx.topology.topology.Sink
Deprecated since version 1.5.0: Use the WriteParquet class instead.