streamsx.objectstorage package

Object Storage integration for IBM Streams

For details on implementing applications in Python for IBM Streams, including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud, see the streamsx package documentation.

Overview

IBM® Cloud Object Storage (COS) makes it possible to store practically limitless amounts of data, simply and cost effectively. It is commonly used for data archiving and backup, web and mobile applications, and as scalable, persistent storage for analytics.

Cloud Object Storage or any other S3 compatible object storage can be used.

This module allows a Streams application to create objects in parquet format with WriteParquet or to write string messages with Write from a stream of tuples. Objects can be listed with Scan and read with Read.

Credentials

Select one of the following options to define your Cloud Object Storage credentials:

  • Application configuration: by default an application configuration named cos is used; a different configuration name can be specified with the credentials parameter of Write, WriteParquet, Scan or Read.

  • HMAC keys: in addition to IAM token-based authentication, it is also possible to authenticate using a signature created from a pair of access and secret keys. Provide the HMAC keys with the credentials parameter as a dict, for example:

credentials = {}
credentials['access_key_id'] = '7exampledonotusea6440da12685eee02'
credentials['secret_access_key'] = '8not8ed850cddbece407exampledonotuse43r2d2586'
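
The credentials dict, or alternatively the name of an application configuration, is then passed with the credentials parameter. A minimal sketch, assuming to_cos is a CommonSchema.String stream and that bucket and endpoint are set as in the sample below:

import streamsx.objectstorage as cos

# pass the HMAC credentials dict defined above
to_cos.for_each(cos.Write(bucket=bucket, endpoint=endpoint, object='/sample/hw%OBJECTNUM.txt', credentials=credentials))

# or reference an application configuration other than the default 'cos' by name
to_cos.for_each(cos.Write(bucket=bucket, endpoint=endpoint, object='/sample/hw%OBJECTNUM.txt', credentials='my_cos_config'))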

Endpoints

You must create a bucket before launching an application that uses this module.

When running the application in a Streaming Analytics service instance, it is recommended, for best performance, to create a bucket with:

  • Resiliency: regional

  • Location: Near your Streaming Analytics service, for example us-south

  • Storage class: Standard

With the settings above, it is recommended to use the private endpoint for the us-south region:

endpoint='s3.private.us-south.cloud-object-storage.appdomain.cloud'

Note:

  • Use public endpoints for applications that are hosted outside of the IBM Cloud.

  • Use cross-region endpoints for buckets created with cross-region resiliency.

  • Set the URL of the object storage service with the endpoint parameter.

Find the list of endpoints and their descriptions here: IBM® Cloud Object Storage Endpoints

To access any other Amazon S3 compatible object storage server, you need to set the endpoint parameter. For example, the MinIO server running at https://play.min.io:9000 requires an endpoint value like the following:

endpoint='play.min.io:9000'
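
A sketch of scanning such a server, assuming an existing bucket named 'play-bucket', a Topology object topo, and HMAC credentials for the MinIO playground (placeholder values):

import streamsx.objectstorage as cos

# HMAC credentials for the S3 compatible server (placeholders)
credentials = {'access_key_id': 'YOUR_ACCESS_KEY', 'secret_access_key': 'YOUR_SECRET_KEY'}
scanned = topo.source(cos.Scan(bucket='play-bucket', endpoint='play.min.io:9000', directory='/sample', credentials=credentials))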

Sample

A simple "Hello World" example of a Streams application that writes string messages to an object, then scans for the created objects and reads their content:

from streamsx.topology.topology import *
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit
import streamsx.objectstorage as cos
import json

topo = Topology('ObjectStorageHelloWorld')

to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()

# sample bucket with resiliency "regional" and location "us-south"
bucket = 'streamsx-py-sample'
# US-South region private endpoint
endpoint='s3.private.us-south.cloud-object-storage.appdomain.cloud'
# provide COS credentials as dict
credentials = json.loads(your_cos_json)

# Write a stream to COS
to_cos.for_each(cos.Write(bucket=bucket, endpoint=endpoint, object='/sample/hw%OBJECTNUM.txt', credentials=credentials))

scanned = topo.source(cos.Scan(bucket=bucket, endpoint=endpoint, directory='/sample', credentials=credentials))

# read text file line by line
r = scanned.map(cos.Read(bucket=bucket, endpoint=endpoint, credentials=credentials))

# print each line (tuple)
r.print()

submit('STREAMING_ANALYTICS_SERVICE', topo)
# For IBM Streams, including IBM Cloud Pak for Data, use:
# submit('DISTRIBUTED', topo)

class streamsx.objectstorage.Scan(bucket, endpoint, pattern='.*', directory='/', credentials=None, protocol='cos', **options)

Bases: streamsx.topology.composite.Source

Scan a directory in a bucket for object names.

Scans an object storage directory and emits the names of new or modified objects that are found in the directory.

Example scanning a directory /sample for objects matching the pattern:

import streamsx.objectstorage as cos

scans = topo.source(cos.Scan(bucket='your-bucket-name', directory='/sample', pattern=r'SAMPLE_[0-9]*\.ascii\.text$'))

New in version 1.5.

bucket

Bucket name. Bucket must have been created in your Cloud Object Storage service before using this class.

Type

str

endpoint

Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.

Type

str

pattern

Limits the object names that are listed to the names that match the specified regular expression.

Type

str

directory

Specifies the name of the directory to be scanned. Any subdirectories are not scanned.

Type

str

credentials

Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.

Type

str|dict

protocol

Protocol used by the S3 client, either cos (IAM and HMAC authentication supported) or s3a (requires HMAC authentication). Protocol s3a supports multipart upload; see the toolkit documentation for guidance on protocol selection.

Type

str

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

Returns

Stream: Object names stream with schema CommonSchema.String.

populate(topology, name, **options)

Populate the topology with this composite source.

Parameters
  • topology – Topology containing the source.

  • name – Name passed into source.

  • **options – Future options passed to source.

Returns

Single stream representing the source.

Return type

Stream

property ssl_enabled

Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.

Type

bool

property vm_arg

Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.

Type

str
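
Both properties can be set on the composite instance before it is passed to the topology; a minimal sketch under that assumption, with topo, endpoint and the bucket name taken as placeholders from the earlier sample:

import streamsx.objectstorage as cos

scanner = cos.Scan(bucket='streamsx-py-sample', endpoint=endpoint, directory='/sample')
scanner.ssl_enabled = True     # default; set to False to use plain HTTP
scanner.vm_arg = '-Xmx8192m'   # raise the maximum heap of the operator JVM
scans = topo.source(scanner)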

class streamsx.objectstorage.Read(bucket, endpoint, credentials=None, protocol='cos', **options)

Bases: streamsx.topology.composite.Map

Read an object in a bucket.

Reads the object specified in the input stream and emits the content of the object. Expects CommonSchema.String in the input stream.

Example of reading objects whose names arrive on the scanned stream:

import streamsx.objectstorage as cos

r = scanned.map(cos.Read(bucket=bucket, endpoint=endpoint))

New in version 1.5.

bucket

Bucket name. Bucket must have been created in your Cloud Object Storage service before using this class.

Type

str

endpoint

Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.

Type

str

credentials

Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.

Type

str|dict

protocol

Protocol used by the S3 client, either cos (IAM and HMAC authentication supported) or s3a (requires HMAC authentication). Protocol s3a supports multipart upload; see the toolkit documentation for guidance on protocol selection.

Type

str

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

Returns

streamsx.topology.topology.Stream: Object content line by line with schema CommonSchema.String.

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation.

Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • schema – Schema passed into map.

  • name – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

property ssl_enabled

Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.

Type

bool

property vm_arg

Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.

Type

str

class streamsx.objectstorage.Write(bucket, endpoint, object, time_per_object=10.0, credentials=None, protocol='cos', **options)

Bases: streamsx.topology.composite.ForEach

Write strings to an object.

Adds a COS-Writer where each tuple on stream is written into an object.

Expects CommonSchema.String in the input stream.

Example of creating an object with two lines:

import streamsx.objectstorage as cos
to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()
to_cos.for_each(cos.Write(bucket, endpoint, '/sample/hw%OBJECTNUM.txt'))

New in version 1.5.

bucket

Bucket name. Bucket must have been created in your Cloud Object Storage service before using this class.

Type

str

endpoint

Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.

Type

str

object

Name of the object to be created in your bucket. For example, SAMPLE_%OBJECTNUM.text, where %OBJECTNUM is an object number starting at 0. The number is incremented each time a new object is opened for writing.

Type

str

time_per_object

Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.

Type

int|float|datetime.timedelta
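
For example, a new object could be opened roughly every five minutes by passing a timedelta; a sketch assuming the to_cos string stream, bucket and endpoint from the sample above:

import datetime
import streamsx.objectstorage as cos

# close the current object and start a new one approximately every 5 minutes
to_cos.for_each(cos.Write(bucket=bucket, endpoint=endpoint, object='/sample/hw%OBJECTNUM.txt', time_per_object=datetime.timedelta(minutes=5)))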

credentials

Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.

Type

str|dict

protocol

Protocol used by the S3 client, either cos (IAM and HMAC authentication supported) or s3a (requires HMAC authentication). Protocol s3a supports multipart upload; see the toolkit documentation for guidance on protocol selection.

Type

str

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

Returns

streamsx.topology.topology.Sink: Stream termination.

property header

Specifies the content of the header row, which is added as the first line of the object. Use this parameter when writing strings in CSV format and you want to query the objects with the IBM SQL Query service. By default, no header row is generated.

Type

str
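
For example, a header row could be added when the stream carries CSV formatted strings. A minimal sketch, assuming csv_stream is a hypothetical CommonSchema.String stream of lines such as '1,hello' and that the property is set on the composite instance:

import streamsx.objectstorage as cos

writer = cos.Write(bucket=bucket, endpoint=endpoint, object='/sample/data%OBJECTNUM.csv')
writer.header = 'id,message'   # written as the first line of every created object
csv_stream.for_each(writer)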

populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populate the topology with this composite for each transformation.

Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • name – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink

property ssl_enabled

Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.

Type

bool

property vm_arg

Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.

Type

str

class streamsx.objectstorage.WriteParquet(bucket, endpoint, object, time_per_object=10.0, credentials=None, protocol='cos', **options)

Bases: streamsx.topology.composite.ForEach

Create objects in parquet format.

Adds a COS-Writer where each tuple on stream is written into an object in parquet format.

Example of creating objects in parquet format from a stream named ‘js’ in JSON format:

import streamsx.objectstorage as cos
...
# JSON to tuple
to_cos = js.map(schema='tuple<rstring a, int32 b>')
to_cos.for_each(cos.WriteParquet(bucket=bucket, endpoint=endpoint, object='/parquet/sample/hw%OBJECTNUM.parquet'))

New in version 1.5.

bucket

Bucket name. Bucket must have been created in your Cloud Object Storage service before using this class.

Type

str

endpoint

Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.

Type

str

object

Name of the object to be created in your bucket. For example, SAMPLE_%OBJECTNUM.text, where %OBJECTNUM is an object number starting at 0. The number is incremented each time a new object is opened for writing.

Type

str

time_per_object

Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.

Type

int|float|datetime.timedelta

credentials

Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.

Type

str|dict

protocol

Protocol used by the S3 client, either cos (IAM and HMAC authentication supported) or s3a (requires HMAC authentication). Protocol s3a supports multipart upload; see the toolkit documentation for guidance on protocol selection.

Type

str
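
For example, the s3a protocol could be selected to enable multipart upload; a sketch assuming to_cos is the structured stream from the example above and hmac_credentials is an access/secret key dict as shown in the Credentials section (s3a requires HMAC authentication):

import streamsx.objectstorage as cos

to_cos.for_each(cos.WriteParquet(bucket=bucket, endpoint=endpoint, object='/parquet/sample/hw%OBJECTNUM.parquet', protocol='s3a', credentials=hmac_credentials))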

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

Returns

streamsx.topology.topology.Sink: Stream termination.

populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populate the topology with this composite for each transformation.

Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • name – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink

property ssl_enabled

Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.

Type

bool

property vm_arg

Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.

Type

str

streamsx.objectstorage.download_toolkit(url=None, target_dir=None)

Downloads the latest Objectstorage toolkit from GitHub.

Example for updating the Objectstorage toolkit for your topology with the latest toolkit from GitHub:

import streamsx.objectstorage as objectstorage
# download toolkit from GitHub
objectstorage_toolkit_location = objectstorage.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, objectstorage_toolkit_location)

Example for updating the topology with a specific version of the Objectstorage toolkit using a URL:

import streamsx.objectstorage as objectstorage
url1100 = 'https://github.com/IBMStreams/streamsx.objectstorage/releases/download/v1.10.0/streamsx.objectstorage.toolkits-1.10.0-20190730-1132.tgz'
objectstorage_toolkit_location = objectstorage.download_toolkit(url=url1100)
streamsx.spl.toolkit.add_toolkit(topology, objectstorage_toolkit_location)
Parameters
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.

  • target_dir (str) – the directory where the toolkit is unpacked. If a relative path is given, the path is appended to the system temporary directory, for example /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.

Returns

the location of the downloaded Objectstorage toolkit

Return type

str

Note

This function requires an outgoing Internet connection.

New in version 1.3.

streamsx.objectstorage.configure_connection(instance, name='cos', credentials=None)

Configures IBM Streams for a certain connection.

Creates an application configuration object containing the required properties with connection information.

Example for creating a configuration for a Streams instance with connection details:

from icpd_core import icpd_util
from streamsx.rest_primitives import Instance
import streamsx.topology.context
import streamsx.objectstorage as cos

cfg = icpd_util.get_service_instance_details(name='your-streams-instance', instance_type='streams')
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = cos.configure_connection(instance, credentials='my_credentials_json')
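
The returned configuration name can then be referenced with the credentials parameter of the composites; a sketch, with to_cos, bucket and endpoint as in the sample at the top of this page:

to_cos.for_each(cos.Write(bucket=bucket, endpoint=endpoint, object='/sample/hw%OBJECTNUM.txt', credentials=app_cfg))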
Parameters
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.

  • name (str) – Name of the application configuration, default name is ‘cos’.

  • credentials (str|dict) – The service credentials for IBM Cloud Object Storage.

Returns

Name of the application configuration.

Warning

The function can be used only in IBM Cloud Pak for Data.

New in version 1.1.

streamsx.objectstorage.scan(topology, bucket, endpoint, pattern='.*', directory='/', credentials=None, ssl_enabled=None, vm_arg=None, name=None)

Scan a directory in a bucket for object names.

Scans an object storage directory and emits the names of new or modified objects that are found in the directory.

Example scanning a directory /sample for objects matching the pattern:

import streamsx.objectstorage as cos

scans = cos.scan(topo, bucket='your-bucket-name', directory='/sample', pattern=r'SAMPLE_[0-9]*\.ascii\.text$')
Parameters
  • topology (Topology) – Topology to contain the returned stream.

  • bucket (str) – Bucket name. Bucket must have been created in your Cloud Object Storage service before using this function.

  • endpoint (str) –

    Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.

  • pattern (str) – Limits the object names that are listed to the names that match the specified regular expression.

  • directory (str) – Specifies the name of the directory to be scanned. Any subdirectories are not scanned.

  • credentials (str|dict) – Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.

  • ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.

  • vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.

  • name (str) – Source name in the Streams context, defaults to a generated name.

Returns

Object names stream with schema CommonSchema.String.

Return type

Stream

Deprecated since version 1.5.0: Use the Scan class instead.

streamsx.objectstorage.read(stream, bucket, endpoint, credentials=None, ssl_enabled=None, vm_arg=None, name=None)

Read an object in a bucket.

Reads the object specified in the input stream and emits the content of the object.

Example of reading objects whose names arrive on the scanned stream:

import streamsx.objectstorage as cos

r = cos.read(scanned, bucket=bucket, endpoint=endpoint)
Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples with object names to be read. Expects CommonSchema.String in the input stream.

  • bucket (str) – Bucket name. Bucket must have been created in your Cloud Object Storage service before using this function.

  • endpoint (str) –

    Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.

  • credentials (str|dict) – Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.

  • ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.

  • vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.

  • name (str) – Operator name in the Streams context, defaults to a generated name.

Returns

Object content line by line with schema CommonSchema.String.

Return type

streamsx.topology.topology.Stream

Deprecated since version 1.5.0: Use the Read class instead.

streamsx.objectstorage.write(stream, bucket, endpoint, object, time_per_object=10.0, header=None, credentials=None, ssl_enabled=None, vm_arg=None, name=None)

Write strings to an object.

Adds a COS-Writer where each tuple on stream is written into an object.

Example of creating an object with two lines:

import streamsx.objectstorage as cos
to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()
cos.write(to_cos, bucket, endpoint, '/sample/hw%OBJECTNUM.txt')
Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples to be written to an object. Expects CommonSchema.String in the input stream.

  • bucket (str) – Bucket name. Bucket must have been created in your Cloud Object Storage service before using this function.

  • endpoint (str) –

    Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.

  • object (str) – Name of the object to be created in your bucket. For example, SAMPLE_%OBJECTNUM.text, where %OBJECTNUM is an object number starting at 0. The number is incremented each time a new object is opened for writing.

  • time_per_object (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.

  • header (str) – Specifies the content of the header row, which is added as the first line of the object. Use this parameter when writing strings in CSV format and you want to query the objects with the IBM SQL Query service. By default, no header row is generated.

  • credentials (str|dict) – Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.

  • ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.

  • vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.

  • name (str) – Sink name in the Streams context, defaults to a generated name.

Returns

Stream termination.

Return type

streamsx.topology.topology.Sink

Deprecated since version 1.5.0: Use the Write class instead.

streamsx.objectstorage.write_parquet(stream, bucket, endpoint, object, time_per_object=10.0, credentials=None, ssl_enabled=None, vm_arg=None, name=None)

Create objects in parquet format.

Adds a COS-Writer where each tuple on stream is written into an object in parquet format.

Example of creating objects in parquet format from a stream named ‘js’ in JSON format:

import streamsx.objectstorage as cos
...
# JSON to tuple
to_cos = js.map(schema='tuple<rstring a, int32 b>')
cos.write_parquet(to_cos, bucket=bucket, endpoint=endpoint, object='/parquet/sample/hw%OBJECTNUM.parquet')
Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples to be written to an object. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) as input. Attributes are mapped to parquet columns.

  • bucket (str) – Bucket name. Bucket must have been created in your Cloud Object Storage service before using this function.

  • endpoint (str) –

    Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.

  • object (str) – Name of the object to be created in your bucket. For example, SAMPLE_%OBJECTNUM.parquet, where %OBJECTNUM is an object number starting at 0. The number is incremented each time a new object is opened for writing.

  • time_per_object (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.

  • credentials (str|dict) – Credentials as dict or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.

  • ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.

  • vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.

  • name (str) – Sink name in the Streams context, defaults to a generated name.

Returns

Stream termination.

Return type

streamsx.topology.topology.Sink

Deprecated since version 1.5.0: Use the WriteParquet class instead.
