# *****************************************************************************
#
# Copyright (c) 2020, the pyEX authors.
#
# This file is part of the pyEX library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#
from enum import Enum
from functools import wraps
from .sse import _runSSE, _runSSEAsync
[docs]class FXSSE(Enum):
FOREX = "forex"
FOREX1SECOND = "forex1Second"
FOREX5SECOND = "forex5Second"
FOREX1MINUTE = "forex1Minute"
@staticmethod
def options():
return list(map(lambda c: c.value, FXSSE))
[docs]def fxSSE(
symbols=None, on_data=None, exit=None, token="", version="stable", name="forex"
):
"""This endpoint streams real-time foreign currency exchange rates.
https://iexcloud.io/docs/api/#forex-currencies
Args:
symbols (str): Tickers to request, if None then firehose
on_data (function): Callback on data
exit (Event): Trigger to exit
token (str): Access token
version (str): API version
"""
return _runSSE(
name, symbols=symbols, on_data=on_data, exit=exit, token=token, version=version
)
[docs]async def fxSSEAsync(symbols=None, exit=None, token="", version="stable", name="forex"):
"""This endpoint streams real-time foreign currency exchange rates.
https://iexcloud.io/docs/api/#forex-currencies
Args:
symbols (str): Tickers to request, if None then firehose
exit (Event): Trigger to exit
token (str): Access token
version (str): API version
"""
async for item in _runSSEAsync(
name, symbols=symbols, exit=exit, token=token, version=version
):
yield item
[docs]@wraps(fxSSE)
def forex1SecondSSE(symbols=None, on_data=None, exit=None, token="", version="stable"):
return fxSSE(
symbols=symbols,
on_data=on_data,
exit=exit,
token=token,
version=version,
name="forex1Second",
)
[docs]@wraps(fxSSEAsync)
def forex1SecondSSEAsync(symbols=None, exit=None, token="", version="stable"):
for item in fxSSEAsync(
symbols=symbols, exit=exit, token=token, version=version, name="forex1Second"
):
yield item
[docs]@wraps(fxSSE)
def forex5SecondSSE(symbols=None, on_data=None, exit=None, token="", version="stable"):
return fxSSE(
symbols=symbols,
on_data=on_data,
exit=exit,
token=token,
version=version,
name="forex5Second",
)
[docs]@wraps(fxSSEAsync)
def forex5SecondSSEAsync(symbols=None, exit=None, token="", version="stable"):
for item in fxSSEAsync(
symbols=symbols, exit=exit, token=token, version=version, name="forex5Second"
):
yield item
[docs]@wraps(fxSSE)
def forex1MinuteSSE(symbols=None, on_data=None, exit=None, token="", version="stable"):
return fxSSE(
symbols=symbols,
on_data=on_data,
exit=exit,
token=token,
version=version,
name="forex1Minute",
)
[docs]@wraps(fxSSEAsync)
def forex1MinuteSSEAsync(symbols=None, exit=None, token="", version="stable"):
for item in fxSSEAsync(
symbols=symbols, exit=exit, token=token, version=version, name="forex1Minute"
):
yield item