Source code for xframes.aggregator_property_set

[docs]class AggregatorPropertySet(object): """ Store aggregator properties for one aggregator. XFrames comes with an assortment of aggregators, which are used in conjunction with xframes.groupby. You can write your own aggregators if you need to. This class is used to define custom aggregators. We will explain how by describing the builtin SUM function. You create a function that you use in the groupby call. This function takes arguments, for instance the column name. It returns an instance of AggregatorPropertySet and also a list of the arguments passed to it. Let's examine SUM:: def SUM(src_column): return AggregatorPropertySet(agg_sum, int, 'sum', 1), [src_column] In this example, SUM takes one argument, the name of the column to sum over. This argument is placed into an array and returned. The AggregatorPropertyset instance tells groupby how to do its work. This includes the function to use to perform the aggregation (agg_sum), the type of the column to be created (int), the default name of the column if no name is given ('sum') and the number of arguments expected (1). The agg_sum function does the actual aggregation. A simplified version of agg_sum is given below as an example. This function takes arguments: rows : A collection of rows. Each row is a dictionary, in the form passed to xframes.apply. cols : A list of the arguments. These are the values returned as the second member of SUM. Then the function (agg_sum) computes and returns the aggregated value. Here is the code for agg_sum:: def agg_sum(rows, cols): src_col = cols[0] total = 0 for row in rows: val = row[src_col] if not _is_missing(val): total += val return total If you call SUM('some-col') in a groupby statement, then SUM returns the AggregatorPropertySet as shown above, and ['some-col']. Then the groupby command is executed. For every distinct value of the grouped variable, it creates a row iterator and passes it to agg_sum, along with ['some-col']. The number of rows may be very large, so they are not all computed and passed as a list, but are provided by an iterator. Then agg_sum extracts the desired value from each row (row[src_col]) and sums up the values. The function agg_sum is executed in a spark worker node, as with the function used in xframes.apply, and the same restrictions apply. """
[docs] def __init__(self, agg_function, output_type, default_column_name, num_args): """ Create a new instance. Parameters ---------- agg_function: func(rows, cols) The agregator function. This is given a pyspark resultIterable produced by rdd.groupByKey and containing the rows matching a single group. It's responsibility is to compute and return the aggregate value for the group. output_type: type or int If a type is given, use that type as the output column type. If an integer is given, then the output type is the same as the input type of the column indexed by the integer. default_column_name: str The name of the aggregate column, if not supplied explicitly. num_args : int The number of arguments to the agg_function. """ self.agg_function = agg_function self.default_column_name = default_column_name self.output_type = output_type self.num_args = num_args
def get_output_type(self, input_type): candidate = self.output_type if isinstance(candidate, int): return input_type[candidate] return candidate