giving.operators

Most of the operators on this page are defined in the rx package.

The following operators are added in giving in order to better handle the kind of data produced by give: affix(), augment(), as_(), average_and_variance(), bottom(), collect_between(), flatten(), format(), getitem(), group_wrap(), keep(), kfilter(), kmap(), kmerge(), kscan(), roll(), sole(), sort(), tag(), top(), variance(), where(), where_any(), wmap().

The following reduction operators exist in rx but work slightly differently in giving: average(), count(), min(), max(), sum(). The main difference is the scan argument, which reinterprets the reduction as either a scan (if True) or roll (if an integer).
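
The three modes of the scan argument can be illustrated with a plain-Python model. This is only a sketch of the behavior described above, not giving's implementation: model_min is a hypothetical helper, lists stand in for streams, and the integer case is assumed to emit on every element (including the first, partially filled windows).

```python
from collections import deque


def model_min(values, scan=False):
    """Plain-Python model of min(scan=...); returns the list of emitted values."""
    if scan is False:
        # Plain reduction: a single result once the stream ends.
        return [min(values)]
    if scan is True:
        # Scan: the running minimum after every element.
        out = []
        for v in values:
            out.append(v if not out else min(out[-1], v))
        return out
    # Integer: minimum over a sliding window of the last `scan` elements.
    window = deque(maxlen=scan)
    out = []
    for v in values:
        window.append(v)
        out.append(min(window))
    return out


data = [3, 5, 8, 1, 9]
print(model_min(data))             # [1]
print(model_min(data, scan=True))  # [3, 3, 3, 1, 1]
print(model_min(data, scan=2))     # [3, 3, 5, 1, 1]
```

In this model, only scan=True and scan=n produce one output per input.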

giving.operators.affix(**streams)

Affix streams as extra keys on an existing stream of dicts.

The affixed streams should have the same length as the main one, so when affixing a reduction, one should set scan=True, or scan=n.

Example

obs.where("x", "y").affix(
    minx=obs["x"].min(scan=True),
    xpy=obs["x", "y"].starmap(lambda x, y: x + y),
)

Or:

obs.where("x", "y").affix(
    # o is obs.where("x", "y")
    minx=lambda o: o["x"].min(scan=True),
    xpy=lambda o: o["x", "y"].starmap(lambda x, y: x + y),
)
Parameters

streams – A mapping from extra keys to add to the dicts to Observables that generate the values, or to functions of one argument that will be called with the main Observable.
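
To make the length requirement concrete, here is a plain-Python model in which lists stand in for Observables. model_affix and running_min are hypothetical helpers written for this sketch; they are not part of giving.

```python
def running_min(xs):
    """Running minimum: one output per input, like min(scan=True)."""
    out = []
    for x in xs:
        out.append(x if not out else min(out[-1], x))
    return out


def model_affix(main, **streams):
    """Model of affix: add one value from each extra stream to each dict."""
    resolved = {
        # A callable is invoked with the main stream, as in the second form.
        key: (s(main) if callable(s) else s)
        for key, s in streams.items()
    }
    out = []
    for i, row in enumerate(main):
        extra = {key: values[i] for key, values in resolved.items()}
        out.append({**row, **extra})
    return out


rows = [{"x": 3, "y": 1}, {"x": 2, "y": 5}, {"x": 4, "y": 0}]
result = model_affix(
    rows,
    minx=running_min([r["x"] for r in rows]),
    xpy=lambda o: [r["x"] + r["y"] for r in o],
)
print(result[1])  # {'x': 2, 'y': 5, 'minx': 2, 'xpy': 7}
```

A plain reduction would yield a single value, so the model could not pair it with every row; this is why the running form is needed.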

giving.operators.all(predicate)

Determines whether all elements of an observable sequence satisfy a condition.

Example

>>> op = all(lambda value: len(value) > 3)
Parameters

predicate (Callable[[~_T], bool]) – A function to test each element for a condition.

Return type

Callable[[Observable[~_T]], Observable[bool]]

Returns

An operator function that takes an observable source and returns an observable sequence containing a single element determining whether all elements in the source sequence pass the test in the specified predicate.

giving.operators.amb(right_source)

Propagates the observable sequence that reacts first.

Example

>>> op = amb(ys)
Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that surfaces any of the given sequences, whichever reacted first.

giving.operators.as_(key)

Make a stream of dictionaries using the given key.

For example, [1, 2].as_("x") => [{"x": 1}, {"x": 2}]

Parameters

key – Key under which to generate each element of the stream.

giving.operators.as_observable()

Hides the identity of an observable sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that hides the identity of the source sequence.

giving.operators.augment(**fns)

Augment a stream of dicts with new keys.

Each key in fns should be associated with a function that will be called with the rest of the data as keyword arguments, so the argument names matter. If any keys are in common, the results overwrite the old data.

Note

The functions passed in fns will be wrapped with lax_function() if possible.

This means that these functions are considered to have an implicit **kwargs argument, so that any data they do not need is ignored.

# [{"x": 1, "y": 2}, ...] => [{"x": 1, "y": 2, "z": 3}, ...]
gv.augment(z=lambda x, y: x + y)

# [{"lo": 2, "hi": 3}, ...] => [{"lo": 2, "hi": 3, "higher": 9}, ...]
gv.augment(higher=lambda hi: hi * hi)
Parameters

fns – A map from new key names to the functions to compute them.
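
The "implicit **kwargs" behavior can be sketched in plain Python. lax_call and model_augment are hypothetical helpers for illustration only; they approximate the idea behind lax_function() and assume every function is evaluated against the original dict.

```python
import inspect


def lax_call(fn, data):
    """Call fn with only the entries of data that it names as parameters."""
    params = inspect.signature(fn).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        # The function already accepts **kwargs: pass everything.
        return fn(**data)
    return fn(**{k: v for k, v in data.items() if k in params})


def model_augment(rows, **fns):
    """Model of augment: compute new keys from each dict and merge them in."""
    out = []
    for row in rows:
        new = {key: lax_call(fn, row) for key, fn in fns.items()}
        out.append({**row, **new})  # new values win on key collisions
    return out


rows = [{"lo": 2, "hi": 3}]
print(model_augment(rows, higher=lambda hi: hi * hi))
# [{'lo': 2, 'hi': 3, 'higher': 9}]
print(model_augment(rows, hi=lambda hi: hi + 1))
# [{'lo': 2, 'hi': 4}]
```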

giving.operators.average(*, scan=False)

Produce the average of a stream of values.

Parameters
  • scan – If True, generate the current average on every element. If a number n, generate the average on the last n elements.

  • seed – First element of the reduction.

giving.operators.average_and_variance(*, scan=False)

Produce the average and variance of a stream of values.

Note

The variance for the first element is always None.

Parameters

scan – If True, generate the current average+variance on every element. If a number n, generate the average+variance on the last n elements.
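
A plain-Python sketch of the running form follows. It assumes the sample variance (n - 1 denominator), which is consistent with the first variance being None but is not stated in the text above; model_average_and_variance is a hypothetical helper, not giving's code.

```python
def model_average_and_variance(values):
    """Running (average, variance) pairs, one per input element.

    Assumption: sample variance, so the variance is undefined (None) for n == 1.
    """
    out = []
    n, total, total_sq = 0, 0.0, 0.0
    for v in values:
        n += 1
        total += v
        total_sq += v * v
        mean = total / n
        var = None if n < 2 else (total_sq - n * mean * mean) / (n - 1)
        out.append((mean, var))
    return out


pairs = model_average_and_variance([1, 2, 3, 4])
print(pairs[0])  # (1.0, None)
print(pairs[1])  # (1.5, 0.5)
print(pairs[2])  # (2.0, 1.0)
```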

giving.operators.bottom(n=10, key=None, reverse=False)

Return the bottom n values, sorted in ascending order.

bottom may emit fewer than n elements if there are fewer than n elements in the original sequence.

Parameters
  • n – The number of bottom entries to return.

  • key – The comparison key function to use or a string.
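
The selection itself can be modeled with the standard library. In this sketch, model_bottom is a hypothetical helper; the interpretation of a string key as "compare dicts by this entry" is an assumption, and the reverse parameter is not modeled.

```python
import heapq


def model_bottom(values, n=10, key=None):
    """Model of bottom: the n smallest values, in ascending order."""
    if isinstance(key, str):
        # Assumption: a string key selects that entry from each dict.
        name = key
        key = lambda row: row[name]
    return heapq.nsmallest(n, values, key=key)


print(model_bottom([5, 1, 4, 2, 3], n=3))  # [1, 2, 3]
print(model_bottom([2, 1], n=3))           # [1, 2]
print(model_bottom([{"loss": 0.5}, {"loss": 0.1}], n=1, key="loss"))
# [{'loss': 0.1}]
```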

giving.operators.buffer(boundaries)

Projects each element of an observable sequence into zero or more buffers.

Examples

>>> res = buffer(reactivex.interval(1.0))
Parameters

boundaries (Observable[Any]) – Observable sequence whose elements denote the creation and completion of buffers.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

A function that takes an observable source and returns an observable sequence of buffers.

giving.operators.buffer_toggle(openings, closing_mapper)

Projects each element of an observable sequence into zero or more buffers.

>>> res = buffer_toggle(reactivex.interval(0.5), lambda i: reactivex.timer(i))
Parameters
  • openings (Observable[Any]) – Observable sequence whose elements denote the creation of buffers.

  • closing_mapper (Callable[[Any], Observable[Any]]) – A function invoked to define the closing of each produced buffer. Value from openings Observable that initiated the associated buffer is provided as argument to the function. The buffer is closed when one item is emitted or when the observable completes.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

A function that takes an observable source and returns an observable sequence of buffers.

giving.operators.buffer_when(closing_mapper)

Projects each element of an observable sequence into zero or more buffers.

Examples

>>> res = buffer_when(lambda: reactivex.timer(0.5))
Parameters

closing_mapper (Callable[[], Observable[Any]]) – A function invoked to define the closing of each produced buffer. A buffer is started when the previous one is closed, resulting in non-overlapping buffers. The buffer is closed when one item is emitted or when the observable completes.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

A function that takes an observable source and returns an observable sequence of buffers.

giving.operators.buffer_with_count(count, skip=None)

Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.

Examples

>>> res = buffer_with_count(10)(xs)
>>> res = buffer_with_count(10, 1)(xs)
Parameters
  • count (int) – Length of each buffer.

  • skip (Optional[int]) – [Optional] Number of elements to skip between creation of consecutive buffers. If not provided, defaults to the count.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

A function that takes an observable source and returns an observable sequence of buffers.
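
How count and skip interact is easiest to see on a finite list. The following plain-Python model uses a hypothetical helper, model_buffer_with_count, and assumes that partially filled buffers are emitted when the source completes.

```python
def model_buffer_with_count(values, count, skip=None):
    """Model of buffer_with_count on a finite list of values."""
    if skip is None:
        skip = count  # non-overlapping buffers by default
    # A new buffer starts every `skip` elements and holds up to `count` items.
    return [values[start:start + count] for start in range(0, len(values), skip)]


print(model_buffer_with_count(list(range(7)), 3))
# [[0, 1, 2], [3, 4, 5], [6]]
print(model_buffer_with_count(list(range(4)), 3, 1))
# [[0, 1, 2], [1, 2, 3], [2, 3], [3]]
print(model_buffer_with_count(list(range(7)), 2, 3))
# [[0, 1], [3, 4], [6]]
```

With skip smaller than count the buffers overlap; with skip larger than count some elements fall between buffers and are dropped.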

giving.operators.buffer_with_time(timespan, timeshift=None, scheduler=None)

Projects each element of an observable sequence into zero or more buffers which are produced based on timing information.

Examples

>>> # non-overlapping segments of 1 second
>>> res = buffer_with_time(1.0)
>>> # segments of 1 second with time shift 0.5 seconds
>>> res = buffer_with_time(1.0, 0.5)
Parameters
  • timespan (Union[timedelta, float]) – Length of each buffer (specified as a float denoting seconds or an instance of timedelta).

  • timeshift (Union[timedelta, float, None]) – [Optional] Interval between creation of consecutive buffers (specified as a float denoting seconds or an instance of timedelta). If not specified, the timeshift will be the same as the timespan argument, resulting in non-overlapping adjacent buffers.

  • scheduler (Optional[SchedulerBase]) – [Optional] Scheduler to run the timer on. If not specified, the timeout scheduler is used.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence of buffers.

giving.operators.buffer_with_time_or_count(timespan, count, scheduler=None)

Projects each element of an observable sequence into a buffer that is completed when either it’s full or a given amount of time has elapsed.

Examples

>>> # 5s or 50 items in a list
>>> res = buffer_with_time_or_count(5.0, 50)
>>> # 5s or 50 items in a list, on an explicit scheduler
>>> res = buffer_with_time_or_count(5.0, 50, reactivex.scheduler.TimeoutScheduler())
Parameters
  • timespan (Union[timedelta, float]) – Maximum time length of a buffer.

  • count (int) – Maximum element count of a buffer.

  • scheduler (Optional[SchedulerBase]) – [Optional] Scheduler to run buffering timers on. If not specified, the timeout scheduler is used.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence of buffers.

giving.operators.catch(handler)

Continues an observable sequence that is terminated by an exception with the next observable sequence.

Examples

>>> op = catch(ys)
>>> op = catch(lambda ex, src: ys(ex))
Parameters

handler (Union[Observable[~_T], Callable[[Exception, Observable[~_T]], Observable[~_T]]]) – Second observable sequence used to produce results when an error occurred in the first sequence, or an exception handler function that returns an observable sequence given the error and source observable that occurred in the first sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

A function that takes an observable source and returns an observable sequence containing the first sequence’s elements, followed by the elements of the handler sequence in case an exception occurred.

giving.operators.collect_between(start, end, common=None)

Collect all data between the start and end keys.

Example

with given() as gv:
    gv.collect_between("A", "Z") >> (results := [])
    give(A=1)
    give(B=2)
    give(C=3, D=4, A=5)
    give(Z=6)
    assert results == [{"A": 5, "B": 2, "C": 3, "D": 4, "Z": 6}]
Parameters
  • start – The key that marks the beginning of the accumulation.

  • end – The key that marks the end of the accumulation.

  • common – A key that must be present in all data and must have the same value in the whole group.
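
The accumulation in the example above can be modeled in a few lines of plain Python. model_collect_between is a hypothetical helper, not giving's implementation; it assumes that data arriving before the start key is dropped, and it does not model the common parameter.

```python
def model_collect_between(events, start, end):
    """Model of collect_between over a list of dicts."""
    results, current = [], None
    for data in events:
        if current is None and start not in data:
            continue  # nothing is being collected yet (assumed to be dropped)
        if current is None:
            current = {}
        current.update(data)  # later values overwrite earlier ones
        if end in data:
            results.append(current)
            current = None
    return results


events = [{"A": 1}, {"B": 2}, {"C": 3, "D": 4, "A": 5}, {"Z": 6}]
print(model_collect_between(events, "A", "Z"))
# [{'A': 5, 'B': 2, 'C': 3, 'D': 4, 'Z': 6}]
```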

giving.operators.combine_latest(*others)

Merges the specified observable sequences into one observable sequence by creating a tuple whenever any of the observable sequences produces an element.

Examples

>>> obs = combine_latest(other)
>>> obs = combine_latest(obs1, obs2, obs3)
Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the result of combining elements of the sources into a tuple.

giving.operators.concat(*sources)

Concatenates all the observable sequences.

Examples

>>> op = concat(xs, ys, zs)
Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes one or more observable sources and returns an observable sequence that contains the elements of each given sequence, in sequential order.

giving.operators.contains(value, comparer=None)

Determines whether an observable sequence contains a specified element with an optional equality comparer.

Examples

>>> op = contains(42)
>>> op = contains({ "value": 42 }, lambda x, y: x["value"] == y["value"])
Parameters
  • value (~_T) – The value to locate in the source sequence.

  • comparer (Optional[Callable[[~_T, ~_T], bool]]) – [Optional] An equality comparer to compare elements.

Return type

Callable[[Observable[~_T]], Observable[bool]]

Returns

A function that takes a source observable and returns an observable sequence containing a single element determining whether the source sequence contains an element that has the specified value.

giving.operators.count(*, predicate=None, scan=False)

Count operator.

Returns an observable sequence containing a value that represents how many elements in the specified observable sequence satisfy a condition if provided, else the count of items.

Parameters
  • predicate – A function to test each element for a condition.

  • scan – If True, generate a running count. If a number n, count the number of elements/matches in the last n elements.

giving.operators.debounce(duetime, scheduler=None)

Ignores values from an observable sequence which are followed by another value before duetime.

Example

>>> res = debounce(5.0) # 5 seconds
Parameters
  • duetime (Union[timedelta, float]) – Duration of the throttle period for each value (specified as a float denoting seconds or an instance of timedelta).

  • scheduler (Optional[SchedulerBase]) – Scheduler to debounce values on.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes the source observable and returns the debounced observable sequence.
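
Because debouncing depends on timing, it can help to reason about it with explicit timestamps. The sketch below is a plain-Python model over (time, value) pairs rather than a real scheduler; model_debounce is a hypothetical helper, and it assumes that the last pending value is emitted when the source completes and that a gap exactly equal to duetime lets the value through.

```python
def model_debounce(events, duetime):
    """events: list of (time_in_seconds, value) pairs sorted by time."""
    out = []
    for i, (t, value) in enumerate(events):
        is_last = i == len(events) - 1
        # A value survives only if no other value follows within duetime.
        if is_last or events[i + 1][0] - t >= duetime:
            out.append(value)
    return out


events = [(0.0, "a"), (0.1, "b"), (1.0, "c"), (1.2, "d")]
print(model_debounce(events, 0.5))  # ['b', 'd']
```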

giving.operators.default_if_empty(default_value: reactivex.operators._T) → Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.default_if_empty() → Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[Optional[reactivex.operators._T]]]

Returns the elements of the specified sequence or the specified value in a singleton sequence if the sequence is empty.

Examples

>>> obs = default_if_empty()
>>> obs = default_if_empty(False)
Parameters

default_value (Optional[Any]) – The value to return if the sequence is empty. If not provided, this defaults to None.

Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the specified default value if the source is empty; otherwise, the elements of the source.

giving.operators.delay(duetime, scheduler=None)

The delay operator.

Time shifts the observable sequence by duetime. The relative time intervals between the values are preserved.

Examples

>>> res = delay(timedelta(seconds=10))
>>> res = delay(5.0)
Parameters
  • duetime (Union[timedelta, float]) – Relative time, specified as a float denoting seconds or an instance of timedelta, by which to shift the observable sequence.

  • scheduler (Optional[SchedulerBase]) – [Optional] Scheduler to run the delay timers on. If not specified, the timeout scheduler is used.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

A partially applied operator function that takes the source observable and returns a time-shifted sequence.

giving.operators.delay_subscription(duetime, scheduler=None)

Time shifts the observable sequence by delaying the subscription.

Example

>>> res = delay_subscription(5.0) # 5s
Parameters
  • duetime (Union[datetime, timedelta, float]) – Absolute or relative time to perform the subscription at.

  • scheduler (Optional[SchedulerBase]) – Scheduler to delay subscription on.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

A function that takes a source observable and returns a time-shifted observable sequence.

giving.operators.delay_with_mapper(subscription_delay=None, delay_duration_mapper=None)

Time shifts the observable sequence based on a subscription delay and a delay mapper function for each element.

Examples

>>> # with mapper only
>>> res = delay_with_mapper(lambda x: reactivex.timer(5.0))
>>> # with delay and mapper
>>> res = delay_with_mapper(
...     reactivex.timer(2.0), lambda x: reactivex.timer(x)
... )
Parameters
  • subscription_delay (Union[Observable[Any], Callable[[Any], Observable[Any]], None]) – [Optional] Sequence indicating the delay for the subscription to the source.

  • delay_duration_mapper (Optional[Callable[[~_T], Observable[Any]]]) – [Optional] Selector function to retrieve a sequence indicating the delay for each given element.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

A function that takes an observable source and returns a time-shifted observable sequence.

giving.operators.dematerialize()

Dematerialize operator.

Dematerializes the explicit notification values of an observable sequence as implicit notifications.

Return type

Callable[[Observable[Notification[~_T]]], Observable[~_T]]

Returns

An observable sequence exhibiting the behavior corresponding to the source sequence’s notification values.

giving.operators.distinct(key_mapper=None, comparer=None)

Returns an observable sequence that contains only distinct elements according to the key_mapper and the comparer. Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.

Examples

>>> op = distinct()
>>> op = distinct(lambda x: x.id)
>>> op = distinct(lambda x: x.id, lambda a, b: a == b)
Parameters
  • key_mapper (Optional[Callable[[~_T], ~_TKey]]) – [Optional] A function to compute the comparison key for each element.

  • comparer (Optional[Callable[[~_TKey, ~_TKey], bool]]) – [Optional] Used to compare items in the collection.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence only containing the distinct elements, based on a computed key value, from the source sequence.

giving.operators.distinct_until_changed(key_mapper=None, comparer=None)

Returns an observable sequence that contains only distinct contiguous elements according to the key_mapper and the comparer.

Examples

>>> op = distinct_until_changed()
>>> op = distinct_until_changed(lambda x: x.id)
>>> op = distinct_until_changed(lambda x: x.id, lambda x, y: x == y)
Parameters
  • key_mapper (Optional[Callable[[~_T], ~_TKey]]) – [Optional] A function to compute the comparison key for each element. If not provided, it projects the value.

  • comparer (Optional[Callable[[~_TKey, ~_TKey], bool]]) – [Optional] Equality comparer for computed key values. If not provided, defaults to an equality comparer function.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence.
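
The difference between distinct and distinct_until_changed is easy to miss, so here is a plain-Python comparison on a list. Both helpers are hypothetical models written for this sketch; they assume hashable keys and do not model the comparer argument.

```python
def model_distinct(values, key_mapper=lambda x: x):
    """Keep the first element for each key ever seen."""
    seen, out = set(), []  # `seen` grows with the number of distinct keys
    for v in values:
        k = key_mapper(v)
        if k not in seen:
            seen.add(k)
            out.append(v)
    return out


def model_distinct_until_changed(values, key_mapper=lambda x: x):
    """Drop an element only when its key equals the previous element's key."""
    out, has_previous, previous = [], False, None
    for v in values:
        k = key_mapper(v)
        if not has_previous or k != previous:
            out.append(v)
        has_previous, previous = True, k
    return out


data = [1, 1, 2, 1, 3, 3]
print(model_distinct(data))                # [1, 2, 3]
print(model_distinct_until_changed(data))  # [1, 2, 1, 3]
```

The model of distinct has to remember every key it has seen, whereas the contiguous variant only remembers the previous one.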

giving.operators.do(observer)

Invokes an action for each element in the observable sequence and invokes an action on graceful or exceptional termination of the observable sequence. This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.

>>> do(observer)
Parameters

observer (ObserverBase[~_T]) – Observer

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes the source observable and returns the source sequence with the side-effecting behavior applied.

giving.operators.do_action(on_next=None, on_error=None, on_completed=None)

Invokes an action for each element in the observable sequence and invokes an action on graceful or exceptional termination of the observable sequence. This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.

Examples

>>> do_action(send)
>>> do_action(on_next, on_error)
>>> do_action(on_next, on_error, on_completed)
Parameters
  • on_next (Optional[Callable[[~_T], None]]) – [Optional] Action to invoke for each element in the observable sequence.

  • on_error (Optional[Callable[[Exception], None]]) – [Optional] Action to invoke on exceptional termination of the observable sequence.

  • on_completed (Optional[Callable[[], None]]) – [Optional] Action to invoke on graceful termination of the observable sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes the source observable and returns the source sequence with the side-effecting behavior applied.

giving.operators.do_while(condition)

Repeats source as long as condition holds emulating a do while loop.

Parameters

condition (Callable[[Observable[~_T]], bool]) – The condition which determines if the source will be repeated.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An observable sequence which is repeated as long as the condition holds.

giving.operators.element_at(index)

Returns the element at a specified index in a sequence.

Example

>>> res = element_at(5)
Parameters

index (int) – The zero-based index of the element to retrieve.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that produces the element at the specified position in the source sequence.

giving.operators.element_at_or_default(index, default_value=None)

Returns the element at a specified index in a sequence or a default value if the index is out of range.

Example

>>> res = element_at_or_default(5)
>>> res = element_at_or_default(5, 0)
Parameters
  • index (int) – The zero-based index of the element to retrieve.

  • default_value (Optional[~_T]) – [Optional] The default value if the index is outside the bounds of the source sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

A function that takes an observable source and returns an observable sequence that produces the element at the specified position in the source sequence, or a default value if the index is outside the bounds of the source sequence.

giving.operators.exclusive()

Performs an exclusive wait: waits for the first observable to finish before subscribing to another. Observables that arrive in between subscriptions are dropped.

Return type

Callable[[Observable[Observable[~_T]]], Observable[~_T]]

Returns

An exclusive observable with only the results that happen when subscribed.

giving.operators.expand(mapper)

Expands an observable sequence by recursively invoking mapper.

Parameters

mapper (Callable[[~_T], Observable[~_T]]) – Mapper function to invoke for each produced element, resulting in another sequence to which the mapper will be invoked recursively again.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An observable sequence containing all the elements produced by the recursive expansion.

giving.operators.filter(predicate)

Filters the elements of an observable sequence based on a predicate.

Example

>>> op = filter(lambda value: value < 10)
Parameters

predicate (Callable[[~_T], bool]) – A function to test each source element for a condition.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains elements from the input sequence that satisfy the condition.

giving.operators.filter_indexed(predicate_indexed=None)

Filters the elements of an observable sequence based on a predicate by incorporating the element’s index.

Example

>>> op = filter_indexed(lambda value, index: (value + index) < 10)
Parameters

predicate_indexed – A function to test each source element for a condition; the second parameter of the function represents the index of the source element.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains elements from the input sequence that satisfy the condition.

giving.operators.finally_action(action)

Invokes a specified action after the source observable sequence terminates gracefully or exceptionally.

Example

>>> res = finally_action(lambda: print('sequence ended'))
Parameters

action (Callable[[], None]) – Action to invoke after the source observable sequence terminates.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence with the action-invoking termination behavior applied.

giving.operators.find(predicate)

Searches for an element that matches the conditions defined by the specified predicate, and returns the first occurrence within the entire Observable sequence.

Parameters

predicate (Callable[[~_T, int, Observable[~_T]], bool]) – The predicate that defines the conditions of the element to search for.

Return type

Callable[[Observable[~_T]], Observable[Optional[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence with the first element that matches the conditions defined by the specified predicate, if found; otherwise, None.

giving.operators.find_index(predicate)

Searches for an element that matches the conditions defined by the specified predicate, and returns an Observable sequence with the zero-based index of the first occurrence within the entire Observable sequence.

Parameters

predicate (Callable[[~_T, int, Observable[~_T]], bool]) – The predicate that defines the conditions of the element to search for.

Return type

Callable[[Observable[~_T]], Observable[Optional[int]]]

Returns

An operator function that takes an observable source and returns an observable sequence with the zero-based index of the first occurrence of an element that matches the conditions defined by the predicate, if found; otherwise, -1.

giving.operators.first(predicate=None)

Returns the first element of an observable sequence that satisfies the condition in the predicate if present else the first item in the sequence.

Examples

>>> res = first()
>>> res = first(lambda x: x > 3)
Parameters

predicate (Optional[Callable[[~_T], bool]]) – [Optional] A predicate function to evaluate for elements in the source sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

A function that takes an observable source and returns an observable sequence containing the first element in the observable sequence that satisfies the condition in the predicate if provided, else the first item in the sequence.

giving.operators.first_or_default(predicate=None, default_value=None)

Returns the first element of an observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.

Examples

>>> res = first_or_default()
>>> res = first_or_default(lambda x: x > 3)
>>> res = first_or_default(lambda x: x > 3, 0)
>>> res = first_or_default(None, 0)
Parameters
  • predicate (Optional[Callable[[~_T], bool]]) – [Optional] A predicate function to evaluate for elements in the source sequence.

  • default_value (Optional[~_T]) – [Optional] The default value if no such element exists. If not specified, defaults to None.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

A function that takes an observable source and returns an observable sequence containing the first element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.

giving.operators.flat_map(mapper: Optional[Iterable[reactivex.operators._T2]] = None) → Callable[[reactivex.observable.observable.Observable[Any]], reactivex.observable.observable.Observable[reactivex.operators._T2]]
giving.operators.flat_map(mapper: Optional[reactivex.observable.observable.Observable[reactivex.operators._T2]] = None) → Callable[[reactivex.observable.observable.Observable[Any]], reactivex.observable.observable.Observable[reactivex.operators._T2]]
giving.operators.flat_map(mapper: Optional[Callable[[reactivex.operators._T1], Iterable[reactivex.operators._T2]]] = None) → Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]
giving.operators.flat_map(mapper: Optional[Callable[[reactivex.operators._T1], reactivex.observable.observable.Observable[reactivex.operators._T2]]] = None) → Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]

The flat_map operator.

One of the following: Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.

Example

>>> flat_map(lambda x: reactivex.range(0, x))

Or: Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.

Example

>>> flat_map(reactivex.of(1, 2, 3))
Parameters

mapper (Optional[Any]) – A transform function to apply to each element or an observable sequence to project each element from the source sequence onto.

Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes a source observable and returns an observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.

giving.operators.flat_map_indexed(mapper_indexed: Optional[Iterable[reactivex.operators._T2]] = None) → Callable[[reactivex.observable.observable.Observable[Any]], reactivex.observable.observable.Observable[reactivex.operators._T2]]
giving.operators.flat_map_indexed(mapper_indexed: Optional[reactivex.observable.observable.Observable[reactivex.operators._T2]] = None) → Callable[[reactivex.observable.observable.Observable[Any]], reactivex.observable.observable.Observable[reactivex.operators._T2]]
giving.operators.flat_map_indexed(mapper_indexed: Optional[Callable[[reactivex.operators._T1, int], Iterable[reactivex.operators._T2]]] = None) → Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]
giving.operators.flat_map_indexed(mapper_indexed: Optional[Callable[[reactivex.operators._T1, int], reactivex.observable.observable.Observable[reactivex.operators._T2]]] = None) → Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]

The flat_map_indexed operator.

One of the following: Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.

Example

>>> flat_map_indexed(lambda x, i: reactivex.range(0, x))

Or: Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.

Example

>>> flat_map_indexed(reactivex.of(1, 2, 3))
Parameters

mapper_indexed (Optional[Any]) – [Optional] A transform function to apply to each element or an observable sequence to project each element from the source sequence onto.

Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns an observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.

giving.operators.flat_map_latest(mapper)

Projects each element of an observable sequence into a new sequence of observable sequences by incorporating the element’s index and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

Parameters

mapper – A transform function to apply to each source element. The second parameter of the function represents the index of the source element.

Returns

An operator function that takes an observable source and returns an observable sequence whose elements are the result of invoking the transform function on each element of source producing an observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received.

giving.operators.flatten(mapper=None)

Flatten a sequence of sequences/Observables into a single sequence.

Without an argument, this is equivalent to flat_map(lambda x: x).

flatten

Parameters

mapper – A function applied to each element of the original sequence which should return a sequence to insert.
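
As a rough illustration of the two calling styles, here is a list-based stand-in. flatten_model is a hypothetical name used only for this sketch; the real operator works on observables.

```python
from itertools import chain


def flatten_model(items, mapper=None):
    """Concatenate the sequence obtained for each element of items."""
    if mapper is None:
        # Without a mapper, each element is itself the sequence to insert.
        mapper = lambda x: x
    return list(chain.from_iterable(mapper(x) for x in items))


flat = flatten_model([[1, 2], [3], [], [4, 5]])
repeated = flatten_model([1, 2, 3], mapper=lambda x: [x] * x)
```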

giving.operators.fork_join(*others)

Waits for the observables to complete, then combines the last value each of them emitted into a tuple. If any of the observables completes without emitting a value, the result sequence completes at that moment as well.

fork_join

Examples

>>> res = fork_join(obs1)
>>> res = fork_join(obs1, obs2, obs3)
Return type

Callable[[Observable[Any]], Observable[Tuple[Any, …]]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the result of combining the last element from each source in the given sequence.
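
The completion rule is easiest to see on finished sequences. The following is a minimal model in which each list stands for everything an observable emitted before completing; fork_join_model is a hypothetical helper, not part of reactivex.

```python
def fork_join_model(*sequences):
    """Return the single combined tuple, or nothing if any input was empty."""
    if any(len(seq) == 0 for seq in sequences):
        return []  # one source completed without a value: no tuple is produced
    return [tuple(seq[-1] for seq in sequences)]


combined = fork_join_model([1, 2, 3], ["a", "b"])
with_empty = fork_join_model([1, 2, 3], [])
```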

giving.operators.format(string, raw=False, skip_missing=False)

Format an object using a format string.

  • If the data is a dict, it is passed as **kwargs to str.format, unless raw=True

  • If the data is a tuple, it is passed as *args to str.format, unless raw=True

Parameters
  • string – The format string.

  • raw – If True, the data is not unpacked as *args or **kwargs, even if it is a tuple or dict.

  • skip_missing – Whether to ignore KeyErrors due to missing entries in the format.
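
The unpacking rules can be sketched directly with str.format. format_model is a hypothetical stand-in for what happens to a single element. Passing the data as one positional argument when raw=True is an assumption here, since the text above only says the data is not unpacked in that case.

```python
def format_model(data, string, raw=False):
    """Format one element following the dict/tuple unpacking rules."""
    if isinstance(data, dict) and not raw:
        return string.format(**data)  # dict entries become keyword arguments
    if isinstance(data, tuple) and not raw:
        return string.format(*data)  # tuple items become positional arguments
    return string.format(data)  # assumed behaviour for raw=True and other data


from_dict = format_model({"x": 1, "y": 2}, "{x}+{y}")
from_tuple = format_model((1, 2), "{0}<{1}")
raw_tuple = format_model((1, 2), "pair={}", raw=True)
```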

giving.operators.getitem(*keys, strict=False)

Extract one or more keys from a dictionary.

If more than one key is given, a stream of tuples is produced.

getitem

Parameters
  • keys – Names of the keys to index with.

  • strict – If true, every element in the stream is required to contain the given keys.
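
A list-based sketch of the single-key and multi-key cases follows. getitem_model is hypothetical, and skipping entries that lack a key when strict is false is an assumption inferred from the description of strict, not something stated above.

```python
def getitem_model(dicts, *keys, strict=False):
    """Extract one key as plain values, or several keys as tuples."""
    out = []
    for d in dicts:
        if not strict and any(k not in d for k in keys):
            continue  # assumed: non-strict mode silently skips incomplete entries
        values = tuple(d[k] for k in keys)  # raises KeyError in strict mode
        out.append(values[0] if len(keys) == 1 else values)
    return out


events = [{"x": 1, "y": 2}, {"x": 3}]
xs = getitem_model(events, "x")
pairs = getitem_model(events, "x", "y")
```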

giving.operators.group_by(key_mapper, element_mapper=None, subject_mapper=None)

Groups the elements of an observable sequence according to a specified key mapper function and comparer and selects the resulting elements by using a specified function.

group_by

Examples

>>> group_by(lambda x: x.id)
>>> group_by(lambda x: x.id, lambda x: x.name)
>>> group_by(lambda x: x.id, lambda x: x.name, lambda: ReplaySubject())
Keyword Arguments
  • key_mapper – A function to extract the key for each element.

  • element_mapper – [Optional] A function to map each source element to an element in an observable group.

  • subject_mapper – A function that returns a subject used to initiate a grouped observable. Default mapper returns a Subject object.

Return type

Callable[[Observable[~_T]], Observable[GroupedObservable[~_TKey, ~_TValue]]]

Returns

An operator function that takes an observable source and returns a sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.

giving.operators.group_by_until(key_mapper, element_mapper, duration_mapper, subject_mapper=None)

Groups the elements of an observable sequence according to a specified key mapper function. A duration mapper function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.

group_by_until

Examples

>>> group_by_until(lambda x: x.id, None, lambda grp: reactivex.never())
>>> group_by_until(
    lambda x: x.id, lambda x: x.name, lambda grp: reactivex.never()
)
>>> group_by_until(
    lambda x: x.id,
    lambda x: x.name,
    lambda grp: reactivex.never(),
    lambda: ReplaySubject()
)
Parameters
  • key_mapper (Callable[[~_T], ~_TKey]) – A function to extract the key for each element.

  • element_mapper (Optional[Callable[[~_T], ~_TValue]]) – A function to map each source element to an element in an observable group.

  • duration_mapper (Callable[[GroupedObservable[~_TKey, ~_TValue]], Observable[Any]]) – A function to signal the expiration of a group.

  • subject_mapper (Optional[Callable[[], Subject[~_TValue]]]) – A function that returns a subject used to initiate a grouped observable. Default mapper returns a Subject object.

Return type

Callable[[Observable[~_T]], Observable[GroupedObservable[~_TKey, ~_TValue]]]

Returns

An operator function that takes an observable source and returns a sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. If a group’s lifetime expires, a new group with the same key value can be created once an element with such a key value is encountered.

giving.operators.group_join(right, left_duration_mapper, right_duration_mapper)

Correlates the elements of two sequences based on overlapping durations, and groups the results.

group_join

Parameters
  • right (Observable[~_TRight]) – The right observable sequence to join elements for.

  • left_duration_mapper (Callable[[~_TLeft], Observable[Any]]) – A function to select the duration (expressed as an observable sequence) of each element of the left observable sequence, used to determine overlap.

  • right_duration_mapper (Callable[[~_TRight], Observable[Any]]) – A function to select the duration (expressed as an observable sequence) of each element of the right observable sequence, used to determine overlap.

Return type

Callable[[Observable[~_TLeft]], Observable[Tuple[~_TLeft, Observable[~_TRight]]]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains elements combined into a tuple from source elements that have an overlapping duration.

giving.operators.group_wrap(name, **conditions)

Return a stream of observables for wrapped groups.

In this schema, B and E correspond to the messages sent in the enter and exit phases respectively of the wrap() context manager.

group_wrap

Example

results = []

@obs.group_wrap("block").subscribe  # "block" is a placeholder for the wrap block's name
def _(obs2):
    obs2["a"].sum() >> results
Parameters
  • name – Name of the wrap block to group on.

  • conditions – Maps a key to the value it must be associated to in the dictionary of the wrap statement, or to a predicate function on the value.

giving.operators.ignore_elements()

Ignores all elements in an observable sequence leaving only the termination messages.

ignore_elements

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an empty observable sequence that signals termination, successful or exceptional, of the source sequence.

giving.operators.is_empty()

Determines whether an observable sequence is empty.

is_empty

Return type

Callable[[Observable[Any]], Observable[bool]]

Returns

An operator function that takes an observable source and returns an observable sequence containing a single element determining whether the source sequence is empty.

giving.operators.join(right, left_duration_mapper, right_duration_mapper)

Correlates the elements of two sequences based on overlapping durations.

join

Parameters
  • right (Observable[~_T2]) – The right observable sequence to join elements for.

  • left_duration_mapper (Callable[[Any], Observable[Any]]) – A function to select the duration (expressed as an observable sequence) of each element of the left observable sequence, used to determine overlap.

  • right_duration_mapper (Callable[[Any], Observable[Any]]) – A function to select the duration (expressed as an observable sequence) of each element of the right observable sequence, used to determine overlap.

Return type

Callable[[Observable[~_T1]], Observable[Tuple[~_T1, ~_T2]]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains elements combined into a tuple from source elements that have an overlapping duration.

giving.operators.keep(*keys, **remap)

Keep certain dict keys and remap others.

keep

Parameters
  • keys – Keys that must be kept

  • remap – Keys that must be renamed

giving.operators.kfilter(fn)

Filter a stream of dictionaries.

Example

# [{"x": 1, "y": 2}, {"x": 100, "y": 50}] => [{"x": 100, "y": 50}]
gv.kfilter(lambda x, y: x > y)
Parameters

fn

A function that will be called for each element, passing the element using **kwargs.

Note

If the dict has elements that are not in the function’s arguments list and the function does not have a **kwargs argument, these elements will be dropped and no error will occur.
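
The note about dropped elements can be modelled with inspect.signature. This is a sketch of the described behaviour, not giving's actual code; call_with_matching_keys and kfilter_model are hypothetical names.

```python
import inspect


def call_with_matching_keys(fn, data):
    """Call fn(**data), dropping keys fn does not accept unless it takes **kwargs."""
    params = inspect.signature(fn).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return fn(**data)
    return fn(**{k: v for k, v in data.items() if k in params})


def kfilter_model(dicts, fn):
    """Keep the dicts for which fn returns a truthy value."""
    return [d for d in dicts if call_with_matching_keys(fn, d)]


kept = kfilter_model([{"x": 1, "y": 2}, {"x": 100, "y": 50}], lambda x, y: x > y)
# The extra key "z" is not passed to the predicate, and no error occurs.
kept_extra = kfilter_model([{"x": 3, "y": 1, "z": 9}], lambda x, y: x > y)
```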

giving.operators.kmap(_fn=None, **_fns)

Map a dict, passing keyword arguments.

kmap either takes a positional function argument or keyword arguments serving to build a new dict.

Example

# [{"x": 1, "y": 2}] => [3]
gv.kmap(lambda x, y: x + y)

# [{"x": 1, "y": 2}] => [{"z": 3}]
gv.kmap(z=lambda x, y: x + y)
Parameters
  • _fn

    A function that will be called for each element, passing the element using **kwargs.

    Note

    If the dict has elements that are not in the function’s arguments list and the function does not have a **kwargs argument, these elements will be dropped and no error will occur.

  • _fns – Alternatively, build a new dict with each key associated to a function with the same interface as _fn.

giving.operators.kmerge(scan=False)

Merge the dictionaries in the stream.

kmerge

kmerge2

giving.operators.kscan()

Alias for kmerge(scan=True).
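
The difference between kmerge() and kscan() can be sketched on a list of dicts. kmerge_model is a hypothetical helper, and it assumes that later values override earlier ones for the same key, which the text above does not spell out.

```python
def kmerge_model(dicts, scan=False):
    """Merge dicts left to right; with scan=True, keep every intermediate merge."""
    merged = {}
    history = []
    for d in dicts:
        merged = {**merged, **d}  # assumed: later entries win on key conflicts
        history.append(merged)
    return history if scan else history[-1:]


stream = [{"a": 1}, {"b": 2}, {"a": 3}]
final = kmerge_model(stream)
running = kmerge_model(stream, scan=True)
```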

giving.operators.last(predicate=None)

The last operator.

Returns the last element of an observable sequence that satisfies the condition in the predicate if specified, else the last element.

last

Examples

>>> op = last()
>>> op = last(lambda x: x > 3)
Parameters

predicate (Optional[Callable[[~_T], bool]]) – [Optional] A predicate function to evaluate for elements in the source sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the last element in the observable sequence that satisfies the condition in the predicate.

giving.operators.last_or_default() Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[Optional[reactivex.operators._T]]]
giving.operators.last_or_default(default_value: reactivex.operators._T) Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.last_or_default(default_value: reactivex.operators._T, predicate: Callable[[reactivex.operators._T], bool]) Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._T]]

The last_or_default operator.

Returns the last element of an observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.

last

Examples

>>> res = last_or_default()
>>> res = last_or_default(0)
>>> res = last_or_default(0, lambda x: x > 3)
>>> res = last_or_default(None, lambda x: x > 3)
Parameters
  • predicate (Optional[Callable[[~_T], bool]]) – [Optional] A predicate function to evaluate for elements in the source sequence.

  • default_value (Optional[Any]) – [Optional] The default value if no such element exists. If not specified, defaults to None.

Return type

Callable[[Observable[~_T]], Observable[Any]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the last element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.
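
The fallback logic amounts to the following list-based sketch. last_or_default_model is a hypothetical name; the argument order follows the signatures shown above, with default_value first.

```python
def last_or_default_model(items, default_value=None, predicate=None):
    """Return the last matching element, or default_value if there is none."""
    matches = [x for x in items if predicate is None or predicate(x)]
    return matches[-1] if matches else default_value


found = last_or_default_model([1, 5, 2, 7, 3], 0, lambda x: x > 3)
fallback = last_or_default_model([1, 2], 0, lambda x: x > 3)
empty = last_or_default_model([])
```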

giving.operators.map(mapper=None)

The map operator.

Project each element of an observable sequence into a new form.

map

Example

>>> map(lambda value: value * 10)
Parameters

mapper (Optional[Callable[[~_T1], ~_T2]]) – A transform function to apply to each source element.

Return type

Callable[[Observable[~_T1]], Observable[~_T2]]

Returns

A partially applied operator function that takes an observable source and returns an observable sequence whose elements are the result of invoking the transform function on each element of the source.

giving.operators.map_indexed(mapper_indexed=None)

Project each element of an observable sequence into a new form by incorporating the element’s index.

map_indexed

Example

>>> ret = map_indexed(lambda value, index: value * value + index)
Parameters

mapper_indexed (Optional[Callable[[~_T1, int], ~_T2]]) – A transform function to apply to each source element. The second parameter of the function represents the index of the source element.

Return type

Callable[[Observable[~_T1]], Observable[~_T2]]

Returns

A partially applied operator function that takes an observable source and returns an observable sequence whose elements are the result of invoking the transform function on each element of the source.
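
On a plain list, the same projection can be written with enumerate. This is only an analogy for the element/index pairing; map_indexed_model is a hypothetical helper.

```python
def map_indexed_model(items, mapper_indexed):
    """Apply mapper_indexed(value, index) to every element, index starting at 0."""
    return [mapper_indexed(value, index) for index, value in enumerate(items)]


squares_plus_index = map_indexed_model([1, 2, 3], lambda value, index: value * value + index)
```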

giving.operators.materialize()

Materializes the implicit notifications of an observable sequence as explicit notification values.

Return type

Callable[[Observable[~_T]], Observable[Notification[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the materialized notification values from the source sequence.

giving.operators.max(*, key=None, comparer=None, scan=False)

Produce the maximum of a stream of values.

maximum

Parameters
  • key – A key mapping function or a string.

  • comparer – A function of two elements that returns -1 if the first is smaller than the second, 0 if they are equal, 1 if the first is larger.

  • scan – If True, generate the current maximum on every element.

  • seed – First element of the reduction.
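
The comparer convention and the effect of scan=True can be sketched on a list. max_model and compare are hypothetical helpers; the sketch assumes the first element starts the reduction and that ties keep the earlier element.

```python
def compare(a, b):
    """Three-way comparison: -1 if a < b, 0 if equal, 1 if a > b."""
    return (a > b) - (a < b)


def max_model(values, comparer=compare, scan=False):
    """Return the final maximum, or every running maximum when scan=True."""
    results = []
    best = None
    for i, v in enumerate(values):
        if i == 0 or comparer(v, best) > 0:
            best = v
        results.append(best)
    return results if scan else results[-1:]


final_max = max_model([3, 1, 4, 1, 5])
running_max = max_model([3, 1, 4, 1, 5], scan=True)
# A custom comparer: order the values by absolute value.
largest_magnitude = max_model([1, -7, 3], comparer=lambda a, b: compare(abs(a), abs(b)))
```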

giving.operators.merge(*sources, max_concurrent=None)

Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences. Or merges two observable sequences into a single observable sequence.

merge

Examples

>>> op = merge(max_concurrent=1)
>>> op = merge(other_source)
Parameters

max_concurrent (Optional[int]) – [Optional] Maximum number of inner observable sequences being subscribed to concurrently or the second observable sequence.

Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns the observable sequence that merges the elements of the inner sequences.

giving.operators.merge_all()

The merge_all operator.

Merges an observable sequence of observable sequences into an observable sequence.

merge_all

Return type

Callable[[Observable[Observable[~_T]]], Observable[~_T]]

Returns

A partially applied operator function that takes an observable source and returns the observable sequence that merges the elements of the inner sequences.

giving.operators.min(*, key=None, comparer=None, scan=False)

Produce the minimum of a stream of values.

minimum

Parameters
  • key – A key mapping function or a string.

  • comparer – A function of two elements that returns -1 if the first is smaller than the second, 0 if they are equal, 1 if the first is larger.

  • scan – If True, generate the current minimum on every element.

  • seed – First element of the reduction.

giving.operators.multicast() Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.connectableobservable.ConnectableObservable[reactivex.operators._T]]
giving.operators.multicast(subject: reactivex.abc.subject.SubjectBase[reactivex.operators._T]) Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.connectableobservable.ConnectableObservable[reactivex.operators._T]]
giving.operators.multicast(*, subject_factory: Callable[[Optional[reactivex.abc.scheduler.SchedulerBase]], reactivex.abc.subject.SubjectBase[reactivex.operators._T]], mapper: Optional[Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._T2]]] = 'None') Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._T2]]

Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a mapper function. Each subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the mapper function’s invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.

Examples

>>> res = multicast(observable)
>>> res = multicast(
    subject_factory=lambda scheduler: Subject(), mapper=lambda x: x
)
Parameters
  • subject_factory (Optional[Callable[[Optional[SchedulerBase]], SubjectBase[~_T]]]) – Factory function to create an intermediate subject through which the source sequence’s elements will be multicast to the mapper function.

  • subject (Optional[SubjectBase[~_T]]) – Subject to push source elements into.

  • mapper (Optional[Callable[[Observable[~_T]], Observable[~_T2]]]) – [Optional] Mapper function which can use the multicasted source sequence subject to the policies enforced by the created subject. Specified only if subject_factory is a factory function.

Return type

Callable[[Observable[~_T]], Union[Observable[~_T2], ConnectableObservable[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a mapper function.

giving.operators.observe_on(scheduler)

Wraps the source sequence in order to run its observer callbacks on the specified scheduler.

Parameters

scheduler (SchedulerBase) – Scheduler to notify observers on.

This only invokes observer callbacks on a scheduler. If the subscription and/or unsubscription actions have side effects that need to run on a scheduler, use subscribe_on.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns the source sequence whose observations happen on the specified scheduler.

giving.operators.on_error_resume_next(second)

Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.

on_error

Keyword Arguments

second – Second observable sequence used to produce results after the first sequence terminates.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An observable sequence that concatenates the first and second sequence, even if the first sequence terminates exceptionally.

giving.operators.pairwise()

The pairwise operator.

Returns a new observable that triggers on the second and subsequent triggerings of the input observable. The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as a pair. The argument passed to the N-1th triggering is held in hidden internal state until the Nth triggering occurs.

Return type

Callable[[Observable[~_T]], Observable[Tuple[~_T, ~_T]]]

Returns

An operator function that takes an observable source and returns an observable that triggers on successive pairs of observations from the input observable as a tuple.
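
For a finished sequence, the pairs produced correspond to zipping the list with itself shifted by one. pairwise_model is a hypothetical helper used only to illustrate the pairing.

```python
def pairwise_model(items):
    """Pair each element with its predecessor; nothing is produced for the first one."""
    return list(zip(items, items[1:]))


pairs = pairwise_model([1, 2, 3, 4])
single = pairwise_model([1])
```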

giving.operators.partition(predicate)

Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.

partition

Parameters
  • predicate (Callable[[~_T], bool]) – The function to determine which output Observable will trigger a particular observation.

Return type

Callable[[Observable[~_T]], List[Observable[~_T]]]

Returns

An operator function that takes an observable source and returns a list of observables. The first triggers when the predicate returns True, and the second triggers when the predicate returns False.
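
A list-based sketch of the split is shown below. partition_model is hypothetical; like the operator described above, it evaluates the predicate separately for each of the two outputs.

```python
def partition_model(items, predicate):
    """Return [matching, non_matching], evaluating the predicate once per output."""
    matching = [x for x in items if predicate(x)]
    rest = [x for x in items if not predicate(x)]
    return [matching, rest]


evens, odds = partition_model([1, 2, 3, 4, 5], lambda x: x % 2 == 0)
```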

giving.operators.partition_indexed(predicate_indexed)

The indexed partition operator.

Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.

partition_indexed

Parameters
  • predicate_indexed – The function to determine which output Observable will trigger a particular observation.

Return type

Callable[[Observable[~_T]], List[Observable[~_T]]]

Returns

A list of observables. The first triggers when the predicate returns True, and the second triggers when the predicate returns False.

giving.operators.pluck(key)

Retrieves the value of a specified key using dict-like access (as in element[key]) from all elements in the Observable sequence.

To pluck an attribute of each element, use pluck_attr.

Parameters

key (~_TKey) – The key to pluck.

Return type

Callable[[Observable[Dict[~_TKey, ~_TValue]]], Observable[~_TValue]]

Returns

An operator function that takes an observable source and returns a new observable sequence of key values.

giving.operators.pluck_attr(prop)

Retrieves the value of a specified property (using getattr) from all elements in the Observable sequence.

To pluck values using dict-like access (as in element[key]) on each element, use pluck.

Parameters

prop – The property to pluck.

Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns a new observable sequence of property values.

giving.operators.publish() Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.connectableobservable.ConnectableObservable[reactivex.operators._T1]]
giving.operators.publish(mapper: Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]) Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]

The publish operator.

Returns an observable sequence that is the result of invoking the mapper on a connectable observable sequence that shares a single subscription to the underlying sequence. This operator is a specialization of Multicast using a regular Subject.

Example

>>> res = publish()
>>> res = publish(lambda x: x)
Parameters

mapper (Optional[Callable[[Observable[~_T1]], Observable[~_T2]]]) – [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all notifications of the source from the time of the subscription on.

Return type

Callable[[Observable[~_T1]], Union[Observable[~_T2], ConnectableObservable[~_T1]]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a mapper function.

giving.operators.publish_value(initial_value: reactivex.operators._T1) Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.connectableobservable.ConnectableObservable[reactivex.operators._T1]]
giving.operators.publish_value(initial_value: reactivex.operators._T1, mapper: Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]) Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]

Returns an observable sequence that is the result of invoking the mapper on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initial_value.

This operator is a specialization of Multicast using a BehaviorSubject.

Examples

>>> res = source.publish_value(42)
>>> res = source.publish_value(42, lambda x: x.map(lambda y: y * y))
Parameters
  • initial_value (~_T1) – Initial value received by observers upon subscription.

  • mapper (Optional[Callable[[Observable[~_T1]], Observable[~_T2]]]) – [Optional] Mapper function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will immediately receive the initial value, followed by all notifications of the source from the time of the subscription on.

Return type

Callable[[Observable[~_T1]], Union[Observable[~_T2], ConnectableObservable[~_T1]]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a mapper function.

giving.operators.reduce(accumulator: Callable[[reactivex.operators._TState, reactivex.operators._T], reactivex.operators._TState]) Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.reduce(accumulator: Callable[[reactivex.operators._TState, reactivex.operators._T], reactivex.operators._TState], seed: reactivex.operators._TState) Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._TState]]

The reduce operator.

Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.

For aggregation behavior with incremental intermediate results, see scan.

reduce

Examples

>>> res = reduce(lambda acc, x: acc + x)
>>> res = reduce(lambda acc, x: acc + x, 0)
Parameters
  • accumulator (Callable[[~_TState, ~_T], ~_TState]) – An accumulator function to be invoked on each element.

  • seed (Union[~_TState, Type[NotSet]]) – Optional initial accumulator value.

Return type

Callable[[Observable[~_T]], Observable[Any]]

Returns

A partially applied operator function that takes an observable source and returns an observable sequence containing a single element with the final accumulator value.
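
The contrast with scan has a direct analogue in the standard library: functools.reduce yields only the final accumulator, while itertools.accumulate yields every intermediate one. This is an analogy on lists, not the reactivex implementation.

```python
from functools import reduce
from itertools import accumulate

values = [1, 2, 3, 4]

# reduce-like: a single final value, here with 0 as the seed.
total = reduce(lambda acc, x: acc + x, values, 0)

# scan-like: one intermediate result per input element.
running = list(accumulate(values, lambda acc, x: acc + x))
```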

giving.operators.ref_count()

Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.

Return type

Callable[[ConnectableObservable[~_T]], Observable[~_T]]

giving.operators.repeat(repeat_count=None)

Repeats the observable sequence a specified number of times. If the repeat count is not specified, the sequence repeats indefinitely.

repeat

Examples

>>> repeated = repeat()
>>> repeated = repeat(42)
Parameters
  • repeat_count (Optional[int]) – Number of times to repeat the sequence. If not provided, repeats the sequence indefinitely.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence producing the elements of the given sequence repeatedly.

giving.operators.replay(buffer_size: Optional[int] = None, window: Optional[Union[datetime.timedelta, float]] = None, *, scheduler: Optional[reactivex.abc.scheduler.SchedulerBase] = 'None') Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.connectableobservable.ConnectableObservable[reactivex.operators._T1]]
giving.operators.replay(buffer_size: Optional[int] = None, window: Optional[Union[datetime.timedelta, float]] = None, *, mapper: Optional[Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]], scheduler: Optional[reactivex.abc.scheduler.SchedulerBase] = 'None') Callable[[reactivex.observable.observable.Observable[reactivex.operators._T1]], reactivex.observable.observable.Observable[reactivex.operators._T2]]

The replay operator.

Returns an observable sequence that is the result of invoking the mapper on a connectable observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer.

This operator is a specialization of Multicast using a ReplaySubject.

Examples

>>> res = replay(buffer_size=3)
>>> res = replay(buffer_size=3, window=0.5)
>>> res = replay(3, 0.5)
>>> res = replay(3, 0.5, mapper=lambda x: x.take(6).repeat())
Parameters
  • mapper (Optional[Callable[[Observable[~_T1]], Observable[~_T2]]]) – [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all the notifications of the source subject to the specified replay buffer trimming policy.

  • buffer_size (Optional[int]) – [Optional] Maximum element count of the replay buffer.

  • window (Union[timedelta, float, None]) – [Optional] Maximum time length of the replay buffer.

  • scheduler (Optional[SchedulerBase]) – [Optional] Scheduler the observers are invoked on.

Return type

Callable[[Observable[~_T1]], Union[Observable[~_T2], ConnectableObservable[~_T1]]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a mapper function.

giving.operators.retry(retry_count=None)

Repeats the source observable sequence the specified number of times or until it successfully terminates. If the retry count is not specified, it retries indefinitely.

Examples

>>> retried = retry()
>>> retried = retry(42)
Parameters

retry_count (Optional[int]) – [Optional] Number of times to retry the sequence. If not provided, retry the sequence indefinitely.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully.

giving.operators.roll(n, reduce=None, seed=<class 'reactivex.internal.utils.NotSet'>)

Group the last n elements, giving a sequence of overlapping sequences.

For example, this can be used to compute a rolling average of the 100 last elements (however, average(scan=100) is better optimized).

op.roll(100, lambda xs: sum(xs) / len(xs))

roll

Parameters
  • n – The number of elements to group together.

  • reduce

    A function to reduce the group.

    It should take five arguments:

    • last: The last result.

    • add: The element that was just added. It is the last element in the elements list.

    • drop: The element that was dropped to make room for the added one. It is not in the elements argument. If the list of elements is not yet of size n, there is no need to drop anything and drop is None.

    • last_size: The window size on the last invocation.

    • current_size: The window size on this invocation.

    Defaults to returning the deque of elements directly.

    Note

    The same reference is returned each time in order to save memory, so it should be processed immediately.

  • seed – The first element of the reduction.
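
The five-argument reduce protocol described above can be illustrated without any library. The sketch below is plain Python and is not giving's implementation: rolling_reduce and rolling_sum are hypothetical helpers that mimic the documented arguments (last, add, drop, last_size, current_size) to maintain a rolling sum incrementally.

```python
from collections import deque


def rolling_reduce(values, n, reduce, seed):
    """Yield reduce(last, add, drop, last_size, current_size) for each element."""
    window = deque(maxlen=n)
    last = seed
    for add in values:
        last_size = len(window)
        # Once the window is full, the oldest element is dropped to make room.
        drop = window[0] if last_size == n else None
        window.append(add)  # deque(maxlen=n) discards the oldest element itself
        last = reduce(last, add, drop, last_size, len(window))
        yield last


def rolling_sum(last, add, drop, last_size, current_size):
    # Update the previous sum instead of re-summing the whole window.
    return last + add - (0 if drop is None else drop)


sums = list(rolling_reduce([1, 2, 3, 4, 5], 3, rolling_sum, seed=0))
```

Because the reducer only sees the added and dropped elements, the cost per element does not depend on the window size.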

giving.operators.sample(sampler, scheduler=None)

Samples the observable sequence at each interval.

sample

Examples

>>> res = sample(sample_observable) # Sampler tick sequence
>>> res = sample(5.0) # 5 seconds
Parameters
  • sampler (Union[timedelta, float, Observable[Any]]) – Observable used to sample the source observable or time interval at which to sample (specified as a float denoting seconds or an instance of timedelta).

  • scheduler (Optional[SchedulerBase]) – Scheduler to use only when a time interval is given.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns a sampled observable sequence.

giving.operators.scan(accumulator: Callable[[reactivex.operators._T, reactivex.operators._T], reactivex.operators._T]) → Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.scan(accumulator: Callable[[reactivex.operators._TState, reactivex.operators._T], reactivex.operators._TState], seed: Union[reactivex.operators._TState, Type[reactivex.internal.utils.NotSet]]) → Callable[[reactivex.observable.observable.Observable[reactivex.operators._T]], reactivex.observable.observable.Observable[reactivex.operators._TState]]

The scan operator.

Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no intermediate results, see reduce().

scan

Examples

>>> scanned = source.scan(lambda acc, x: acc + x)
>>> scanned = source.scan(lambda acc, x: acc + x, 0)
Parameters
  • accumulator (Callable[[~_TState, ~_T], ~_TState]) – An accumulator function to be invoked on each element.

  • seed (Union[~_TState, Type[NotSet]]) – [Optional] The initial accumulator value.

Return type

Callable[[Observable[~_T]], Observable[~_TState]]

Returns

A partially applied operator function that takes an observable source and returns an observable sequence containing the accumulated values.

giving.operators.sequence_equal(second, comparer=None)

Determines whether two sequences are equal by comparing the elements pairwise using a specified equality comparer.

sequence_equal

Examples

>>> res = sequence_equal([1, 2, 3])
>>> res = sequence_equal([{"value": 42}], lambda x, y: x["value"] == y["value"])
>>> res = sequence_equal(reactivex.return_value(42))
>>> res = sequence_equal(
...     reactivex.return_value({"value": 42}), lambda x, y: x["value"] == y["value"])
Parameters
  • second (Union[Observable[~_T], Iterable[~_T]]) – Second observable sequence or iterable to compare.

  • comparer (Optional[Callable[[~_T, ~_T], bool]]) – [Optional] Comparer used to compare elements of both sequences. No guarantees on order of comparer arguments.

Return type

Callable[[Observable[~_T]], Observable[bool]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains a single element which indicates whether both sequences are of equal length and their corresponding elements are equal according to the specified equality comparer.

giving.operators.share()

Share a single subscription among multiple observers.

This is an alias for a composed publish() and ref_count().

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable.

giving.operators.single(predicate=None)

The single operator.

Returns the only element of an observable sequence that satisfies the condition in the optional predicate, and reports an exception if there is not exactly one element in the observable sequence.

single

Example

>>> res = single()
>>> res = single(lambda x: x == 42)
Parameters

predicate (Optional[Callable[[~_T], bool]]) – [Optional] A predicate function to evaluate for elements in the source sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the single element in the observable sequence that satisfies the condition in the predicate.

giving.operators.single_or_default(predicate=None, default_value=None)

Returns the only element of an observable sequence that matches the predicate, or a default value if no such element exists. This method reports an exception if there is more than one element in the observable sequence.

single_or_default

Examples

>>> res = single_or_default()
>>> res = single_or_default(lambda x: x == 42)
>>> res = single_or_default(lambda x: x == 42, 0)
>>> res = single_or_default(None, 0)
Parameters
  • predicate (Optional[Callable[[~_T], bool]]) – [Optional] A predicate function to evaluate for elements in the source sequence.

  • default_value (Optional[Any]) – [Optional] The default value if no matching element exists in the source sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the single element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.

giving.operators.single_or_default_async(has_default=False, default_value=None)
Return type

Callable[[Observable[~_T]], Observable[~_T]]

giving.operators.skip(count)

The skip operator.

Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.

skip

Parameters

count (int) – The number of elements to skip before returning the remaining elements.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the elements that occur after the specified index in the input sequence.

giving.operators.skip_last(count)

The skip_last operator.

skip_last

Bypasses a specified number of elements at the end of an observable sequence.

This operator accumulates a queue long enough to store the first count elements. As more elements are received, elements are taken from the front of the queue and produced on the result sequence. This causes elements to be delayed.

Parameters

count (int) – Number of elements to bypass at the end of the source sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the source sequence elements except for the bypassed ones at the end.

giving.operators.skip_last_with_time(duration, scheduler=None)

Skips elements for the specified duration from the end of the observable source sequence.

Example

>>> res = skip_last_with_time(5.0)

This operator accumulates a queue long enough to store the elements received during the initial duration window. As more elements are received, elements older than the specified duration are taken from the queue and produced on the result sequence. This causes elements to be delayed by duration.

Parameters
  • duration (Union[timedelta, float]) – Duration for skipping elements from the end of the sequence.

  • scheduler (Optional[SchedulerBase]) – Scheduler to use for time handling.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An observable sequence with the elements skipped during the specified duration from the end of the source sequence.

giving.operators.skip_until(other)

Returns the values from the source observable sequence only after the other observable sequence produces a value.

skip_until

Parameters

other – The observable sequence that triggers propagation of elements of the source sequence.

Returns

An operator function that takes an observable source and returns an observable sequence containing the elements of the source sequence starting from the point the other sequence triggered propagation.

giving.operators.skip_until_with_time(start_time, scheduler=None)

Skips elements from the observable source sequence until the specified start time. Errors produced by the source sequence are always forwarded to the result sequence, even if the error occurs before the start time.

skip_until_with_time

Examples

>>> res = skip_until_with_time(datetime())
>>> res = skip_until_with_time(5.0)
Parameters

start_time (Union[datetime, timedelta, float]) – Time to start taking elements from the source sequence. If this value is less than or equal to datetime.utcnow(), no elements will be skipped.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence with the elements skipped until the specified start time.

giving.operators.skip_while(predicate)

The skip_while operator.

Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.

skip_while

Example

>>> skip_while(lambda value: value < 10)
Parameters

predicate (Callable[[~_T], bool]) – A function to test each element for a condition.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the elements from the input sequence starting at the first element in the linear series that does not pass the test specified by predicate.

giving.operators.skip_while_indexed(predicate)

Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements. The element’s index is used in the logic of the predicate function.

skip_while_indexed

Example

>>> skip_while_indexed(lambda value, index: value < 10 or index < 10)
Parameters

predicate (Callable[[~_T, int], bool]) – A function to test each element for a condition; the second parameter of the function represents the index of the source element.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the elements from the input sequence starting at the first element in the linear series that does not pass the test specified by predicate.

giving.operators.skip_with_time(duration, scheduler=None)

Skips elements for the specified duration from the start of the observable source sequence.

skip_with_time

Example

>>> res = skip_with_time(5.0)

Specifying a zero value for duration doesn’t guarantee no elements will be dropped from the start of the source sequence. This is a side-effect of the asynchrony introduced by the scheduler, where the action that causes callbacks from the source sequence to be forwarded may not execute immediately, despite the zero due time.

Errors produced by the source sequence are always forwarded to the result sequence, even if the error occurs before the duration.

Parameters

duration (Union[timedelta, float]) – Duration for skipping elements from the start of the sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence with the elements skipped during the specified duration from the start of the source sequence.

giving.operators.slice(start=None, stop=None, step=None)

The slice operator.

Slices the given observable. It is basically a wrapper around the operators skip, skip_last, take, take_last and filter.

slice

Examples

>>> result = source.slice(1, 10)
>>> result = source.slice(1, -2)
>>> result = source.slice(1, -1, 2)
Parameters
  • start (Optional[int]) – First element to take or skip last.

  • stop (Optional[int]) – Last element to take or skip last.

  • step (Optional[int]) – Takes every step-th element. Must be larger than zero.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns a sliced observable sequence.

giving.operators.sole(*, keep_key=False, exclude=[])

Extract values from a stream of dicts with one entry each.

sole

If, after removing keys from the exclusion set, any dict is empty or has more than one entry, that is an error.

Parameters
  • keep_key – If True, return a (key, value) tuple, otherwise only return the value. Defaults to False.

  • exclude – Keys to exclude.

giving.operators.sort(key=None, reverse=False)

Sort the stream.

sort

Parameters
  • key – The comparison key function to use or a string.

  • reverse – If True, the sort is descending.

giving.operators.some(predicate=None)

The some operator.

Determines whether any element of an observable sequence satisfies the predicate, if one is given; otherwise, determines whether the sequence contains any elements.

some

Examples

>>> result = source.some()
>>> result = source.some(lambda x: x > 3)
Parameters

predicate (Optional[Callable[[~_T], bool]]) – A function to test each element for a condition.

Return type

Callable[[Observable[~_T]], Observable[bool]]

Returns

An operator function that takes an observable source and returns an observable sequence containing a single element indicating whether any element of the source sequence passes the predicate if one is given, or otherwise whether the source sequence contains any elements.

giving.operators.starmap(mapper: Callable[[reactivex.operators._A, reactivex.operators._B], reactivex.operators._T]) → Callable[[reactivex.observable.observable.Observable[Tuple[reactivex.operators._A, reactivex.operators._B]]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.starmap(mapper: Callable[[reactivex.operators._A, reactivex.operators._B, reactivex.operators._C], reactivex.operators._T]) → Callable[[reactivex.observable.observable.Observable[Tuple[reactivex.operators._A, reactivex.operators._B, reactivex.operators._C]]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.starmap(mapper: Callable[[reactivex.operators._A, reactivex.operators._B, reactivex.operators._C, reactivex.operators._D], reactivex.operators._T]) → Callable[[reactivex.observable.observable.Observable[Tuple[reactivex.operators._A, reactivex.operators._B, reactivex.operators._C, reactivex.operators._D]]], reactivex.observable.observable.Observable[reactivex.operators._T]]

The starmap operator.

Unpacks arguments grouped as tuple elements of an observable sequence and returns an observable sequence of values obtained by invoking the mapper function with the unpacked elements as positional arguments.

Use instead of map() when the arguments to the mapper are grouped as tuples and the mapper function takes multiple arguments.

starmap

Example

>>> starmap(lambda x, y: x + y)
Parameters

mapper (Optional[Callable[…, Any]]) – A transform function to invoke with unpacked elements as arguments.

Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the results of invoking the mapper function with unpacked elements of the source.

giving.operators.starmap_indexed(mapper: Callable[[reactivex.operators._A, int], reactivex.operators._T]) → Callable[[reactivex.observable.observable.Observable[reactivex.operators._A]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.starmap_indexed(mapper: Callable[[reactivex.operators._A, reactivex.operators._B, int], reactivex.operators._T]) → Callable[[reactivex.observable.observable.Observable[Tuple[reactivex.operators._A, reactivex.operators._B]]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.starmap_indexed(mapper: Callable[[reactivex.operators._A, reactivex.operators._B, reactivex.operators._C, int], reactivex.operators._T]) → Callable[[reactivex.observable.observable.Observable[Tuple[reactivex.operators._A, reactivex.operators._B, reactivex.operators._C]]], reactivex.observable.observable.Observable[reactivex.operators._T]]
giving.operators.starmap_indexed(mapper: Callable[[reactivex.operators._A, reactivex.operators._B, reactivex.operators._C, reactivex.operators._D, int], reactivex.operators._T]) → Callable[[reactivex.observable.observable.Observable[Tuple[reactivex.operators._A, reactivex.operators._B, reactivex.operators._C, reactivex.operators._D]]], reactivex.observable.observable.Observable[reactivex.operators._T]]

Variant of starmap() which accepts an indexed mapper.

starmap_indexed

Example

>>> starmap_indexed(lambda x, y, i: x + y + i)
Parameters

mapper (Optional[Callable[…, Any]]) – A transform function to invoke with unpacked elements as arguments.

Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the results of invoking the indexed mapper function with unpacked elements of the source.

giving.operators.start_with(*args)

Prepends a sequence of values to an observable sequence.

start_with

Example

>>> start_with(1, 2, 3)
Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes a source observable and returns the source sequence prepended with the specified values.

giving.operators.subscribe_on(scheduler)

Subscribe on the specified scheduler.

Wrap the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used; the note below summarizes the distinction between subscribe_on and observe_on.

This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer callbacks on a scheduler, use observe_on.

Parameters

scheduler (SchedulerBase) – Scheduler to perform subscription and unsubscription actions on.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns the source sequence whose subscriptions and un-subscriptions happen on the specified scheduler.

giving.operators.sum(*, scan=False)
giving.operators.switch_latest()

The switch_latest operator.

Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

switch_latest

Returns

A partially applied operator function that takes an observable source and returns the observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.

giving.operators.tag(group='', field='$word', group_field='$group')

Tag each dict or object with a unique word.

If the item is a dict, do item[field] = <new_word>, otherwise attempt to do setattr(item, field, <new_word>).

These tags are displayed specially by the display() method and they can be used to determine breakpoints with the breakword() method.

Parameters
  • group – An arbitrary group name that corresponds to an independent sequence of words. It determines the color in display.

  • field – The field name in which to put the word (default: $word).

  • group_field – The field name in which to put the group (default: $group).

giving.operators.take(count)

Returns a specified number of contiguous elements from the start of an observable sequence.

take

Example

>>> op = take(5)
Parameters

count (int) – The number of elements to return.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the specified number of elements from the start of the input sequence.

giving.operators.take_last(count)

Returns a specified number of contiguous elements from the end of an observable sequence.

take_last

Example

>>> res = take_last(5)

This operator accumulates a buffer long enough to store count elements. Upon completion of the source sequence, this buffer is drained on the result sequence. This causes the elements to be delayed.

Parameters

count (int) – Number of elements to take from the end of the source sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the specified number of elements from the end of the source sequence.

giving.operators.take_last_buffer(count)

The take_last_buffer operator.

Returns a list with the specified number of contiguous elements from the end of an observable sequence.

take_last_buffer

Example

>>> res = source.take_last_buffer(5)

This operator accumulates a buffer long enough to store count elements. Upon completion of the source sequence, this buffer is drained on the result sequence. This causes the elements to be delayed.

Parameters

count (int) – Number of elements to take from the end of the source sequence.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence containing a single list with the specified number of elements from the end of the source sequence.

giving.operators.take_last_with_time(duration, scheduler=None)

Returns elements within the specified duration from the end of the observable source sequence.

take_last_with_time

Example

>>> res = take_last_with_time(5.0)

This operator accumulates a queue of the elements received within the trailing duration window, discarding older elements as new ones arrive. Upon completion of the source sequence, the queue is drained on the result sequence. This causes the elements to be delayed until the source completes.

Parameters

duration (Union[timedelta, float]) – Duration for taking elements from the end of the sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence with the elements taken during the specified duration from the end of the source sequence.

giving.operators.take_until(other)

Returns the values from the source observable sequence until the other observable sequence produces a value.

take_until

Parameters

other (Observable[Any]) – Observable sequence that terminates propagation of elements of the source sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.

giving.operators.take_until_with_time(end_time, scheduler=None)

Takes elements until the specified end time, using the specified scheduler to run timers.

take_until_with_time

Examples

>>> res = take_until_with_time(dt, [optional scheduler])
>>> res = take_until_with_time(5.0, [optional scheduler])
Parameters
  • end_time (Union[datetime, timedelta, float]) – Time to stop taking elements from the source sequence. If this value is less than or equal to datetime.utcnow(), the result stream will complete immediately.

  • scheduler (Optional[SchedulerBase]) – Scheduler to run the timer on.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence with the elements taken until the specified end time.

giving.operators.take_while(predicate, inclusive=False)

Returns elements from an observable sequence as long as a specified condition is true.

take_while

Example

>>> take_while(lambda value: value < 10)
Parameters
  • predicate (Callable[[~_T], bool]) – A function to test each element for a condition.

  • inclusive (bool) – [Optional] When set to True the value that caused the predicate function to return False will also be emitted. If not specified, defaults to False.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence that contains the elements from the input sequence that occur before the element at which the test no longer passes.

giving.operators.take_while_indexed(predicate, inclusive=False)

Returns elements from an observable sequence as long as a specified condition is true. The element’s index is used in the logic of the predicate function.

take_while_indexed

Example

>>> take_while_indexed(lambda value, index: value < 10 or index < 10)
Parameters
  • predicate (Callable[[~_T, int], bool]) – A function to test each element for a condition; the second parameter of the function represents the index of the source element.

  • inclusive (bool) – [Optional] When set to True the value that caused the predicate function to return False will also be emitted. If not specified, defaults to False.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An observable sequence that contains the elements from the input sequence that occur before the element at which the test no longer passes.

giving.operators.take_with_time(duration, scheduler=None)

Takes elements for the specified duration from the start of the observable source sequence.

take_with_time

Example

>>> res = take_with_time(5.0)

Elements are forwarded as they arrive. Once duration has elapsed, the result sequence completes and later elements are ignored.

Parameters

duration (Union[timedelta, float]) – Duration for taking elements from the start of the sequence.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence with the elements taken during the specified duration from the start of the source sequence.

giving.operators.throttle(window_duration, scheduler=None)

throttle() is an alias of throttle_first()

giving.operators.throttle_first(window_duration, scheduler=None)

Returns an Observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration.

Parameters

window_duration (Union[timedelta, float]) – Time to wait before emitting another item after emitting the last item.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable that performs the throttle operation.

giving.operators.throttle_with_mapper(throttle_duration_mapper)

The throttle_with_mapper operator.

Ignores values from an observable sequence which are followed by another value within a computed throttle duration.

Example

>>> op = throttle_with_mapper(lambda x: reactivex.timer(x + x))
Parameters

throttle_duration_mapper (Callable[[Any], Observable[Any]]) – Mapper function to retrieve an observable sequence indicating the throttle duration for each given element.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

A partially applied operator function that takes an observable source and returns the throttled observable sequence.

giving.operators.throttle_with_timeout(duetime, scheduler=None)

Ignores values from an observable sequence which are followed by another value before duetime.

debounce

Example

>>> res = throttle_with_timeout(5.0) # 5 seconds
Parameters
  • duetime (Union[timedelta, float]) – Duration of the throttle period for each value (specified as a float denoting seconds or an instance of timedelta).

  • scheduler (Optional[SchedulerBase]) – Scheduler to debounce values on.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes the source observable and returns the debounced observable sequence.

giving.operators.time_interval(scheduler=None)

Records the time interval between consecutive values in an observable sequence.

time_interval

Examples

>>> res = time_interval()
Return type

Callable[[Observable[~_T]], Observable[TimeInterval[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence with time interval information on values.

giving.operators.timeout(duetime, other=None, scheduler=None)

Returns the source observable sequence or the other observable sequence if duetime elapses.

timeout

Examples

>>> res = timeout(5.0)
>>> res = timeout(datetime(), return_value(42))
>>> res = timeout(5.0, return_value(42))
Parameters
  • duetime (Union[datetime, timedelta, float]) – Absolute (specified as a datetime object) or relative time (specified as a float denoting seconds or an instance of timedelta) when a timeout occurs.

  • other (Optional[Observable[~_T]]) – Sequence to return in case of a timeout. If not specified, a timeout error throwing sequence will be used.

  • scheduler (Optional[SchedulerBase]) –

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns the source sequence switching to the other sequence in case of a timeout.

giving.operators.timeout_with_mapper(first_timeout=None, timeout_duration_mapper=None, other=None)

Returns the source observable sequence, switching to the other observable sequence if a timeout is signaled.

Examples

>>> res = timeout_with_mapper(reactivex.timer(0.5))
>>> res = timeout_with_mapper(
    reactivex.timer(0.5), lambda x: reactivex.timer(0.2)
)
>>> res = timeout_with_mapper(
    reactivex.timer(0.5),
    lambda x: reactivex.timer(0.2),
    reactivex.return_value(42)
)
Parameters
  • first_timeout (Optional[Observable[Any]]) – [Optional] Observable sequence that represents the timeout for the first element. If not provided, this defaults to reactivex.never().

  • timeout_duration_mapper (Optional[Callable[[~_T], Observable[Any]]]) – [Optional] Selector to retrieve an observable sequence that represents the timeout between the current element and the next element.

  • other (Optional[Observable[~_T]]) – [Optional] Sequence to return in case of a timeout. If not provided, this is set to reactivex.throw().

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns the source sequence switching to the other sequence in case of a timeout.

giving.operators.timestamp(scheduler=None)

The timestamp operator.

Records the timestamp for each value in an observable sequence.

Examples

>>> timestamp()

Produces objects with attributes value and timestamp, where value is the original value.

Return type

Callable[[Observable[~_T]], Observable[Timestamp[~_T]]]

Returns

A partially applied operator function that takes an observable source and returns an observable sequence with timestamp information on values.

giving.operators.to_dict(key_mapper, element_mapper=None)

Converts the observable sequence to a dictionary.

Parameters
  • key_mapper (Callable[[~_T], ~_TKey]) – A function which produces the key for the dictionary.

  • element_mapper (Optional[Callable[[~_T], ~_TValue]]) – [Optional] A function which produces the element for the dictionary. If not present, defaults to the value from the observable sequence.

Return type

Callable[[Observable[~_T]], Observable[Dict[~_TKey, ~_TValue]]]

Returns

An operator function that takes an observable source and returns an observable sequence with a single value of a dictionary containing the values from the observable sequence.
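The roles of key_mapper and element_mapper can be sketched in plain Python. The helper to_dict_model below is hypothetical (it is not part of giving or rx) and uses a list in place of the observable sequence. It assumes that a later value overwrites an earlier one with the same key.

```python
def to_dict_model(values, key_mapper, element_mapper=None):
    """Build the single dictionary that the operator would emit (model only)."""
    result = {}
    for value in values:
        # Without element_mapper, the value itself is stored.
        element = value if element_mapper is None else element_mapper(value)
        # Assumption: duplicate keys keep the most recent element.
        result[key_mapper(value)] = element
    return result


words = ["apple", "kiwi", "fig"]

# Key each word by its length; the words themselves are the elements.
by_length = to_dict_model(words, key_mapper=len)

# Key each word by itself and store its length as the element.
lengths = to_dict_model(words, key_mapper=lambda w: w, element_mapper=len)
```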

giving.operators.to_future(future_ctor=None)

Converts an existing observable sequence to a Future.

Example

>>> op = to_future(asyncio.Future)

Parameters

future_ctor – [Optional] The constructor of the future.

Returns

An operator function that takes an observable source and returns a future with the last value from the observable sequence.

giving.operators.to_iterable()

Creates an iterable from an observable sequence.

There is also an alias called to_list.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence containing a single element with an iterable containing all the elements of the source sequence.

giving.operators.to_list()

Creates an iterable from an observable sequence.

This is an alias of to_iterable.

Return type

Callable[[Observable[~_T]], Observable[List[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence containing a single element with an iterable containing all the elements of the source sequence.

giving.operators.to_marbles(timespan=0.1, scheduler=None)

Convert an observable sequence into a marble diagram string.

Parameters
  • timespan (Union[timedelta, float]) – [Optional] Duration of each character, in seconds. If not specified, defaults to 0.1s.

  • scheduler (Optional[SchedulerBase]) – [Optional] The scheduler used to run the input sequence on.

Return type

Callable[[Observable[Any]], Observable[str]]

Returns

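An operator function that takes an observable source and returns an observable sequence containing the marble diagram as a string.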

giving.operators.to_set()

Converts the observable sequence to a set.

Return type

Callable[[Observable[~_T]], Observable[Set[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence with a single value of a set containing the values from the observable sequence.

giving.operators.top(n=10, key=None)

Return the top n values, sorted in descending order.

top

top may emit fewer than n elements if the original sequence contains fewer than n elements.

Parameters
  • n – The number of top entries to return.

  • key – The key function to use for comparison, or a string.
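The selection that top describes can be modelled in plain Python with heapq.nlargest. The helper top_model is hypothetical, and a list stands in for the stream. Treating a string key as a field to look up in each entry is an assumption made for this sketch, not something the text above confirms.

```python
import heapq


def top_model(values, n=10, key=None):
    """Return up to n of the largest values, in descending order (model only)."""
    if isinstance(key, str):
        # Assumption: a string key names a field of dict-like entries.
        field = key
        key = lambda entry: entry[field]
    return heapq.nlargest(n, values, key=key)


largest = top_model([3, 1, 4, 1, 5, 9, 2, 6], n=3)

# Fewer than n inputs simply yield fewer results.
short = top_model([2, 7], n=5)

# String key, under the assumption stated above.
worst = top_model([{"loss": 0.3}, {"loss": 0.9}, {"loss": 0.1}], n=2, key="loss")
```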

giving.operators.variance(*, scan=False)

giving.operators.where(*keys, **conditions)

Filter entries with the given keys meeting the given conditions.

where

where2

Example

where("x", "!y", z=True, w=lambda x: x > 0)
Parameters
  • keys – Keys that must be present in the dictionary. A key that starts with “!” must be absent instead.

  • conditions – Maps a key to the value it must be associated with in the dictionary, or to a predicate function on the value.
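The filtering rule can be written out as a plain-Python predicate over a single dict. The function matches is a hypothetical helper for illustration, not the operator itself. It assumes that a key named in conditions must be present for the entry to pass.

```python
def matches(entry, *keys, **conditions):
    """Return True if entry would pass where(*keys, **conditions) (model only)."""
    for key in keys:
        if key.startswith("!"):
            # "!name" means the key must be absent.
            if key[1:] in entry:
                return False
        elif key not in entry:
            return False
    for key, expected in conditions.items():
        # Assumption: a missing key fails the condition.
        if key not in entry:
            return False
        value = entry[key]
        if callable(expected):
            if not expected(value):
                return False
        elif value != expected:
            return False
    return True


entries = [
    {"x": 1, "z": True, "w": 2},          # passes every test
    {"x": 1, "y": 0, "z": True, "w": 2},  # rejected: "y" is present
    {"x": 1, "z": False, "w": 2},         # rejected: z is not True
    {"x": 1, "z": True, "w": -1},         # rejected: w fails the predicate
    {"z": True, "w": 2},                  # rejected: "x" is missing
]

# Same arguments as the where(...) example above.
kept = [e for e in entries if matches(e, "x", "!y", z=True, w=lambda v: v > 0)]
```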

giving.operators.where_any(*keys)

Filter entries with any of the given keys.

where_any

Parameters

keys – Keys that must be present in the dictionary.
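In contrast to where, a single matching key is enough here. A plain-Python model of the test, using the hypothetical helper has_any on ordinary dicts:

```python
def has_any(entry, *keys):
    """Return True if entry contains at least one of the keys (model only)."""
    return any(key in entry for key in keys)


entries = [{"x": 1}, {"y": 2}, {"z": 3}, {"x": 1, "z": 3}]

# Keep every entry that has "x" or "y" (or both).
kept = [e for e in entries if has_any(e, "x", "y")]
```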

giving.operators.while_do(condition)

Repeats source as long as condition holds, emulating a while loop.

Parameters

condition (Callable[[Observable[~_T]], bool]) – The condition which determines if the source will be repeated.

Return type

Callable[[Observable[~_T]], Observable[~_T]]

Returns

An operator function that takes an observable source and returns an observable sequence which is repeated as long as the condition holds.
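The loop being emulated can be sketched with a generator. Here while_do_model and has_rounds_left are hypothetical names, and a list stands in for the source observable. The sketch assumes the condition is checked before every repetition, including the first.

```python
def while_do_model(source, condition):
    """Replay source for as long as condition(source) is true (model only)."""
    while condition(source):
        yield from source


remaining = {"rounds": 2}


def has_rounds_left(_source):
    # Allow two passes over the source, then stop.
    if remaining["rounds"] > 0:
        remaining["rounds"] -= 1
        return True
    return False


repeated = list(while_do_model([1, 2, 3], has_rounds_left))
```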

giving.operators.window(boundaries)

Projects each element of an observable sequence into zero or more windows.

window

Examples

>>> res = window(reactivex.interval(1.0))
Parameters

boundaries (Observable[Any]) – Observable sequence whose elements denote the creation and completion of non-overlapping windows.

Return type

Callable[[Observable[~_T]], Observable[Observable[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence of windows.

giving.operators.window_toggle(openings, closing_mapper)

Projects each element of an observable sequence into zero or more windows.

window

>>> res = window_toggle(reactivex.interval(0.5), lambda i: reactivex.timer(i))
Parameters
  • openings (Observable[Any]) – Observable sequence whose elements denote the creation of windows.

  • closing_mapper (Callable[[Any], Observable[Any]]) – A function invoked to define the closing of each produced window. Value from openings Observable that initiated the associated window is provided as argument to the function.

Return type

Callable[[Observable[~_T]], Observable[Observable[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence of windows.

giving.operators.window_when(closing_mapper)

Projects each element of an observable sequence into zero or more windows.

window

Examples

>>> res = window_when(lambda: reactivex.timer(0.5))
Parameters

closing_mapper (Callable[[], Observable[Any]]) – A function invoked to define the closing of each produced window. It defines the boundaries of the produced windows (a window is started when the previous one is closed, resulting in non-overlapping windows).

Return type

Callable[[Observable[~_T]], Observable[Observable[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence of windows.

giving.operators.window_with_count(count, skip=None)

Projects each element of an observable sequence into zero or more windows which are produced based on element count information.

window_with_count

Examples

>>> window_with_count(10)
>>> window_with_count(10, 1)
Parameters
  • count (int) – Length of each window.

  • skip (Optional[int]) – [Optional] Number of elements to skip between creation of consecutive windows. If not specified, defaults to the count.

Return type

Callable[[Observable[~_T]], Observable[Observable[~_T]]]

Returns

An operator function that takes an observable source and returns an observable sequence of windows.
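How count and skip interact is easier to see on a finite list. The sketch below is a plain-Python model with a hypothetical helper name: lists stand in for the windows, and edge cases of the real operator (such as a possible empty trailing window) are not modelled.

```python
def window_with_count_model(values, count, skip=None):
    """Split values into windows of up to count items, one every skip items."""
    if skip is None:
        # As documented above, skip defaults to count (non-overlapping windows).
        skip = count
    values = list(values)
    return [values[start:start + count] for start in range(0, len(values), skip)]


# Non-overlapping windows of three elements; the last one is shorter.
chunks = window_with_count_model(range(1, 8), 3)

# A new window starts at every element, so consecutive windows overlap.
sliding = window_with_count_model([1, 2, 3, 4], 2, 1)
```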

giving.operators.window_with_time(timespan, timeshift=None, scheduler=None)

Return type

Callable[[Observable[~_T]], Observable[Observable[~_T]]]

giving.operators.window_with_time_or_count(timespan, count, scheduler=None)

Return type

Callable[[Observable[~_T]], Observable[Observable[~_T]]]

giving.operators.with_latest_from(*sources)

The with_latest_from operator.

Merges the specified observable sequences into one observable sequence by creating a tuple only when the first observable sequence produces an element. The observables can be passed either as separate arguments or as a list.

with_latest_from

Examples

>>> op = with_latest_from(obs1)
>>> op = with_latest_from([obs1, obs2, obs3])
Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the result of combining elements of the sources into a tuple.
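The rule that only the first sequence triggers an output can be modelled on a timeline of events. In this sketch, with_latest_from_model is a hypothetical helper and each event is a (source name, value) pair. It assumes that values from the first sequence are dropped until every other sequence has produced at least one value.

```python
def with_latest_from_model(events, main, others):
    """Emit a tuple on each main value, using the latest value of the others."""
    latest = {}
    out = []
    for name, value in events:
        if name == main:
            # Assumption: nothing is emitted until all others have a value.
            if all(other in latest for other in others):
                out.append((value, *(latest[other] for other in others)))
        else:
            # Other sources only update the remembered value.
            latest[name] = value
    return out


events = [("a", 1), ("b", "x"), ("a", 2), ("b", "y"), ("b", "z"), ("a", 3)]
combined = with_latest_from_model(events, main="a", others=["b"])
```

In this timeline the first value of a is dropped because b has not produced anything yet, and "y" never appears in the output because it was replaced by "z" before a emitted again.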

giving.operators.wmap(name, fn=None, pass_keys=True)

Map each begin/end pair of a give.wrap.

In this schema, B and E correspond to the messages sent in the enter and exit phases respectively of the wrap() context manager.

group_wrap

Example

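from giving import give, given

def _wrap(x):
    # Runs up to the yield when the wrap block is entered
    yield
    # Runs when the wrap block exits; the return value is the mapped result
    return x * 10

with given() as gv:
    results = gv.wmap("block", _wrap).accum()

    with give.wrap("block", x=3):
        with give.wrap("block", x=4):
            pass

# The inner block exits first, so its result comes first
assert results == [40, 30]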
Parameters
  • name – Name of the wrap block to group on.

  • fn – A generator function that yields exactly once.

  • pass_keys – Whether to pass the arguments to give.wrap() as keyword arguments at the start (defaults to True).
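The requirement that fn yield exactly once follows from how a generator can be split into an enter half and an exit half. The sketch below illustrates that mechanism in plain Python. It is not giving's implementation: run_pair and finish are hypothetical names, and whether the real operator sends any data into the generator at exit is not modelled.

```python
def _wrap(x):
    yield            # everything before this runs at the begin message
    return x * 10    # everything after runs at the end message


def run_pair(fn, pass_keys=True, **keys):
    """Start fn at block entry and return a callable that finishes it at exit."""
    # With pass_keys, the wrap arguments become keyword arguments of fn.
    gen = fn(**keys) if pass_keys else fn()
    next(gen)  # run up to the single yield

    def finish():
        try:
            next(gen)
        except StopIteration as stop:
            # The generator's return value is the mapped result.
            return stop.value
        raise RuntimeError("fn must yield exactly once")

    return finish


results = []
finish_outer = run_pair(_wrap, x=3)   # outer block entered
finish_inner = run_pair(_wrap, x=4)   # inner block entered
results.append(finish_inner())        # inner block exits first
results.append(finish_outer())        # then the outer block
```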

giving.operators.zip(*args)

Merges the specified observable sequences into one observable sequence by creating a tuple whenever all of the observable sequences have produced an element at a corresponding index.

zip

Example

>>> res = zip(obs1, obs2)
Parameters

args (Observable[Any]) – Observable sources to zip.

Return type

Callable[[Observable[Any]], Observable[Any]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the result of combining elements of the sources as a tuple.
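Pairing by index, rather than by arrival time, can be modelled with one queue per source. In this plain-Python sketch, zip_model is a hypothetical helper and each event is a (source name, value) pair in arrival order.

```python
from collections import deque


def zip_model(events, names):
    """Emit a tuple once every source has an unpaired value (model only)."""
    queues = {name: deque() for name in names}
    out = []
    for name, value in events:
        queues[name].append(value)
        # A tuple is ready only when all queues are non-empty.
        if all(queues[n] for n in names):
            out.append(tuple(queues[n].popleft() for n in names))
    return out


events = [("a", 1), ("a", 2), ("b", "x"), ("a", 3), ("b", "y")]
pairs = zip_model(events, names=["a", "b"])
```

The value 3 is never emitted in this timeline because b produces no third element to pair it with.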

giving.operators.zip_with_iterable(second)

Merges the specified observable sequence and list into one observable sequence by creating a tuple whenever all of the observable sequences have produced an element at a corresponding index.

zip_with_iterable

Example

>>> res = zip_with_iterable([1, 2, 3])
Parameters

second (Iterable[~_T2]) – Iterable to zip with the source observable.

Return type

Callable[[Observable[~_T1]], Observable[Tuple[~_T1, ~_T2]]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the result of combining elements of the sources as a tuple.

giving.operators.zip_with_list(second)

Merges the specified observable sequence and list into one observable sequence by creating a tuple whenever all of the observable sequences have produced an element at a corresponding index.

zip_with_iterable

Example

>>> res = zip_with_list([1, 2, 3])
Parameters

second (Iterable[~_T2]) – Iterable to zip with the source observable.

Return type

Callable[[Observable[~_T1]], Observable[Tuple[~_T1, ~_T2]]]

Returns

An operator function that takes an observable source and returns an observable sequence containing the result of combining elements of the sources as a tuple.