streamsx.objectstorage package

Object Storage integration for IBM Streams

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:

Overview

IBM® Cloud Object Storage (COS) makes it possible to store practically limitless amounts of data, simply and cost effectively. It is commonly used for data archiving and backup, web and mobile applications, and as scalable, persistent storage for analytics.

Cloud Object Storage or any other S3 compatible object storage can be used.

This module allows a Streams application to create objects in parquet format write_parquet or to write string messages with write from a stream of tuples. Objects can be listed with scan and read with read.

Credentials

Select one of the following options to define your Cloud Object Storage credentials:

By default an application configuration named cos is used, a different configuration name can be specified using the credentials parameter to write(), write_parquet(), scan() or read().

In addition to IAM token-based authentication, it is also possible to authenticate using a signature created from a pair of access and secret keys. Provide the HMAC keys with the credentials parameter as dictionary, for example:

credentials = {}
credentials['access_key_id'] = '7exampledonotusea6440da12685eee02'
credentials['secret_access_key'] = '8not8ed850cddbece407exampledonotuse43r2d2586'

Endpoints

It is required that you create a bucket before launching an application using this module.

When running the application in a Streaming Analytics service instance, it is recommended, for best performance, to create a bucket with:

  • Resiliency: regional
  • Location: Near your Streaming Analytics service, for example us-south
  • Storage class: Standard

With these setting above it is recommended to use the private endpoint for the US-South region:

endpoint='s3.private.us-south.cloud-object-storage.appdomain.cloud'

Note:

  • Use public endpoints to point your application that are hosted outside of the IBM cloud.
  • Use cross-region endpoints for buckets creation with cross-region resiliency.
  • Set the URL to object storage service with the endpoint parameter.

Find the list of endpoints and the endpoint description here: IBM® Cloud Object Storage Endpoints

To access any other Amazon S3 compatible object storage server you need set the endpoint parameter, for example the MinIO server running at https://play.min.io:9000:

endpoint='play.min.io:9000'

Sample

A simple hello world example of a Streams application writing string messages to an object. Scan for created object and read the content:

from streamsx.topology.topology import *
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit
import streamsx.objectstorage as cos

topo = Topology('ObjectStorageHelloWorld')

to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()

# sample bucket with resiliency "regional" and location "us-south"
bucket = 'streamsx-py-sample'
# US-South region private endpoint
endpoint='s3.private.us-south.cloud-object-storage.appdomain.cloud'

# Write a stream to COS
cos.write(to_cos, bucket, endpoint, '/sample/hw%OBJECTNUM.txt')

scanned = cos.scan(topo, bucket=bucket, endpoint=endpoint, directory='/sample')

# read text file line by line
r = cos.read(scanned, bucket=bucket, endpoint=endpoint)

# print each line (tuple)
r.print()

submit('STREAMING_ANALYTICS_SERVICE', topo)
# Use for IBM Streams including IBM Cloud Pak for Data
# submit ('DISTRIBUTED', topo)
streamsx.objectstorage.scan(topology, bucket, endpoint, pattern='.*', directory='/', credentials=None, ssl_enabled=None, vm_arg=None, name=None)

Scan a directory in a bucket for object names.

Scans an object storage directory and emits the names of new or modified objects that are found in the directory.

Example scanning a directory /sample for objects matching the pattern:

import streamsx.objectstorage as cos

scans = cos.scan(topo, bucket='your-bucket-name', directory='/sample', pattern='SAMPLE_[0-9]*\.ascii\.text$')
Parameters:
  • topology (Topology) – Topology to contain the returned stream.
  • bucket (str) – Bucket name. Bucket must have been created in your Cloud Object Storage service before using this function.
  • endpoint (str) –

    Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private enpoint when running in IBM cloud Streaming Analytics service.

  • pattern (str) – Limits the object names that are listed to the names that match the specified regular expression.
  • directory (str) – Specifies the name of the directory to be scanned. Any subdirectories are not scanned.
  • credentials (str|dict) – Credentials in JSON or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.
  • ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. Per default SSL is enabled and HTTPS is used.
  • vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase JVM’s maximum heap size '-Xmx 8192m'.
  • name (str) – Sink name in the Streams context, defaults to a generated name.
Returns:

Object names stream with schema CommonSchema.String.

Return type:

Stream

streamsx.objectstorage.read(stream, bucket, endpoint, credentials=None, ssl_enabled=None, vm_arg=None, name=None)

Read an object in a bucket.

Reads the object specified in the input stream and emits content of the object.

Example of reading object with the objects names from the scanned stream:

import streamsx.objectstorage as cos

r = cos.read(scanned, bucket=bucket, endpoint=endpoint)
Parameters:
  • stream (streamsx.topology.topology.Stream) – Stream of tuples with object names to be read. Expects CommonSchema.String in the input stream.
  • bucket (str) – Bucket name. Bucket must have been created in your Cloud Object Storage service before using this function.
  • endpoint (str) –

    Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private enpoint when running in IBM cloud Streaming Analytics service.

  • credentials (str|dict) – Credentials in JSON or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.
  • ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. Per default SSL is enabled and HTTPS is used.
  • vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase JVM’s maximum heap size '-Xmx 8192m'.
  • name (str) – Sink name in the Streams context, defaults to a generated name.
