XStream

class xframes.XStream(impl=None, verbose=False)

Provides for streams of XFrames.

An XStream represents a time sequence of XFrames. These are usually read from live sources and are processed in batches at a selectable interval.

XStream objects encapsulate the logic associated with the stream. The interface includes a number of class methods that act as factory methods, connecting to external systems and returning an XStream.

XStream also includes a number of transformers, taking one or two XStreams and transforming them into another XStream.

Finally, XStream includes a number of sinks that print, save, or expose the stream to external systems.

XStream instances are created immediately (and can be used in Jupyter notebooks without restrictions), but data does not flow through the streams until the application calls “start”. This data flow happens in another thread, so your program regains control immediately after calling “start”.

Methods that print data (such as print_frames) do not produce output until data starts flowing. Their output goes to stdout, interleaved with anything your main thread prints, which works well in a notebook environment.

As with the other parts of XFrames (and Spark), many of the operators take functional arguments that contain the actions to be applied to the data structures. These functions run in a worker environment, not on the main thread (they run in another process, generally on another machine). As a result, you will not see anything these functions write to stdout or stderr. If you know where to look, you can find this output in the Spark worker log files.

__init__(impl=None, verbose=False)

Construct an XStream. You rarely construct an XStream directly. Instead, use one of the factory methods.

Parameters:

verbose : bool, optional

If True, print progress messages.

See also

xframes.XStream.create_from_text_files
Create an XStream from text files.
xframes.XStream.create_from_socket_stream
Create an XStream from data received over a socket.
xframes.XStream.create_from_kafka_topic
Create an XStream from a Kafka topic.
add_column(col, name='')

Add a column to every XFrame in this XStream. The length of the new column must match the length of the existing XFrame. This operation returns new XFrames with the additional columns. If no name is given, a default name is chosen.

Parameters:

col : XArray

The ‘column’ of data to add.

name : string, optional

The name of the column. If no name is given, a default name is chosen.

Returns:

XStream of XFrame

A new XStream of XFrame with the new column.

See also

xframes.XFrame.add_column
Corresponding function on individual frame.
add_columns(cols, namelist=None)

Add multiple columns to every XFrame in this XStream. The length of the new columns must match the length of the existing XFrame. This operation returns a new XStream of XFrames with the additional columns.

Parameters:

cols : list of XArray or XFrame

The columns to add. If cols is an XFrame, all columns in it are added.

namelist : list of string, optional

A list of column names. All names must be specified. Namelist is ignored if cols is an XFrame. If there are columns with duplicate names, they will be made unambiguous by adding .1 to the second copy.

Returns:

XStream of XFrame

The XStream with additional columns.

See also

xframes.XFrame.add_columns
Corresponding function on individual frame.
apply(fn, dtype)

Transform each XFrame in an XStream to an XArray according to a specified function. Returns an XStream of XArrays of type dtype. Each element of each XArray is fn(x), where x is a single row of the corresponding XFrame, represented as a dictionary. The fn should return exactly one value that can be cast to dtype.

Parameters:

fn : function

The function to transform each row of the XFrame. The return type should be convertible to dtype if dtype is not None.

dtype : data type

The dtype of the new XArray. If None, the first 100 elements of the array are used to guess the target data type.

Returns:

XStream of XArray

The stream of XArrays produced by applying fn to the rows of each XFrame. Each element of each XArray is of type dtype.
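The per-batch behaviour of apply can be sketched in plain Python. In this sketch, each XFrame is modelled as a list of row dictionaries and an XStream as a list of such batches. The sketch does not use xframes or Spark. `apply_to_stream` is a hypothetical helper, and casting each result with `dtype(...)` is an assumption about how the conversion behaves.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]
Frame = List[Row]     # stand-in for one XFrame: a list of row dicts
Stream = List[Frame]  # stand-in for an XStream: a sequence of batches

def apply_to_stream(stream: Stream, fn: Callable[[Row], Any], dtype: type) -> List[List[Any]]:
    """Map every row of every batch through fn and cast each result to dtype.

    Each inner list plays the role of one XArray in the output stream.
    """
    return [[dtype(fn(row)) for row in frame] for frame in stream]

batches = [
    [{"x": 1, "y": 2}, {"x": 3, "y": 4}],
    [{"x": 5, "y": 6}],
]
print(apply_to_stream(batches, lambda r: r["x"] + r["y"], float))
# [[3.0, 7.0], [11.0]]
```

On a live XStream, the corresponding documented call would be `stream.apply(lambda row: row['x'] + row['y'], float)`, which yields one XArray per batch.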

static await_termination(timeout=None)

Wait for streaming execution to stop.

Parameters:

timeout : int, optional

The maximum time to wait, in seconds. If not given, wait indefinitely.

Returns:

status : boolean

True if the stream has stopped; False if the timeout expired before the stream stopped.
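This return-value contract resembles Python's `threading.Event.wait(timeout)`, which returns True once the event is set and False if the timeout elapses first. The sketch below uses that to model a hypothetical `ToyPipeline` with `start`, `stop`, and `await_termination`. It only illustrates the contract (start returns immediately, work runs on another thread). It is not how xframes or Spark implements streaming.

```python
import threading
import time
from typing import Optional

class ToyPipeline:
    """Hypothetical stand-in illustrating start / stop / await_termination."""

    def __init__(self) -> None:
        self._stopped = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self.batches_processed = 0

    def _run(self) -> None:
        # Simulate processing one batch per interval until stop() is called.
        while not self._stopped.is_set():
            self.batches_processed += 1
            time.sleep(0.01)

    def start(self) -> None:
        # Returns immediately; the "data flow" continues on a background thread.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stopped.set()
        if self._thread is not None:
            self._thread.join()

    def await_termination(self, timeout: Optional[float] = None) -> bool:
        # True if stopped; False if the timeout expired first (None waits forever).
        return self._stopped.wait(timeout)

pipeline = ToyPipeline()
pipeline.start()
print(pipeline.await_termination(timeout=0.05))  # False: still running
pipeline.stop()
print(pipeline.await_termination(timeout=0.05))  # True: stopped
```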

column_names()

The name of each column in the XStream.

Returns:

list[string]

Column names of the XStream.

See also

xframes.XFrame.column_names
Corresponding function on individual frame.
column_types()

The type of each column in the XFrame.

Returns:

list[type]

Column types of the XFrame.

See also

xframes.XFrame.column_types
Corresponding function on individual frame.
count_distinct(col)

Counts the number of different values in a column of each XFrame in the stream.

Parameters:

col : string

The name of the column whose distinct values are counted.

Returns:

XStream of XFrame

Returns a new XStream consisting of one-row XFrames. Each XFrame has one column, “count”, containing the number of distinct values in the given column of the corresponding input XFrame.
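The difference between count_distinct and num_rows is easiest to see side by side. The plain-Python sketch below models each XFrame as a list of row dictionaries and each one-row output XFrame as `[{"count": n}]`. The helper names are hypothetical, and neither function uses xframes or Spark.

```python
from typing import Any, Dict, List

Frame = List[Dict[str, Any]]  # stand-in for one XFrame: a list of row dicts

def count_distinct_per_batch(stream: List[Frame], col: str) -> List[Frame]:
    # One single-row, single-column frame per input batch.
    return [[{"count": len({row[col] for row in frame})}] for frame in stream]

def num_rows_per_batch(stream: List[Frame]) -> List[Frame]:
    # Same output shape, but counts rows instead of distinct values.
    return [[{"count": len(frame)}] for frame in stream]

batches = [
    [{"user": "a"}, {"user": "b"}, {"user": "a"}],
    [{"user": "c"}],
]
print(count_distinct_per_batch(batches, "user"))  # [[{'count': 2}], [{'count': 1}]]
print(num_rows_per_batch(batches))                # [[{'count': 3}], [{'count': 1}]]
```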

static create_from_kafka_topic(topics, kafka_servers=None, kafka_params=None)

Create XStream (stream of XFrames) from one or more kafka topics.

Records will be read from a kafka topic or topics. Each read delivers a group of messages, as controlled by the consumer params. These records are converted into an XFrame using the ingest function, and are processed sequentially.

Parameters:

topics : string | list

A single topic name, or a list of topic names. These are kafka topics that are used to get data from kafka.

kafka_servers : string | list, optional

A single kafka server or a list of kafka servers. Each server is of the form server-name:port. If no server is given, the server “localhost:9002” is used.

kafka_params : dict, optional

A dictionary of parameter name-value pairs. These are passed to Kafka as consumer configuration parameters. See the Kafka documentation at http://kafka.apache.org/documentation.html#newconsumerconfigs for more details on consumer configuration parameters. If no kafka_params are supplied, the list of Kafka servers specified in this function is passed as the “bootstrap.servers” parameter.

Returns:

XStream

An XStream (of XFrames) made up of rows read from the Kafka topic or topics.
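Kafka's `bootstrap.servers` setting takes a comma-separated list of `host:port` pairs. The sketch below shows one plausible way to normalize the `topics`, `kafka_servers`, and `kafka_params` arguments described above into that form. `build_kafka_args` is hypothetical, and whether xframes joins the server list exactly this way is an assumption. The default server string is the one given on this page.

```python
from typing import Dict, List, Optional, Tuple, Union

def build_kafka_args(
    topics: Union[str, List[str]],
    kafka_servers: Union[str, List[str], None] = None,
    kafka_params: Optional[Dict[str, str]] = None,
) -> Tuple[List[str], Dict[str, str]]:
    """Hypothetical normalization of create_from_kafka_topic arguments."""
    # A single topic name becomes a one-element list.
    topic_list = [topics] if isinstance(topics, str) else list(topics)
    # Fall back to the default server stated in the documentation.
    if kafka_servers is None:
        kafka_servers = "localhost:9002"
    server_list = [kafka_servers] if isinstance(kafka_servers, str) else list(kafka_servers)
    # Without explicit params, pass the servers as bootstrap.servers.
    if kafka_params is None:
        kafka_params = {"bootstrap.servers": ",".join(server_list)}
    return topic_list, kafka_params

print(build_kafka_args("clicks", ["broker1:9092", "broker2:9092"]))
# (['clicks'], {'bootstrap.servers': 'broker1:9092,broker2:9092'})
```

With xframes itself, the documented call would look like `XStream.create_from_kafka_topic('clicks', kafka_servers=['broker1:9092', 'broker2:9092'])`. The topic and broker names here are placeholders.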

static create_from_socket_stream(hostname, port)

Create XStream (stream of XFrames) from text gathered from a socket.

Parameters:

hostname : str

The data hostname.

port : str

The port to connect to.

Returns:

XStream

An XStream (of XFrames) made up of rows read from the socket.

static create_from_text_files(directory_path)

Create XStream (stream of XFrames) from text files gathered in a directory.

Monitors the directory. As new files are added, they are read into XFrames and introduced to the stream.

Parameters:

directory_path : str

The directory where files are stored.

Returns:

XStream

An XStream (of XFrames) made up of rows read from files in the directory.
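The directory-monitoring behaviour can be pictured as a poller that remembers which files it has already seen and turns only the new ones into a batch. The sketch below uses only the standard library. `poll_new_files` is a hypothetical helper, and treating each line of a file as one row is an assumption, because this page does not say how lines map to columns.

```python
import os
import tempfile
from typing import List, Set

def poll_new_files(directory: str, seen: Set[str]) -> List[str]:
    """Return the lines of files that appeared since the previous poll (one batch)."""
    batch: List[str] = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        # Skip files already ingested and anything that is not a regular file.
        if name in seen or not os.path.isfile(path):
            continue
        seen.add(name)
        with open(path, encoding="utf-8") as f:
            batch.extend(line.rstrip("\n") for line in f)
    return batch

with tempfile.TemporaryDirectory() as d:
    seen: Set[str] = set()
    with open(os.path.join(d, "a.txt"), "w", encoding="utf-8") as f:
        f.write("1\n2\n")
    print(poll_new_files(d, seen))  # ['1', '2']
    print(poll_new_files(d, seen))  # [] (no new files)
```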

dtype()

The type of each column in the XFrame.

Returns:

list[type]

Column types of the XFrame.

See also

xframes.XFrame.dtype
Corresponding function on individual frame.
dump_debug_info()

Print information about the Spark RDD associated with this XFrame.

See also

xframes.XFrame.dump_debug_info
Corresponding function on individual frame.
filterby(values, col_name, exclude=False)

Filter an XStream by values inside an iterable object. The result is an XStream that only includes (or excludes) the rows whose col_name column holds one of the given values. If values is not an XArray, an attempt is made to convert it to one before filtering.

Parameters:

values : XArray | list | tuple | set | iterable | numpy.ndarray | pandas.Series | str | function

The values to use to filter the XFrame. The resulting XFrame will only include rows that have one of these values in the given column. If this is a function, it is called on each row and is passed the value in the column given by col_name. The result includes rows where the function returns True.

col_name : str | None

The column of the XFrame to match with the given values. This can only be None if the values argument is a function. In this case, the function is passed the whole row.

exclude : bool

If True, the result XFrame will contain all rows EXCEPT those that have one of the values in col_name.

Returns:

XStream of XFrame

The filtered XStream.
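The matching rules above can be sketched for a single batch in plain Python, with an XFrame modelled as a list of row dictionaries. On a stream, the same logic would be applied to every batch. `filterby_frame` is hypothetical. Treating a single string as one value, rather than as a set of characters, is this sketch's reading of the `str` option.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Union

Row = Dict[str, Any]
Frame = List[Row]  # stand-in for one XFrame

def filterby_frame(frame: Frame,
                   values: Union[str, Iterable[Any], Callable[[Any], bool]],
                   col_name: Optional[str],
                   exclude: bool = False) -> Frame:
    """Keep rows that match `values`, or drop them when exclude=True."""
    if callable(values):
        predicate = values

        def matches(row: Row) -> bool:
            # With col_name None, the function receives the whole row.
            return bool(predicate(row if col_name is None else row[col_name]))
    else:
        if col_name is None:
            raise ValueError("col_name may be None only when values is a function")
        # A single string is treated as one value, not as a set of characters.
        allowed = {values} if isinstance(values, str) else set(values)

        def matches(row: Row) -> bool:
            return row[col_name] in allowed
    # Keep a row when its match status differs from `exclude`.
    return [row for row in frame if matches(row) != exclude]

frame = [{"k": 1}, {"k": 2}, {"k": 3}]
print(filterby_frame(frame, [1, 3], "k"))                # [{'k': 1}, {'k': 3}]
print(filterby_frame(frame, [1, 3], "k", exclude=True))  # [{'k': 2}]
print(filterby_frame(frame, lambda v: v > 1, "k"))       # [{'k': 2}, {'k': 3}]
```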

See also

xframes.XFrame.filterby
Corresponding function on individual frame.
flat_map(column_names, fn, column_types='auto')

Map each row of each XFrame to multiple rows in a new XFrame via a function.

The output of fn must have type list[list[...]]. Each inner list will be a single row in the new output, and the collection of these rows within the outer list make up the data for the output XFrame. All rows must have the same length and the same order of types to make sure the result columns are homogeneously typed. For example, if the first element emitted into the outer list by fn is [43, 2.3, 'string'], then all other elements emitted into the outer list must be a list with three elements, where the first is an int, second is a float, and third is a string. If column_types is not specified, the first 10 rows of the XFrame are used to determine the column types of the returned XFrame.

Parameters:

column_names : list[str]

The column names for the returned XFrame.

fn : function

The function that maps each of the xframe rows into multiple rows, returning list[list[...]]. All output rows must have the same length and order of types. The function is passed a dictionary of column name: value for each row.

column_types : list[type]

The column types of the output XFrame.

Returns:

XStream

A new XStream containing the results of the flat_map of the XFrames in the XStream.
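The list[list[...]] contract can be shown on one batch with a plain-Python sketch, in which each inner list becomes one output row keyed by column_names. `flat_map_frame` is hypothetical. Its length check only stands in for the rule that all emitted rows must have the same shape. It does not perform the type inference described above.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

def flat_map_frame(frame: List[Row], column_names: List[str],
                   fn: Callable[[Row], List[List[Any]]]) -> List[Row]:
    """Expand each input row into zero or more output rows."""
    out: List[Row] = []
    for row in frame:
        for values in fn(row):
            # Every emitted row must supply one value per output column.
            if len(values) != len(column_names):
                raise ValueError("every emitted row must have one value per column")
            out.append(dict(zip(column_names, values)))
    return out

orders = [{"order": 1, "items": "a,b"}, {"order": 2, "items": "c"}]
split = flat_map_frame(orders, ["order", "item"],
                       lambda r: [[r["order"], it] for it in r["items"].split(",")])
print(split)
# [{'order': 1, 'item': 'a'}, {'order': 1, 'item': 'b'}, {'order': 2, 'item': 'c'}]
```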

See also

xframes.XFrame.flat_map
Corresponding function on individual frame.
classmethod from_dstream(dstream, col_names, column_types)

Create an XStream from a Spark DStream. The data should be:

Parameters:

dstream : spark.DStream

Data used to populate the XStream

col_names : list of string

The column names to use.

column_types : list of type

The column types to use.

Returns:

XStream

See also

from_rdd
Converts from a Spark RDD. Corresponding function on individual frame.
groupby(key_columns, operations=None, *args)

Perform a group on the key_columns followed by aggregations on the columns listed in operations.

The operations parameter is a dictionary that indicates which aggregation operators to use and which columns to use them on. The available operators are SUM, MAX, MIN, COUNT, MEAN, VARIANCE, STD, CONCAT, SELECT_ONE, ARGMIN, ARGMAX, and QUANTILE. See aggregate for more detail on the aggregators.

Parameters:

key_columns : string | list[string]

Column(s) to group by. Key columns can be of any type other than dictionary.

operations : dict, list, optional

Dictionary of columns and aggregation operations. Each key is an output column name and each value is an aggregator. This can also be a list of aggregators, in which case column names will be automatically assigned.

*args

All other remaining arguments will be interpreted in the same way as the operations argument.

Returns:

XStream

A new XStream of XFrames, each with a column for each groupby column and each aggregation operation.
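For each batch, groupby behaves like an ordinary group-and-aggregate. The plain-Python sketch below supports only a sum and a count, written as hypothetical `("sum", column)` / `("count", None)` tuples. These tuples are stand-ins, not xframes aggregator objects. Results are computed independently for each batch.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]

def groupby_frame(frame: List[Row], key: str,
                  operations: Dict[str, Tuple[str, Optional[str]]]) -> List[Row]:
    """Group one batch by `key` and compute each named aggregate."""
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in frame:
        groups[row[key]].append(row)
    result: List[Row] = []
    for k, rows in groups.items():
        out: Row = {key: k}
        for out_col, (op, src) in operations.items():
            if op == "sum":
                out[out_col] = sum(r[src] for r in rows)
            elif op == "count":
                out[out_col] = len(rows)
            else:
                raise ValueError(f"unsupported operation: {op}")
        result.append(out)
    return result

batch = [{"user": "a", "n": 2}, {"user": "b", "n": 5}, {"user": "a", "n": 1}]
print(groupby_frame(batch, "user", {"total": ("sum", "n"), "events": ("count", None)}))
# [{'user': 'a', 'total': 3, 'events': 2}, {'user': 'b', 'total': 5, 'events': 1}]
```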

See also

xframes.XFrame.groupby
Corresponding function on individual frame.
lineage()

The table lineage: the files that went into building this table.

Returns:

dict

  • key ‘table’: set[filename]
    The files that were used to build the XArray
  • key ‘column’: dict{col_name: set[filename]}
    The set of files that were used to build each column

See also

xframes.XFrame.lineage
Corresponding function on individual frame.
num_columns()

The number of columns in this XFrame.

Returns:

int

Number of columns in the XFrame.

See also

xframes.XFrame.num_rows
Returns the number of rows.
num_rows()

Counts the rows in each XFrame in the stream.

Returns:

XStream of XFrame

Returns a new XStream consisting of one-row XFrames. Each XFrame has one column, “count”, containing the number of rows in the corresponding input XFrame.

See also

xframes.XFrame.num_rows
Corresponding function on individual frame.
print_frames(label=None, num_rows=10, num_columns=40, max_column_width=30, max_row_width=None, wrap_text=False, max_wrap_rows=2, footer=False)

Print the first rows and columns of each XFrame in the XStream in human readable format.

Parameters:

num_rows : int, optional

Number of rows to print.

num_columns : int, optional

Number of columns to print.

max_column_width : int, optional

Maximum width of a column. Columns use fewer characters if possible.

max_row_width : int, optional

Maximum width of a printed row. Columns beyond this width wrap to a new line. max_row_width is automatically reset to be the larger of itself and max_column_width.

wrap_text : boolean, optional

Wrap the text within a cell. Defaults to False.

max_wrap_rows : int, optional

When wrapping is in effect, the maximum number of resulting rows for each cell before truncation takes place.

footer : bool, optional

True to print a footer.

See also

xframes.XFrame.print_rows
Corresponding function on individual frame.
process_frames(frame_fn, init_fn=None, final_fn=None)

Process the XFrames in an XStream using a given frame processing function.

This is an output operation, and forces the XFrames to be evaluated, for their side effects.

Parameters:

frame_fn : function

This function is called on each XFrame in the XStream. It receives two parameters: a frame and an initial value. The initial value is the return value of init_fn. The frame_fn need not return a value: the function is called for its side effects only.

init_fn : function, optional

The init_fn is a parameterless function, used to set up the environment for the frame function. Its value is passed to each invocation of the frame function. If no init_fn is passed, then each frame function will receive None as its second argument.

The rows are processed in parallel in groups on one or more worker machines. For each group, init_fn is called once, and its return value is passed to each invocation of frame_fn. It could be used, for instance, to open a file or socket that is used by each call to frame_fn.

final_fn : function, optional

The final_fn is called after each group is processed. It is a function of one parameter, the return value of the initial function.

See also

xframes.XStream.process_rows
Processes individual rows and returns a result.
process_rows(row_fn, init_fn=None, final_fn=None)

Process the rows in an XStream of XFrames using a given row processing function.

This is an output operation, and forces the XFrames to be evaluated.

Parameters:

row_fn : function

This function is called on each row of each XFrame. It receives two parameters: a row and an initial value. The row is a dictionary of column-name: column-value pairs. The initial value is the return value of init_fn. The row_fn need not return a value: the function is called for its side effects only.

init_fn : function, optional

The init_fn is a parameterless function, used to set up the environment for the row function. Its value is passed to each invocation of the row function. If no init_fn is passed, then each row function will receive None as its second argument.

The rows are processed in parallel in groups on one or more worker machines. For each group, init_fn is called once, and its return value is passed to each row_fn. It could be used, for instance, to open a file or socket that is used by each of the row functions.

final_fn : function, optional

The final_fn is called after each group is processed. It is a function of one parameter, the return value of the initial function.

Returns:

XStream of XFrame

XStream of XFrames that have been processed by the row function.
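The init_fn / row_fn / final_fn protocol resembles a per-partition loop. The sketch below runs that protocol over in-memory groups of rows, which stand in for the groups processed on worker machines. `process_rows_in_groups` is hypothetical and runs everything in one process. In real use, init_fn might open a file or socket and final_fn would close it.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Row = Dict[str, Any]

def process_rows_in_groups(groups: Iterable[List[Row]],
                           row_fn: Callable[[Row, Any], None],
                           init_fn: Optional[Callable[[], Any]] = None,
                           final_fn: Optional[Callable[[Any], None]] = None) -> None:
    for group in groups:
        # init_fn runs once per group; without it, row_fn receives None.
        state = init_fn() if init_fn is not None else None
        for row in group:
            row_fn(row, state)
        # final_fn receives the same value that init_fn produced.
        if final_fn is not None:
            final_fn(state)

# Example: each group gets a fresh buffer, which final_fn "flushes".
flushed: List[List[str]] = []
process_rows_in_groups(
    [[{"msg": "a"}, {"msg": "b"}], [{"msg": "c"}]],
    row_fn=lambda row, buf: buf.append(row["msg"]),
    init_fn=list,
    final_fn=flushed.append,
)
print(flushed)  # [['a', 'b'], ['c']]
```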

See also

xframes.XStream.process_frames
Processes whole frames for their side effects only.
remove_column(name)

Remove a column or columns from every XFrame in this XStream. This operation returns a new XStream with the given column removed.

Parameters:

name : string or list or iterable

The name of the column to remove. If a list or iterable is given, all the named columns are removed.

Returns:

XStream of XFrame

XStream of XFrames with given column removed.

See also

xframes.XFrame.remove_column
Corresponding function on individual frame.
remove_columns(column_names)

Remove one or more columns from every XFrame in this XStream. This operation returns a new XStream with the given columns removed.

Parameters:

column_names : list or iterable

A list or iterable of the column names.

Returns:

XStream of XFrame

XStream of XFrames with given columns removed.

See also

xframes.XFrame.remove_columns
Corresponding function on individual frame.
rename(names)

Rename the given columns in every XFrame of this XStream. Names can be a dict specifying the old and new names: the columns given as keys are renamed to the corresponding values. Alternatively, names can be a list of new column names, in which case it must have the same length as the number of columns. This operation returns a new XStream with the given columns renamed.

Parameters:

names : dict [string, string] | list [ string ]

Dictionary mapping old_name to new_name, or a list of new names.

Returns:

XStream of XFrame

XStream of XFrames with columns renamed.
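The two accepted forms of names can be compared with a plain-Python sketch that computes the resulting column list. `renamed_columns` is hypothetical. Raising KeyError for an unknown old name is this sketch's choice, not documented xframes behaviour.

```python
from typing import Dict, List, Union

def renamed_columns(columns: List[str], names: Union[Dict[str, str], List[str]]) -> List[str]:
    """Return the column names after a dict-style or list-style rename."""
    if isinstance(names, dict):
        missing = [old for old in names if old not in columns]
        if missing:
            raise KeyError(f"unknown columns: {missing}")
        # Only the keys are renamed; other columns keep their names.
        return [names.get(c, c) for c in columns]
    # A list must supply a new name for every column, in order.
    if len(names) != len(columns):
        raise ValueError("a list of new names must cover every column")
    return list(names)

print(renamed_columns(["a", "b", "c"], {"b": "beta"}))    # ['a', 'beta', 'c']
print(renamed_columns(["a", "b", "c"], ["x", "y", "z"]))  # ['x', 'y', 'z']
```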

See also

xframes.XFrame.rename
Corresponding function on individual frame.
reorder_columns(column_names)

Reorder the columns in each XFrame of the XStream. This operation returns a new XStream with the given columns reordered.

Parameters:

column_names : list of string

Names of the columns in desired order.

Returns:

XStream of XFrame

XStream of XFrames with reordered columns.

See also

xframes.XFrame.reorder_columns
Corresponding function on individual frame.
replace_column(name, col)

Replace a column in every XFrame of this XStream. The length of the new column must match the length of the existing XFrame. This operation returns a new XStream with the replacement column.

Parameters:

name : string

The name of the column.

col : XArray

The ‘column’ to add.

Returns:

XStream of XFrame

A new XStream of XFrames with specified column replaced.

See also

xframes.XFrame.replace_column
Corresponding function on individual frame.
save(prefix, suffix=None)

Save the XStream to a set of files in the file system.

This is an output operation, and forces the XFrames to be evaluated.

Parameters:

prefix : string

The base location to save each XFrame in the XStream. The filename of each file is formed as follows: prefix-TIME-IN-MS.suffix. The prefix should be either a local directory or a remote URL.

suffix : string, optional

The filename suffix. Defaults to no suffix.
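The naming pattern above can be expressed as a small helper. `batch_filename` is hypothetical. The timestamp is either passed in or taken from the wall clock here. In a running stream it would presumably be the batch time, but that is an assumption. The path in the example is a placeholder.

```python
import time
from typing import Optional

def batch_filename(prefix: str, suffix: Optional[str] = None,
                   time_ms: Optional[int] = None) -> str:
    """Hypothetical helper: build 'prefix-TIME-IN-MS.suffix' for one batch."""
    if time_ms is None:
        time_ms = int(time.time() * 1000)
    name = f"{prefix}-{time_ms}"
    # With no suffix (the default), no trailing dot is added.
    return f"{name}.{suffix}" if suffix else name

print(batch_filename("/data/out/clicks", "csv", time_ms=1500000000000))
# /data/out/clicks-1500000000000.csv
```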

See also

xframes.XFrame.save
Corresponding function on individual frame.
select_column(column_name)

Return an XStream of XArray that corresponds to the given column name. Throws an exception if the column name is something other than a string or if the column name is not found.

Subscripting an XStream by a column name is equivalent to this function.

Parameters:

column_name : str

The column name.

Returns:

XStream of XArray

The XStream of XArray that is referred by column_name.

See also

xframes.XFrame.select_column
Corresponding function on individual frame.
select_columns(keylist)

Get an XStream of XFrames composed only of the columns referred to in the given list of keys. Throws an exception if ANY of the keys are not in the XFrames or if keylist is anything other than a list of strings.

Parameters:

keylist : list[str]

The list of column names.

Returns:

XStream of XFrame

A new XStream that is made up of XFrames of the columns referred to in keylist from each XFrame. The order of the columns is preserved.

See also

xframes.XFrame.select_columns
Corresponding function on individual frame.
select_rows(xa)

Selects rows of the XFrame where the XArray evaluates to True.

Parameters:

xa : XArray

Must be the same length as the XFrame. The filter values.

Returns:

XFrame

A new XFrame which contains the rows of the XFrame where the XArray is True. The truth test is the same as in python, so non-zero values are considered true.

static set_checkpoint(checkpoint_dir)

Set the checkpoint directory for storing state.

Parameters:

checkpoint_dir : string

Path to a directory for storing checkpoints

static start()

Start the streaming pipeline running.

It will continue to run, processing XFrames, until stopped.

static stop(stop_spark_context=True, stop_gracefully=False)

Stop the streaming pipeline.

Parameters:

stop_spark_context : boolean, optional

If True, also stop the Spark context. This releases resources, but streaming cannot be started again. If False, then streaming may be started again. Defaults to True.

stop_gracefully : boolean, optional

If True, stops gracefully by letting all operations in progress finish before stopping. Defaults to False.

swap_columns(column_1, column_2)

Swap the columns with the given names. This operation returns a new XStream with the given columns swapped.

Parameters:

column_1 : string

Name of column to swap

column_2 : string

Name of other column to swap

Returns:

XStream of XFrame

XStream of XFrames with specified columns swapped.

See also

xframes.XFrame.swap_columns
Corresponding function on individual frame.
to_dstream()

Convert the current XStream to a Spark DStream. The RDD contained in the DStream consists of tuples containing the column data. No conversion is necessary: the internal DStream is returned.

Returns:

spark.DStream

The spark DStream that is used to represent the XStream.

See also

xframes.XFrame.to_rdd
Converts to a Spark RDD. Corresponding function on individual frame.
transform_col(col, fn, dtype)

Transform a single column according to a specified function. The remaining columns are not modified. The type of the transformed column becomes dtype, and the new value is the result of fn(x), where x is a single row in the XFrame represented as a dictionary. The fn should return exactly one value that can be cast to dtype.

Parameters:

col : string

The name of the column to transform.

fn : function, optional

The function to transform each row of the XFrame. The return type should be convertible to dtype. If the function is not given, an identity function is used.

dtype : dtype, optional

The column data type of the new XArray. If None, the first 100 elements of the array are used to guess the target data type.

Returns:

XStream of XFrame

An XStream with the given column transformed by the function and cast to the given type.
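The per-row behaviour can be sketched for one batch in plain Python. fn receives the whole row as a dictionary, only the named column is replaced, and the result is cast to dtype. `transform_col_frame` is hypothetical. When dtype is None, it simply skips casting rather than inferring a type from the first 100 elements as described above.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]

def transform_col_frame(frame: List[Row], col: str,
                        fn: Optional[Callable[[Row], Any]] = None,
                        dtype: Optional[type] = None) -> List[Row]:
    """Replace `col` in every row with fn(row), optionally cast to dtype."""
    # Without fn, fall back to the identity on the target column.
    transform = fn if fn is not None else (lambda row: row[col])
    out: List[Row] = []
    for row in frame:
        value = transform(row)  # fn sees the whole row as a dict
        new_row = dict(row)     # other columns are copied unchanged
        new_row[col] = dtype(value) if dtype is not None else value
        out.append(new_row)
    return out

batch = [{"price": "3", "qty": 2}, {"price": "4", "qty": 1}]
print(transform_col_frame(batch, "price", lambda r: float(r["price"]) * r["qty"], float))
# [{'price': 6.0, 'qty': 2}, {'price': 4.0, 'qty': 1}]
```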

update_state(fn, col_name, state_column_names, state_column_types)

Update state for an XStream by using the state key in a given column.

The state is a key-value store. The key is made up of the values in the given column. For each XFrame in the XStream, all the rows with a given key are passed to the supplied function, which computes a new state.

Parameters:

fn : function

The given function is supplied with a list of rows in each XFrame that have the same value in the given column (the key), along with the current state. It returns the new state for that key. The function is: fn(rows, old_state) and returns new_state.

col_name : str | None

The column of the XStream that supplies the state key.

Returns:

An XStream made up of XFrames representing the state.
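update_state amounts to a per-key fold across batches. The plain-Python sketch below keeps the state in a dictionary keyed by the values in col_name. For each batch, it groups the rows by key and calls fn(rows, old_state). Passing None as old_state the first time a key appears is an assumption, modelled on Spark's `updateStateByKey`. The sketch returns a dictionary snapshot per batch where xframes returns XFrames. `update_state_over_batches` and `running_total` are hypothetical.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]

def update_state_over_batches(batches: List[List[Row]], col_name: str,
                              fn: Callable[[List[Row], Optional[Any]], Any]) -> List[Dict[Any, Any]]:
    """Fold each batch into a key -> state store; return a snapshot per batch."""
    state: Dict[Any, Any] = {}
    snapshots: List[Dict[Any, Any]] = []
    for frame in batches:
        by_key: Dict[Any, List[Row]] = defaultdict(list)
        for row in frame:
            by_key[row[col_name]].append(row)
        for key, rows in by_key.items():
            state[key] = fn(rows, state.get(key))  # None for a new key
        snapshots.append(dict(state))
    return snapshots

def running_total(rows: List[Row], old_state: Optional[int]) -> int:
    return (old_state or 0) + sum(r["amount"] for r in rows)

batches = [
    [{"user": "a", "amount": 5}, {"user": "b", "amount": 1}],
    [{"user": "a", "amount": 2}],
]
print(update_state_over_batches(batches, "user", running_total))
# [{'a': 5, 'b': 1}, {'a': 7, 'b': 1}]
```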