Returns:

Object content line by line with schema CommonSchema.String.

Return type:

streamsx.topology.topology.Stream

streamsx.objectstorage.write(stream, bucket, endpoint, object, time_per_object=10.0, header=None, credentials=None, ssl_enabled=None, vm_arg=None, name=None)

Write strings to an object.

Adds a COS-Writer where each tuple on stream is written into an object.

Example of creating an object with two lines:

import streamsx.objectstorage as cos
to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()
cos.write(to_cos, bucket, endpoint, '/sample/hw%OBJECTNUM.txt')
Parameters:
  • stream (streamsx.topology.topology.Stream) – Stream of tuples to be written to an object. Expects CommonSchema.String in the input stream.
  • bucket (str) – Bucket name. Bucket must have been created in your Cloud Object Storage service before using this function.
  • endpoint (str) –

    Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private enpoint when running in IBM cloud Streaming Analytics service.

  • object (str) – Name of the object to be created in your bucket. For example, SAMPLE_%OBJECTNUM.text, %OBJECTNUM is an object number, starting at 0. When a new object is opened for writing the number is incremented.
  • time_per_object (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.
  • header (str) – Specify the content of the header row. This header is added as first line in the object. Use this parameter when writing strings in CSV format and you like to query the objects with the IBM SQL Query service. By default no header row is generated.
  • credentials (str|dict) – Credentials in JSON or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.
  • ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. Per default SSL is enabled and HTTPS is used.
  • vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase JVM’s maximum heap size '-Xmx 8192m'.
  • name (str) – Sink name in the Streams context, defaults to a generated name.
Returns:

Stream termination.

Return type:

streamsx.topology.topology.Sink

streamsx.objectstorage.write_parquet(stream, bucket, endpoint, object, time_per_object=10.0, credentials=None, ssl_enabled=None, vm_arg=None, name=None)

Create objects in parquet format.

Adds a COS-Writer where each tuple on stream is written into an object in parquet format.

Example of creating objects in parquet format from a stream named ‘js’ in JSON format:

import streamsx.objectstorage as cos
...
# JSON to tuple
to_cos = js.map(schema='tuple<rstring a, int32 b>')
cos.write(to_cos, bucket=bucket, endpoint=endpoint, object='/parquet/sample/hw%OBJECTNUM.parquet')
Parameters:
  • stream (streamsx.topology.topology.Stream) – Stream of tuples to be written to an object. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) as input. Attributes are mapped to parquet columns.
  • bucket (str) – Bucket name. Bucket must have been created in your Cloud Object Storage service before using this function.
  • endpoint (str) –

    Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private enpoint when running in IBM cloud Streaming Analytics service.

  • object (str) – Name of the object to be created in your bucket. For example, SAMPLE_%OBJECTNUM.parquet, %OBJECTNUM is an object number, starting at 0. When a new object is opened for writing the number is incremented.
  • time_per_object (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.
  • credentials (str|dict) – Credentials in JSON or name of the application configuration containing the credentials for Cloud Object Storage. When set to None the application configuration cos is used.
  • ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. Per default SSL is enabled and HTTPS is used.
  • vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase JVM’s maximum heap size '-Xmx 8192m'.
  • name (str) – Sink name in the Streams context, defaults to a generated name.
Returns:

Stream termination.

Return type:

streamsx.topology.topology.Sink

streamsx.objectstorage.download_toolkit(url=None, target_dir=None)

Downloads the latest Objectstorage toolkit from GitHub.

Example for updating the Objectstorage toolkit for your topology with the latest toolkit from GitHub:

import streamsx.objectstorage as objectstorage
# download toolkit from GitHub
objectstorage_toolkit_location = objectstorage.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, objectstorage_toolkit_location)

Example for updating the topology with a specific version of the Objectstorage toolkit using a URL:

import streamsx.objectstorage as objectstorage
url1100 = 'https://github.com/IBMStreams/streamsx.objectstorage/releases/download/v1.10.0/streamsx.objectstorage.toolkits-1.10.0-20190730-1132.tgz'
objectstorage_toolkit_location = objectstorage.download_toolkit(url=url1100)
streamsx.spl.toolkit.add_toolkit(topology, objectstorage_toolkit_location)
Parameters:
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
  • target_dir (str) – the directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None a location relative to the system temporary directory is chosen.
Returns:

the location of the downloaded Objectstorage toolkit

Return type:

str

Note

This function requires an outgoing Internet connection

New in version 1.3.

streamsx.objectstorage.configure_connection(instance, name='cos', credentials=None)

Configures IBM Streams for a certain connection.

Creates an application configuration object containing the required properties with connection information.

Example for creating a configuration for a Streams instance with connection details:

streamsx.rest import Instance
import streamsx.topology.context
from icpd_core import icpd_util
import streamsx.objectstorage as cos

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = cos.configure_connection(instance, credentials='my_credentials_json')
Parameters:
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
  • name (str) – Name of the application configuration, default name is ‘cos’.
  • credentials (str|dict) – The service credentials for IBM Cloud Object Storage.
Returns:

Name of the application configuration.

Warning

The function can be used only in IBM Cloud Pak for Data

New in version 1.1.

Indices and tables