Skip to content

📑 API Reference¤

This section holds a comprehensive documentation of all of classes, methods and functions in the humbldata package.

humbldata package.

humbldata.cli ¤

humbldata CLI.

humbldata.cli.say ¤

say(message: str = '') -> None

Say a message.

Source code in src/humbldata/cli.py
 8
 9
10
11
@app.command()
def say(message: str = "") -> None:
    """Say a message."""
    typer.echo(message)

humbldata.portfolio ¤

humbldata.portfolio.portfolio_controller ¤

Context: Portfolio.

The Portfolio Controller Module.

humbldata.portfolio.portfolio_controller.Portfolio ¤

Bases: PortfolioQueryParams

A top-level Portfolio controller for data analysis tools in humblDATA.

This module serves as the primary controller, routing user-specified PortfolioQueryParams as core arguments that are used to fetch time series data.

The portfolio controller also gives access to all sub-modules and their functions.

It is designed to facilitate the collection of data across various types such as stocks, options, or alternative time series by requiring minimal input from the user.

Submodules

The Portfolio controller is composed of the following submodules:

  • analytics:

Parameters:

Name Type Description Default
symbol str or list of str

The stock symbol(s) to query. Default is "AAPL".

required
provider OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS

The data provider for historical price data. Default is "yahoo".

required
membership

The membership level of the user. Default is "anonymous".

required
Parameter Notes

The parameters are the PortfolioQueryParams. They are used for data collection further down the pipeline in other commands. Intended to execute operations on core data sets. This approach enables composable and standardized querying while accommodating data-specific collection logic.

The symbols you input here will be used as Portfolio symbols for the methods available in the analytics submodule.

Source code in src/humbldata/portfolio/portfolio_controller.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class Portfolio(PortfolioQueryParams):
    """
    A top-level Portfolio controller for data analysis tools in `humblDATA`.

    This module serves as the primary controller, routing user-specified
    PortfolioQueryParams as core arguments that are used to fetch time series
    data.

    The `portfolio` controller also gives access to all sub-modules and their
    functions.

    It is designed to facilitate the collection of data across various types such as
    stocks, options, or alternative time series by requiring minimal input from the user.

    Submodules
    ----------
    The `Portfolio` controller is composed of the following submodules:

    - `analytics`:

    Parameters
    ----------
    symbol : str or list of str
        The stock symbol(s) to query. Default is "AAPL".
    provider : OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS
        The data provider for historical price data. Default is "yahoo".
    membership: Literal["anonymous", "peon", "premium", "power", "permanent", "admin"]
        The membership level of the user. Default is "anonymous".

    Parameter Notes
    -----
    The parameters are the `PortfolioQueryParams`. They are used
    for data collection further down the pipeline in other commands.
    Intended to execute operations on core data sets. This approach enables
    composable and standardized querying while accommodating data-specific
    collection logic.

    The symbols you input here will be used as `Portfolio` symbols for the
    methods available in the `analytics` submodule.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize the Portfolio module.

        This method does not take any parameters and does not return anything.
        """
        super().__init__(*args, **kwargs)

    @property
    def analytics(self):
        """
        The analytics submodule of the Portfolio controller.

        Access to all the Analytics indicators. When the Portfolio class is
        instantiated the parameters are initialized with the PortfolioQueryParams
        class, which hold all the fields needed for the context_params, like the
        symbol, interval, start_date, and end_date.
        """
        return Analytics(context_params=self)
humbldata.portfolio.portfolio_controller.Portfolio.__init__ ¤
__init__(*args, **kwargs)

Initialize the Portfolio module.

This method does not take any parameters and does not return anything.

Source code in src/humbldata/portfolio/portfolio_controller.py
52
53
54
55
56
57
58
def __init__(self, *args, **kwargs):
    """
    Initialize the Portfolio module.

    This method does not take any parameters and does not return anything.
    """
    super().__init__(*args, **kwargs)
humbldata.portfolio.portfolio_controller.Portfolio.analytics property ¤
analytics

The analytics submodule of the Portfolio controller.

Access to all the Analytics indicators. When the Portfolio class is instantiated the parameters are initialized with the PortfolioQueryParams class, which hold all the fields needed for the context_params, like the symbol, interval, start_date, and end_date.

humbldata.portfolio.analytics ¤

humbldata.portfolio.analytics.analytics_controller ¤

Context: Portfolio || Category: Analytics.

A controller to manage and compile all of the Analytics models available in the portfolio context. This will be passed as a @property to the portfolio() class, giving access to the Analytics module and its functions.

humbldata.portfolio.analytics.analytics_controller.Analytics ¤

Module for all Analytics analysis.

Attributes:

Name Type Description
context_params PortfolioQueryParams

The standard query parameters for portfolio data.

Methods:

Name Description
user_table

Execute the UserTable command.

Source code in src/humbldata/portfolio/analytics/analytics_controller.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Analytics:
    """
    Module for all Analytics analysis.

    Attributes
    ----------
    context_params : PortfolioQueryParams
        The standard query parameters for portfolio data.

    Methods
    -------
    user_table(command_params: UserTableQueryParams)
        Execute the UserTable command.

    """

    def __init__(self, context_params: PortfolioQueryParams):
        self.context_params = context_params

    def user_table(self, **kwargs: UserTableQueryParams):
        """
        Execute the UserTable command.

        Explain the functionality...
        """
        from humbldata.core.standard_models.portfolio.analytics.user_table import (
            UserTableFetcher,
        )

        # Instantiate the Fetcher with the query parameters
        fetcher = UserTableFetcher(
            context_params=self.context_params, command_params=kwargs
        )

        # Use the fetcher to get the data
        return fetcher.fetch_data()
humbldata.portfolio.analytics.analytics_controller.Analytics.user_table ¤
user_table(**kwargs: UserTableQueryParams)

Execute the UserTable command.

Explain the functionality...

Source code in src/humbldata/portfolio/analytics/analytics_controller.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def user_table(self, **kwargs: UserTableQueryParams):
    """
    Execute the UserTable command.

    Explain the functionality...
    """
    from humbldata.core.standard_models.portfolio.analytics.user_table import (
        UserTableFetcher,
    )

    # Instantiate the Fetcher with the query parameters
    fetcher = UserTableFetcher(
        context_params=self.context_params, command_params=kwargs
    )

    # Use the fetcher to get the data
    return fetcher.fetch_data()

humbldata.portfolio.analytics.user_table ¤

humbldata.portfolio.analytics.user_table.model ¤

Context: Portfolio || Category: Analytics || Command: user_table.

The user_table Command Module.

humbldata.portfolio.analytics.user_table.model.user_table_engine async ¤
user_table_engine(symbols: str | list[str] | Series, etf_data: LazyFrame | None = None, toolbox: Toolbox | None = None, mandelbrot_data: LazyFrame | None = None, membership: Literal['anonymous', 'peon', 'premium', 'power', 'admin'] = 'anonymous')

Aggregate user table data from various sources.

Parameters:

Name Type Description Default
symbols str or list of str or pl.Series

The stock symbols to aggregate data for.

required
etf_data LazyFrame or None

Pre-fetched ETF data. If None, it will be fetched, by default None.

None
toolbox Toolbox or None

Pre-generated toolbox. If None, it will be generated, by default None.

None
mandelbrot_data LazyFrame or None

Pre-calculated Mandelbrot channel data. If None, it will be calculated, by default None.

None
membership Literal['anonymous', 'peon', 'premium', 'power', 'admin']

The user's role. If None, it will be calculated, by default None.

'anonymous'

Returns:

Type Description
LazyFrame

A LazyFrame containing the aggregated user table data with columns: date, symbol, bottom_price, recent_price, top_price, ud_pct, ud_ratio, sector, and asset_class.

Notes

This function performs the following steps: 1. Fetches ETF data if not provided 2. Generates a toolbox if not provided 3. Calculates Mandelbrot channel if not provided 4. Concurrently fetches latest price, sector, and asset class data 5. Combines all data into a single LazyFrame 6. Calculates up/down percentages and ratios 7. Selects and returns relevant columns

The function uses asynchronous operations for improved performance when fetching data from multiple sources.

Source code in src/humbldata/portfolio/analytics/user_table/model.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
async def user_table_engine(
    symbols: str | list[str] | pl.Series,
    etf_data: pl.LazyFrame | None = None,
    toolbox: Toolbox | None = None,
    mandelbrot_data: pl.LazyFrame | None = None,
    membership: Literal[
        "anonymous", "peon", "premium", "power", "admin"
    ] = "anonymous",
):
    """
    Aggregate user table data from various sources.

    Parameters
    ----------
    symbols : str or list of str or pl.Series
        The stock symbols to aggregate data for.
    etf_data : pl.LazyFrame or None, optional
        Pre-fetched ETF data. If None, it will be fetched, by default None.
    toolbox : Toolbox or None, optional
        Pre-generated toolbox. If None, it will be generated, by default None.
    mandelbrot_data : pl.LazyFrame or None, optional
        Pre-calculated Mandelbrot channel data. If None, it will be calculated, by default None.
    membership : Literal["anonymous", "peon", "premium", "power", "admin"], optional
        The user's role. If None, it will be calculated, by default None.

    Returns
    -------
    pl.LazyFrame
        A LazyFrame containing the aggregated user table data with columns:
        date, symbol, bottom_price, recent_price, top_price, ud_pct, ud_ratio,
        sector, and asset_class.

    Notes
    -----
    This function performs the following steps:
    1. Fetches ETF data if not provided
    2. Generates a toolbox if not provided
    3. Calculates Mandelbrot channel if not provided
    4. Concurrently fetches latest price, sector, and asset class data
    5. Combines all data into a single LazyFrame
    6. Calculates up/down percentages and ratios
    7. Selects and returns relevant columns

    The function uses asynchronous operations for improved performance when
    fetching data from multiple sources.
    """
    # Fetch ETF data if not provided
    if etf_data is None:
        etf_data = await aget_etf_category(symbols=symbols)

    # Calculate Mandelbrot channel if not provided
    if mandelbrot_data is None:
        # Generate toolbox params based on membership if not provided
        if toolbox is None:
            toolbox = Toolbox(symbols=symbols, membership=membership)
        mandelbrot_data = toolbox.technical.mandelbrot_channel().to_polars(
            collect=False
        )
    # Fetch data from all sources concurrently, passing etf_data where needed
    tasks = [
        aget_latest_price(symbols=symbols),
        aget_sector_filter(symbols=symbols, etf_data=etf_data),
        aget_asset_class_filter(symbols=symbols, etf_data=etf_data),
    ]
    lazyframes = await asyncio.gather(*tasks)

    # Combine all DataFrames into a single LazyFrame
    out = (
        (
            pl.concat(lazyframes, how="align")
            .lazy()
            .join(mandelbrot_data, on="symbol", how="left")
            .pipe(calc_up_down_pct)
        )
        .select(
            [
                "date",
                "symbol",
                "bottom_price",
                "recent_price",
                "top_price",
                "ud_pct",
                "ud_ratio",
                "sector",
                "asset_class",
            ]
        )
        .rename({"recent_price": "last_price"})
        .rename({"bottom_price": "buy_price"})
        .rename({"top_price": "sell_price"})
    )
    return out
humbldata.portfolio.analytics.user_table.view ¤

Context: Portfolio || Category: Analytics || Command: user_table.

The UserTable View Module.

humbldata.portfolio.analytics.user_table.view.create_example_plot ¤
create_example_plot(data: DataFrame, template: ChartTemplate = ChartTemplate.plotly) -> Figure

Generate an example plot from the provided data.

Parameters:

Name Type Description Default
data DataFrame

The dataframe containing the data to be plotted.

required
template ChartTemplate

The template to be used for styling the plot.

plotly

Returns:

Type Description
Figure

A plotly figure object representing the example plot.

Source code in src/humbldata/portfolio/analytics/user_table/view.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def create_example_plot(
    data: pl.DataFrame,
    template: ChartTemplate = ChartTemplate.plotly,
) -> go.Figure:
    """
    Generate an example plot from the provided data.

    Parameters
    ----------
    data : pl.DataFrame
        The dataframe containing the data to be plotted.
    template : ChartTemplate
        The template to be used for styling the plot.

    Returns
    -------
    go.Figure
        A plotly figure object representing the example plot.
    """
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=data.select("x_column").to_series(),
            y=data.select("y_column").to_series(),
            name="Example Data",
            line=dict(color="blue"),
        )
    )
    fig.update_layout(
        title="Example Plot",
        xaxis_title="X Axis",
        yaxis_title="Y Axis",
        template=template,
    )
    return fig
humbldata.portfolio.analytics.user_table.view.generate_plots ¤
generate_plots(data: LazyFrame, template: ChartTemplate = ChartTemplate.plotly) -> List[Chart]

Context: Portfolio || Category: Analytics || Command: user_table || Function: generate_plots().

Generate plots from the given dataframe.

Parameters:

Name Type Description Default
data LazyFrame

The LazyFrame containing the data to be plotted.

required
template ChartTemplate

The template/theme to use for the plotly figure.

plotly

Returns:

Type Description
List[Chart]

A list of Chart objects, each representing a plot.

Source code in src/humbldata/portfolio/analytics/user_table/view.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def generate_plots(
    data: pl.LazyFrame,
    template: ChartTemplate = ChartTemplate.plotly,
) -> List[Chart]:
    """
    Context: Portfolio || Category: Analytics || Command: user_table || **Function: generate_plots()**.

    Generate plots from the given dataframe.

    Parameters
    ----------
    data : pl.LazyFrame
        The LazyFrame containing the data to be plotted.
    template : ChartTemplate
        The template/theme to use for the plotly figure.

    Returns
    -------
    List[Chart]
        A list of Chart objects, each representing a plot.
    """
    collected_data = data.collect()
    plot = create_example_plot(collected_data, template)
    return [Chart(content=plot.to_plotly_json(), fig=plot)]
humbldata.portfolio.analytics.user_table.helpers ¤

Context: Portfolio || Category: Analytics || Command: user_table.

The UserTable Helpers Module.

humbldata.portfolio.analytics.user_table.helpers.aget_sector_filter async ¤
aget_sector_filter(symbols: str | list[str] | Series, provider: OBB_EQUITY_PROFILE_PROVIDERS | None = 'yfinance', etf_data: ETFCategoryData | None = None) -> LazyFrame

Context: Portfolio || Category: Analytics || Command: User Table || Command: aget_sector_filter.

Retrieves equity sector information for given symbols, filling in the ETF sector with the obb.etf.info category column from aget_etf_sector. This function also normalizes the sector to GICS_SECTORS via the .replace(GICS_SECTOR_MAPPING) method, and renames the category column to sector. The normalization is different from the normalization in aget_asset_class_filter in that this function uses .str.replace() to normalize the sector, while aget_asset_class_filter uses .replace(). Using .str.replace() allows for Regex matching, but this method since all values are known is slightly more performant.

Parameters:

Name Type Description Default
symbols str | list[str] | Series

The symbols to query for sector/category information.

required
provider OBB_EQUITY_PROFILE_PROVIDERS | None

The data provider to use. Default is "yfinance".

'yfinance'

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns for symbols and their corresponding sectors/categories.

Notes

This function uses aget_equity_sector() to fetch sector information and aget_etf_category() for symbols without sectors. It then combines the results.

Source code in src/humbldata/portfolio/analytics/user_table/helpers.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
async def aget_sector_filter(
    symbols: str | list[str] | pl.Series,
    provider: OBB_EQUITY_PROFILE_PROVIDERS | None = "yfinance",
    etf_data: ETFCategoryData | None = None,
) -> pl.LazyFrame:
    """
    Context: Portfolio || Category: Analytics || Command: User Table || **Command: aget_sector_filter**.

    Retrieves equity sector information for given symbols, filling in the ETF sector
    with the `obb.etf.info` category column from `aget_etf_sector`. This function
    also normalizes the sector to GICS_SECTORS via the
    `.replace(GICS_SECTOR_MAPPING)` method, and renames the `category` column to
    `sector`. The normalization is different from the normalization in
    `aget_asset_class_filter` in that this function uses `.str.replace()` to
    normalize the sector, while `aget_asset_class_filter` uses `.replace()`.
    Using `.str.replace()` allows for Regex matching, but this method since all
    values are known is slightly more performant.

    Parameters
    ----------
    symbols : str | list[str] | pl.Series
        The symbols to query for sector/category information.
    provider : OBB_EQUITY_PROFILE_PROVIDERS | None, optional
        The data provider to use. Default is "yfinance".

    Returns
    -------
    pl.LazyFrame
        A Polars LazyFrame with columns for symbols and their corresponding sectors/categories.

    Notes
    -----
    This function uses aget_equity_sector() to fetch sector information and aget_etf_category()
    for symbols without sectors. It then combines the results.
    """
    # Get sector information
    equity_sectors = await aget_equity_sector(symbols, provider="yfinance")

    # Identify symbols with null sectors
    etf_symbols = (
        equity_sectors.lazy()
        .filter(pl.col("sector").is_null())
        .select(["symbol"])
        .collect()
        .to_series()
        .to_list()
    )
    equity_sectors = equity_sectors.filter(pl.col("sector").is_not_null())

    # Get ETF categories for symbols with null sectors
    if etf_symbols:
        if etf_data is None:
            etf_categories = await aget_etf_category(
                etf_symbols, provider="yfinance"
            )
        else:
            # Remove columns with NULL (incoming equity symbols from ETF_DATA)
            # since only etf symbols are collected from logic above
            # Validation
            etf_data = etf_data.filter(pl.col("symbol").is_in(etf_symbols))
            etf_categories = ETFCategoryData(etf_data)

        # Normalize Sectors to GICS_SECTORS
        etf_categories = etf_categories.rename(
            {"category": "sector"}
        ).with_columns(pl.col("sector").replace(GICS_SECTOR_MAPPING))

        # If all symbols are ETFs, return the ETF sectors (no need to combine)
        if etf_symbols == symbols:
            out_sectors = etf_categories
        else:
            out_sectors = pl.concat(
                [equity_sectors, etf_categories], how="vertical"
            )
    else:
        out_sectors = equity_sectors

    return out_sectors
humbldata.portfolio.analytics.user_table.helpers.normalize_asset_class ¤
normalize_asset_class(data: LazyFrame) -> LazyFrame

Normalize the asset class in the given LazyFrame to standard ASSET_CLASSES values.

This function uses string replacement to standardize asset class names in the 'category' column of the input LazyFrame.

Parameters:

Name Type Description Default
data LazyFrame

The input LazyFrame containing 'symbol' and 'category' columns to be normalized.

required

Returns:

Type Description
LazyFrame

A new LazyFrame with the 'category' column normalized to standard asset classes.

Notes

This function assumes that the input LazyFrame has 'symbol' and 'category' columns. If these columns don't exist, the function may raise an error.

Source code in src/humbldata/portfolio/analytics/user_table/helpers.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def normalize_asset_class(data: pl.LazyFrame) -> pl.LazyFrame:
    """
    Normalize the asset class in the given LazyFrame to standard ASSET_CLASSES values.

    This function uses string replacement to standardize asset class names in
    the 'category' column of the input LazyFrame.

    Parameters
    ----------
    data : pl.LazyFrame
        The input LazyFrame containing 'symbol' and 'category' columns to be normalized.

    Returns
    -------
    pl.LazyFrame
        A new LazyFrame with the 'category' column normalized to standard asset classes.

    Notes
    -----
    This function assumes that the input LazyFrame has 'symbol' and 'category' columns.
    If these columns don't exist, the function may raise an error.
    """
    out = data.with_columns(
        pl.when(pl.col("symbol").is_in(["GLD", "FGDL", "BGLD"]))
        .then(pl.lit("Foreign Exchange"))
        .when(pl.col("symbol").is_in(["UUP", "UDN", "USDU"]))
        .then(pl.lit("Cash"))
        .when(pl.col("symbol").is_in(["BITI", "ETHU", "ZZZ"]))
        .then(pl.lit("Crypto"))
        .when(pl.col("symbol").is_in(["BDRY", "LNGG", "AMPD", "USOY"]))
        .then(pl.lit("Commodity"))
        .otherwise(
            pl.col("category")
            .str.replace(
                r"^(?:\w+\s){0,2}\w*\bBond\b\w*(?:\s\w+){0,2}$", "Fixed Income"
            )
            .str.replace(r".*Commodities.*", "Commodity")
            .str.replace(r".*Digital.*", "Crypto")
            .str.replace(r".*Currency.*", "Foreign Exchange")
            .str.replace(r".*Equity.*", "Equity")
            .str.replace("Utilities", "Equity")
            .str.replace("Financial", "Equity")
            .str.replace("Technology", "Equity")
        )
        .alias("category")
    )
    return out
humbldata.portfolio.analytics.user_table.helpers.aget_asset_class_filter async ¤
aget_asset_class_filter(symbols: str | list[str] | Series, provider: OBB_ETF_INFO_PROVIDERS | None = 'yfinance', etf_data: ETFCategoryData | None = None) -> LazyFrame

Context: Portfolio || Category: Analytics || Command: User Table || Command: aget_asset_class_filter.

This function takes in a list of symbols and returns a LazyFrame with the asset class for each symbol. Unlike aget_sector_filter, this function normalizes the asset class using the normalize_asset_class() method, which employs .str.replace() for Regex matching. This approach allows for more flexible pattern matching but may be slightly less performant than the direct .replace() method used in aget_sector_filter.

The function also renames the 'category' column to 'asset_class'. The normalization process maps the asset classes to standard ASSET_CLASSES values.

Source code in src/humbldata/portfolio/analytics/user_table/helpers.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
async def aget_asset_class_filter(
    symbols: str | list[str] | pl.Series,
    provider: OBB_ETF_INFO_PROVIDERS | None = "yfinance",
    etf_data: ETFCategoryData | None = None,
) -> pl.LazyFrame:
    """
    Context: Portfolio || Category: Analytics || Command: User Table || **Command: aget_asset_class_filter**.

    This function takes in a list of symbols and returns a LazyFrame with the
    asset class for each symbol. Unlike aget_sector_filter, this function
    normalizes the asset class using the normalize_asset_class() method, which
    employs `.str.replace()` for Regex matching. This approach allows for more
    flexible pattern matching but may be slightly less performant than the
    direct `.replace()` method used in aget_sector_filter.

    The function also renames the 'category' column to 'asset_class'. The
    normalization process maps the asset classes to standard ASSET_CLASSES values.
    """
    if etf_data is None:
        out = await aget_etf_category(symbols, provider=provider)
    else:
        out = ETFCategoryData(etf_data)
    out = out.lazy().with_columns(
        [
            pl.when(pl.col("category").is_null())
            .then(pl.lit("Equity"))
            .otherwise(pl.col("category"))
            .alias("category")
        ]
    )
    return out.pipe(normalize_asset_class).rename({"category": "asset_class"})
humbldata.portfolio.analytics.user_table.helpers.calc_up_down_pct ¤
calc_up_down_pct(data: LazyFrame, recent_price_col: str = 'recent_price', bottom_price_col: str = 'bottom_price', top_price_col: str = 'top_price', output_col: str = 'ud_pct', ratio_col: str = 'ud_ratio') -> LazyFrame

Calculate the difference between recent and bottom prices, and recent and top prices, and express the ratio of the two.

This function computes the percentage change from the recent price to the bottom price, and from the recent price to the top price. The results are combined into a single string column, and the ratio is provided in a separate column.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame containing price data.

required
recent_price_col str

Name of the column containing recent prices. Default is "recent_price".

'recent_price'
bottom_price_col str

Name of the column containing bottom prices. Default is "bottom_price".

'bottom_price'
top_price_col str

Name of the column containing top prices. Default is "top_price".

'top_price'
output_col str

Name of the output column for price percentages. Default is "price_percentages".

'ud_pct'
ratio_col str

Name of the output column for the up/down ratio. Default is "ud_ratio".

'ud_ratio'

Returns:

Type Description
DataFrame

DataFrame with additional columns containing the calculated price percentages and ratio.

Notes

The output column will contain strings in the format "-X.XX / +Y.YY", where X.XX is the percentage decrease from recent to bottom price, and Y.YY is the percentage increase from recent to top price. The ratio column will contain the ratio of these two percentages. This function is to be used in user_table_engine.

Source code in src/humbldata/portfolio/analytics/user_table/helpers.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def calc_up_down_pct(
    data: pl.LazyFrame,
    recent_price_col: str = "recent_price",
    bottom_price_col: str = "bottom_price",
    top_price_col: str = "top_price",
    output_col: str = "ud_pct",
    ratio_col: str = "ud_ratio",
) -> pl.LazyFrame:
    """
    Calculate the difference between recent and bottom prices, and recent and top prices, and express the ratio of the two.

    This function computes the percentage change from the recent price to the bottom price,
    and from the recent price to the top price. The results are combined into a single string
    column, and the ratio is provided in a separate column.

    Parameters
    ----------
    data : pl.DataFrame
        Input DataFrame containing price data.
    recent_price_col : str, optional
        Name of the column containing recent prices. Default is "recent_price".
    bottom_price_col : str, optional
        Name of the column containing bottom prices. Default is "bottom_price".
    top_price_col : str, optional
        Name of the column containing top prices. Default is "top_price".
    output_col : str, optional
        Name of the output column for price percentages. Default is "price_percentages".
    ratio_col : str, optional
        Name of the output column for the up/down ratio. Default is "ud_ratio".

    Returns
    -------
    pl.DataFrame
        DataFrame with additional columns containing the calculated price percentages and ratio.

    Notes
    -----
    The output column will contain strings in the format "-X.XX / +Y.YY", where X.XX is the
    percentage decrease from recent to bottom price, and Y.YY is the percentage increase from
    recent to top price. The ratio column will contain the ratio of these two percentages.
    This function is to be used in `user_table_engine`.
    """
    return data.with_columns(
        [
            (
                "-"
                + (
                    (pl.col(recent_price_col) - pl.col(bottom_price_col))
                    / pl.col(recent_price_col)
                    * 100
                )
                .abs()
                .round(2)
                .cast(pl.Utf8)
                + " / +"
                + (
                    (pl.col(top_price_col) - pl.col(recent_price_col))
                    / pl.col(recent_price_col)
                    * 100
                )
                .round(2)
                .cast(pl.Utf8)
            ).alias(output_col),
            (
                (pl.col(recent_price_col) - pl.col(bottom_price_col))
                / (pl.col(top_price_col) - pl.col(recent_price_col))
            )
            .round(2)
            .alias(ratio_col),
        ]
    )

humbldata.toolbox ¤

Context: Toolbox.

A category to group all of the technical indicators available in the Toolbox()

Technical indicators rely on statistical transformations of time series data. These are raw math operations.

humbldata.toolbox.toolbox_helpers ¤

Context: Toolbox || Category: Helpers.

These Toolbox() helpers are used in various calculations in the toolbox context. Most of the helpers will be mathematical transformations of data. These functions should be DUMB functions.

humbldata.toolbox.toolbox_helpers.log_returns ¤

log_returns(data: Series | DataFrame | LazyFrame | None = None, _column_name: str = 'adj_close', *, _drop_nulls: bool = True, _sort: bool = True) -> Series | DataFrame | LazyFrame

Context: Toolbox || Category: Helpers || Command: log_returns.

This is a DUMB command. It can be used in any CONTEXT or CATEGORY. Calculates the logarithmic returns for a given Polars Series, DataFrame, or LazyFrame. Logarithmic returns are widely used in the financial industry to measure the rate of return on investments over time. This function supports calculations on both individual series and dataframes containing financial time series data.

Parameters:

Name Type Description Default
data Series | DataFrame | LazyFrame

The input data for which to calculate the log returns. Default is None.

None
_drop_nulls bool

Whether to drop null values from the result. Default is True.

True
_column_name str

The column name to use for log return calculations in DataFrame or LazyFrame. Default is "adj_close".

'adj_close'
_sort bool

If True, sorts the DataFrame or LazyFrame by date and symbol before calculation. If you want a DUMB function, set to False. Default is True.

True

Returns:

Type Description
Series | DataFrame | LazyFrame

The original data, with an extra column of log returns of the input data. The return type matches the input type.

Raises:

Type Description
HumblDataError

If neither a series, DataFrame, nor LazyFrame is provided as input.

Examples:

>>> series = pl.Series([100, 105, 103])
>>> log_returns(data=series)
series([-inf, 0.048790, -0.019418])
>>> df = pl.DataFrame({"adj_close": [100, 105, 103]})
>>> log_returns(data=df)
shape: (3, 2)
┌───────────┬────────────┐
│ adj_close ┆ log_returns│
│ ---       ┆ ---        │
│ f64       ┆ f64        │
╞═══════════╪════════════╡
│ 100.0     ┆ NaN        │
├───────────┼────────────┤
│ 105.0     ┆ 0.048790   │
├───────────┼────────────┤
│ 103.0     ┆ -0.019418  │
└───────────┴────────────┘
Improvements

Add a parameter _sort_cols: list[str] | None = None to make the function even dumber. This way you could specify certain columns to sort by instead of using default date and symbol. If _sort_cols=None and _sort=True, then the function will use the default date and symbol columns for sorting.

Source code in src/humbldata/toolbox/toolbox_helpers.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def log_returns(
    data: pl.Series | pl.DataFrame | pl.LazyFrame | None = None,
    _column_name: str = "adj_close",
    *,
    _drop_nulls: bool = True,
    _sort: bool = True,
) -> pl.Series | pl.DataFrame | pl.LazyFrame:
    """
    Context: Toolbox || Category: Helpers || **Command: log_returns**.

    This is a DUMB command. It can be used in any CONTEXT or CATEGORY.
    Calculates the logarithmic returns for a given Polars Series, DataFrame, or
    LazyFrame. Logarithmic returns are widely used in the financial
    industry to measure the rate of return on investments over time. This
    function supports calculations on both individual series and dataframes
    containing financial time series data.

    Parameters
    ----------
    data : pl.Series | pl.DataFrame | pl.LazyFrame, optional
        The input data for which to calculate the log returns. Default is None.
    _drop_nulls : bool, optional
        Whether to drop null values from the result. Default is True.
    _column_name : str, optional
        The column name to use for log return calculations in DataFrame or
        LazyFrame. Default is "adj_close".
    _sort : bool, optional
        If True, sorts the DataFrame or LazyFrame by `date` and `symbol` before
        calculation. If you want a DUMB function, set to False.
        Default is True.

    Returns
    -------
    pl.Series | pl.DataFrame | pl.LazyFrame
        The original `data`, with an extra column of `log returns` of the input
        data. The return type matches the input type.

    Raises
    ------
    HumblDataError
        If neither a series, DataFrame, nor LazyFrame is provided as input.

    Examples
    --------
    >>> series = pl.Series([100, 105, 103])
    >>> log_returns(data=series)
    series([-inf, 0.048790, -0.019418])

    >>> df = pl.DataFrame({"adj_close": [100, 105, 103]})
    >>> log_returns(data=df)
    shape: (3, 2)
    ┌───────────┬────────────┐
    │ adj_close ┆ log_returns│
    │ ---       ┆ ---        │
    │ f64       ┆ f64        │
    ╞═══════════╪════════════╡
    │ 100.0     ┆ NaN        │
    ├───────────┼────────────┤
    │ 105.0     ┆ 0.048790   │
    ├───────────┼────────────┤
    │ 103.0     ┆ -0.019418  │
    └───────────┴────────────┘

    Improvements
    -----------
    Add a parameter `_sort_cols: list[str] | None = None` to make the function even
    dumber. This way you could specify certain columns to sort by instead of
    using default `date` and `symbol`. If `_sort_cols=None` and `_sort=True`,
    then the function will use the default `date` and `symbol` columns for
    sorting.

    """
    # Calculation for Polars Series
    if isinstance(data, pl.Series):
        out = data.log().diff()
        if _drop_nulls:
            out = out.drop_nulls()
    # Calculation for Polars DataFrame or LazyFrame
    elif isinstance(data, pl.DataFrame | pl.LazyFrame):
        sort_cols = _set_sort_cols(data, "symbol", "date")
        if _sort and sort_cols:
            data = data.sort(sort_cols)
            for col in sort_cols:
                data = data.set_sorted(col)
        elif _sort and not sort_cols:
            msg = "Data must contain 'symbol' and 'date' columns for sorting."
            raise HumblDataError(msg)

        if "log_returns" not in data.collect_schema().names():
            out = data.with_columns(
                pl.col(_column_name).log().diff().alias("log_returns")
            )
        else:
            out = data
        if _drop_nulls:
            out = out.drop_nulls(subset="log_returns")
    else:
        msg = "No valid data type was provided for `log_returns()` calculation."
        raise HumblDataError(msg)

    return out

humbldata.toolbox.toolbox_helpers.detrend ¤

detrend(data: DataFrame | LazyFrame | Series, _detrend_col: str = 'log_returns', _detrend_value_col: str | Series | None = 'window_mean', *, _sort: bool = False) -> DataFrame | LazyFrame | Series

Context: Toolbox || Category: Helpers || Command: detrend.

This is a DUMB command. It can be used in any CONTEXT or CATEGORY.

Detrends a column in a DataFrame, LazyFrame, or Series by subtracting the values of another column from it. Optionally sorts the data by 'symbol' and 'date' before detrending if _sort is True.

Parameters:

Name Type Description Default
data Union[DataFrame, LazyFrame, Series]

The data structure containing the columns to be processed.

required
_detrend_col str

The name of the column from which values will be subtracted.

'log_returns'
_detrend_value_col str | Series | None

The name of the column whose values will be subtracted OR if you pass a pl.Series to the data parameter, then you can use this to pass a second pl.Series to subtract from the first.

'window_mean'
_sort bool

If True, sorts the data by 'symbol' and 'date' before detrending. Default is False.

False

Returns:

Type Description
Union[DataFrame, LazyFrame, Series]

The detrended data structure with the same type as the input, with an added column named f"detrended_{_detrend_col}".

Notes

Function doesn't use .over() in calculation. Once the data is sorted, subtracting _detrend_value_col from _detrend_col is a simple operation that doesn't need to be grouped, because the sorting has already aligned the rows for subtraction

Source code in src/humbldata/toolbox/toolbox_helpers.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
def detrend(
    data: pl.DataFrame | pl.LazyFrame | pl.Series,
    _detrend_col: str = "log_returns",
    _detrend_value_col: str | pl.Series | None = "window_mean",
    *,
    _sort: bool = False,
) -> pl.DataFrame | pl.LazyFrame | pl.Series:
    """
    Context: Toolbox || Category: Helpers || **Command: detrend**.

    This is a DUMB command. It can be used in any CONTEXT or CATEGORY.

    Detrends a column in a DataFrame, LazyFrame, or Series by subtracting the
    values of another column from it. Optionally sorts the data by 'symbol' and
    'date' before detrending if _sort is True.

    Parameters
    ----------
    data : Union[pl.DataFrame, pl.LazyFrame, pl.Series]
        The data structure containing the columns to be processed.
    _detrend_col : str
        The name of the column from which values will be subtracted.
    _detrend_value_col : str | pl.Series | None, optional
        The name of the column whose values will be subtracted OR if you
        pass a pl.Series to the `data` parameter, then you can use this to
        pass a second `pl.Series` to subtract from the first.
    _sort : bool, optional
        If True, sorts the data by 'symbol' and 'date' before detrending.
        Default is False.

    Returns
    -------
    Union[pl.DataFrame, pl.LazyFrame, pl.Series]
        The detrended data structure with the same type as the input,
        with an added column named `f"detrended_{_detrend_col}"`.

    Notes
    -----
    Function doesn't use `.over()` in calculation. Once the data is sorted,
    subtracting _detrend_value_col from _detrend_col is a simple operation
    that doesn't need to be grouped, because the sorting has already aligned
    the rows for subtraction
    """
    if isinstance(data, pl.DataFrame | pl.LazyFrame):
        sort_cols = _set_sort_cols(data, "symbol", "date")
        if _sort and sort_cols:
            data = data.sort(sort_cols)
            for col in sort_cols:
                data = data.set_sorted(col)
        elif _sort and not sort_cols:
            msg = "Data must contain 'symbol' and 'date' columns for sorting."
            raise HumblDataError(msg)

    if isinstance(data, pl.DataFrame | pl.LazyFrame):
        col_names = data.collect_schema().names()
        if _detrend_value_col not in col_names or _detrend_col not in col_names:
            msg = f"Both {_detrend_value_col} and {_detrend_col} must be columns in the data."
            raise HumblDataError(msg)
        detrended = data.with_columns(
            (pl.col(_detrend_col) - pl.col(_detrend_value_col)).alias(
                f"detrended_{_detrend_col}"
            )
        )
    elif isinstance(data, pl.Series):
        if not isinstance(_detrend_value_col, pl.Series):
            msg = "When 'data' is a Series, '_detrend_value_col' must also be a Series."
            raise HumblDataError(msg)
        detrended = data - _detrend_value_col
        detrended.rename(f"detrended_{_detrend_col}")

    return detrended

humbldata.toolbox.toolbox_helpers.cum_sum ¤

cum_sum(data: DataFrame | LazyFrame | Series | None = None, _column_name: str = 'detrended_returns', *, _sort: bool = True, _mandelbrot_usage: bool = True) -> LazyFrame | DataFrame | Series

Context: Toolbox || Category: Helpers || Command: cum_sum.

This is a DUMB command. It can be used in any CONTEXT or CATEGORY.

Calculate the cumulative sum of a series or column in a DataFrame or LazyFrame.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame | Series | None

The data to process.

None
_column_name str

The name of the column to calculate the cumulative sum on, applicable if df is provided.

'detrended_returns'
_sort bool

If True, sorts the DataFrame or LazyFrame by date and symbol before calculation. Default is True.

True
_mandelbrot_usage bool

If True, performs additional checks specific to the Mandelbrot Channel calculation. This should be set to True when you have a cumulative deviate series, and False when not. Please check 'Notes' for more information. Default is True.

True

Returns:

Type Description
DataFrame | LazyFrame | Series

The DataFrame or Series with the cumulative deviate series added as a new column or as itself.

Notes

This function is used to calculate the cumulative sum for the deviate series of detrended returns for the data in the pipeline for calc_mandelbrot_channel.

So, although it is calculating a cumulative sum, it is known as a cumulative deviate because it is a cumulative sum on a deviate series, meaning that the cumulative sum should = 0 for each window. The _mandelbrot_usage parameter allows for checks to ensure the data is suitable for Mandelbrot Channel calculations, i.e that the deviate series was calculated correctly by the end of each series being 0, meaning the trend (the mean over the window_index) was successfully removed from the data.

Source code in src/humbldata/toolbox/toolbox_helpers.py
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
def cum_sum(
    data: pl.DataFrame | pl.LazyFrame | pl.Series | None = None,
    _column_name: str = "detrended_returns",
    *,
    _sort: bool = True,
    _mandelbrot_usage: bool = True,
) -> pl.LazyFrame | pl.DataFrame | pl.Series:
    """
    Context: Toolbox || Category: Helpers || **Command: cum_sum**.

    This is a DUMB command. It can be used in any CONTEXT or CATEGORY.

    Calculate the cumulative sum of a series or column in a DataFrame or
    LazyFrame.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame | pl.Series | None
        The data to process.
    _column_name : str
        The name of the column to calculate the cumulative sum on,
        applicable if df is provided.
    _sort : bool, optional
        If True, sorts the DataFrame or LazyFrame by date and symbol before
        calculation. Default is True.
    _mandelbrot_usage : bool, optional
        If True, performs additional checks specific to the Mandelbrot Channel
        calculation. This should be set to True when you have a cumulative
        deviate series, and False when not. Please check 'Notes' for more
        information. Default is True.

    Returns
    -------
    pl.DataFrame | pl.LazyFrame | pl.Series
        The DataFrame or Series with the cumulative deviate series added as a
        new column or as itself.

    Notes
    -----
    This function is used to calculate the cumulative sum for the deviate series
    of detrended returns for the data in the pipeline for
    `calc_mandelbrot_channel`.

    So, although it is calculating a cumulative sum, it is known as a cumulative
    deviate because it is a cumulative sum on a deviate series, meaning that the
    cumulative sum should = 0 for each window. The _mandelbrot_usage parameter
    allows for checks to ensure the data is suitable for Mandelbrot Channel
    calculations, i.e that the deviate series was calculated correctly by the
    end of each series being 0, meaning the trend (the mean over the
    window_index) was successfully removed from the data.
    """
    if isinstance(data, pl.DataFrame | pl.LazyFrame):
        sort_cols = _set_sort_cols(data, "symbol", "date")
        if _sort and sort_cols:
            data = data.sort(sort_cols)
            for col in sort_cols:
                data = data.set_sorted(col)

        over_cols = _set_over_cols(data, "symbol", "window_index")
        if over_cols:
            out = data.with_columns(
                pl.col(_column_name).cum_sum().over(over_cols).alias("cum_sum")
            )
        else:
            out = data.with_columns(
                pl.col(_column_name).cum_sum().alias("cum_sum")
            )
    elif isinstance(data, pl.Series):
        out = data.cum_sum().alias("cum_sum")
    else:
        msg = "No DataFrame/LazyFrame/Series was provided."
        raise HumblDataError(msg)

    if _mandelbrot_usage:
        _cumsum_check(out, _column_name="cum_sum")

    return out

humbldata.toolbox.toolbox_helpers.std ¤

std(data: LazyFrame | DataFrame | Series, _column_name: str = 'cum_sum', *, _sort: bool = True) -> LazyFrame | DataFrame | Series

Context: Toolbox || Category: Helpers || Command: std.

Calculate the standard deviation of the cumulative deviate series within each window of the dataset.

Parameters:

Name Type Description Default
df LazyFrame

The LazyFrame from which to calculate the standard deviation.

required
_column_name str

The name of the column from which to calculate the standard deviation, with "cumdev" as the default value.

'cum_sum'
_sort bool

If True, sorts the DataFrame or LazyFrame by date and symbol before calculation. Default is True.

True

Returns:

Type Description
LazyFrame

A LazyFrame with the standard deviation of the specified column for each window, added as a new column named "S".

Improvements

Just need to parametrize .over() call in the function if want an even dumber function, that doesn't calculate each window_index.

Source code in src/humbldata/toolbox/toolbox_helpers.py
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
def std(
    data: pl.LazyFrame | pl.DataFrame | pl.Series,
    _column_name: str = "cum_sum",
    *,
    _sort: bool = True,
) -> pl.LazyFrame | pl.DataFrame | pl.Series:
    """
    Context: Toolbox || Category: Helpers || **Command: std**.

    Calculate the standard deviation of the cumulative deviate series within
    each window of the dataset.

    Parameters
    ----------
    df : pl.LazyFrame
        The LazyFrame from which to calculate the standard deviation.
    _column_name : str, optional
        The name of the column from which to calculate the standard deviation,
        with "cumdev" as the default value.
    _sort : bool, optional
        If True, sorts the DataFrame or LazyFrame by date and symbol before
        calculation. Default is True.

    Returns
    -------
    pl.LazyFrame
        A LazyFrame with the standard deviation of the specified column for each
        window, added as a new column named "S".

    Improvements
    -----------
    Just need to parametrize `.over()` call in the function if want an even
    dumber function, that doesn't calculate each `window_index`.
    """
    if isinstance(data, pl.Series):
        out = data.std()
    elif isinstance(data, pl.DataFrame | pl.LazyFrame):
        sort_cols = _set_sort_cols(data, "symbol", "date")
        over_cols = _set_over_cols(data, "symbol", "window_index")
        if _sort and sort_cols:
            data = data.sort(sort_cols)
            for col in sort_cols:
                data = data.set_sorted(col)

        if over_cols:
            out = data.with_columns(
                [
                    pl.col(_column_name)
                    .std()
                    .over(over_cols)
                    .alias(f"{_column_name}_std"),  # used to be 'S'
                ]
            )
        else:
            out = data.with_columns(
                pl.col(_column_name).std().alias("S"),
            )

    return out

humbldata.toolbox.toolbox_helpers.mean ¤

mean(data: DataFrame | LazyFrame | Series, _column_name: str = 'log_returns', *, _sort: bool = True) -> DataFrame | LazyFrame

Context: Toolbox || Category: Helpers || Function: mean.

This is a DUMB command. It can be used in any CONTEXT or CATEGORY.

This function calculates the mean of a column (<_column_name>) over a each window in the dataset, if there are any. This window is intended to be the window that is passed in the calc_mandelbrot_channel() function. The mean calculated is meant to be used as the mean of each window within the time series. This way, each block of windows has their own mean, which can then be used to normalize the data (i.e remove the mean) from each window section.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The DataFrame or LazyFrame to calculate the mean on.

required
_column_name str

The name of the column to calculate the mean on.

'log_returns'
_sort bool

If True, sorts the DataFrame or LazyFrame by date before calculation. Default is False.

True

Returns:

Type Description
DataFrame | LazyFrame

The original DataFrame or LazyFrame with a window_mean & date column, which contains the mean of 'log_returns' per range/window.

Notes

Since this function is an aggregation function, it reduces the # of observations in the dataset,thus, unless I take each value and iterate each window_mean value to correlate to the row in the original dataframe, the function will return a dataframe WITHOUT the original data.

Source code in src/humbldata/toolbox/toolbox_helpers.py
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
def mean(
    data: pl.DataFrame | pl.LazyFrame | pl.Series,
    _column_name: str = "log_returns",
    *,
    _sort: bool = True,
) -> pl.DataFrame | pl.LazyFrame:
    """
    Context: Toolbox || Category: Helpers || **Function: mean**.

    This is a DUMB command. It can be used in any CONTEXT or CATEGORY.

    This function calculates the mean of a column (<_column_name>) over a
    each window in the dataset, if there are any.
    This window is intended to be the `window` that is passed in the
    `calc_mandelbrot_channel()` function. The mean calculated is meant to be
    used as the mean of each `window` within the time series. This
    way, each block of windows has their own mean, which can then be used to
    normalize the data (i.e remove the mean) from each window section.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        The DataFrame or LazyFrame to calculate the mean on.
    _column_name : str
        The name of the column to calculate the mean on.
    _sort : bool
        If True, sorts the DataFrame or LazyFrame by date before calculation.
        Default is False.

    Returns
    -------
    pl.DataFrame | pl.LazyFrame
        The original DataFrame or LazyFrame with a `window_mean` & `date` column,
        which contains the mean of 'log_returns' per range/window.


    Notes
    -----
    Since this function is an aggregation function, it reduces the # of
    observations in the dataset,thus, unless I take each value and iterate each
    window_mean value to correlate to the row in the original dataframe, the
    function will return a dataframe WITHOUT the original data.

    """
    if isinstance(data, pl.Series):
        out = data.mean()
    else:
        if data is None:
            msg = "No DataFrame was passed to the `mean()` function."
            raise HumblDataError(msg)
        sort_cols = _set_sort_cols(data, "symbol", "date")
        over_cols = _set_over_cols(data, "symbol", "window_index")
        if _sort and sort_cols:  # Check if _sort is True
            data = data.sort(sort_cols)
            for col in sort_cols:
                data = data.set_sorted(col)
        if over_cols:
            out = data.with_columns(
                pl.col(_column_name).mean().over(over_cols).alias("window_mean")
            )
        else:
            out = data.with_columns(pl.col(_column_name).mean().alias("mean"))
        if _sort and sort_cols:
            out = out.sort(sort_cols)
    return out

humbldata.toolbox.toolbox_helpers.range_ ¤

range_(data: LazyFrame | DataFrame | Series, _column_name: str = 'cum_sum', *, _sort: bool = True) -> LazyFrame | DataFrame | Series

Context: Toolbox || Category: Technical || Sub-Category: MandelBrot Channel || Sub-Category: Helpers || Function: mandelbrot_range.

Calculate the range (max - min) of the cumulative deviate values of a specified column in a DataFrame for each window in the dataset, if there are any.

Parameters:

Name Type Description Default
data LazyFrame

The DataFrame to calculate the range from.

required
_column_name str

The column to calculate the range from, by default "cumdev".

'cum_sum'

Returns:

Type Description
LazyFrame | DataFrame

A DataFrame with the range of the specified column for each window.

Source code in src/humbldata/toolbox/toolbox_helpers.py
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
def range_(
    data: pl.LazyFrame | pl.DataFrame | pl.Series,
    _column_name: str = "cum_sum",
    *,
    _sort: bool = True,
) -> pl.LazyFrame | pl.DataFrame | pl.Series:
    """
    Context: Toolbox || Category: Technical || Sub-Category: MandelBrot Channel || Sub-Category: Helpers || **Function: mandelbrot_range**.

    Calculate the range (max - min) of the cumulative deviate values of a
    specified column in a DataFrame for each window in the dataset, if there are any.

    Parameters
    ----------
    data : pl.LazyFrame
        The DataFrame to calculate the range from.
    _column_name : str, optional
        The column to calculate the range from, by default "cumdev".

    Returns
    -------
    pl.LazyFrame | pl.DataFrame
        A DataFrame with the range of the specified column for each window.
    """
    if isinstance(data, pl.Series):
        out = data.max() - data.min()

    if isinstance(data, pl.LazyFrame | pl.DataFrame):
        sort_cols = _set_sort_cols(data, "symbol", "date")
        over_cols = _set_over_cols(data, "symbol", "window_index")
        if _sort and sort_cols:
            data = data.sort(sort_cols)
            for col in sort_cols:
                data = data.set_sorted(col)
        if over_cols:
            out = (
                data.with_columns(
                    [
                        pl.col(_column_name)
                        .min()
                        .over(over_cols)
                        .alias(f"{_column_name}_min"),
                        pl.col(_column_name)
                        .max()
                        .over(over_cols)
                        .alias(f"{_column_name}_max"),
                    ]
                )
                .sort(sort_cols)
                .with_columns(
                    (
                        pl.col(f"{_column_name}_max")
                        - pl.col(f"{_column_name}_min")
                    ).alias(f"{_column_name}_range"),  # used to be 'R'
                )
            )
    else:
        out = (
            data.with_columns(
                [
                    pl.col(_column_name).min().alias(f"{_column_name}_min"),
                    pl.col(_column_name).max().alias(f"{_column_name}_max"),
                ]
            )
            .sort(sort_cols)
            .with_columns(
                (
                    pl.col(f"{_column_name}_max")
                    - pl.col(f"{_column_name}_min")
                ).alias(f"{_column_name}_range"),
            )
        )

    return out

humbldata.toolbox.toolbox_controller ¤

Context: Toolbox.

The Toolbox Controller Module.

humbldata.toolbox.toolbox_controller.Toolbox ¤

Bases: ToolboxQueryParams

A top-level controller for data analysis tools in humblDATA.

This module serves as the primary controller, routing user-specified ToolboxQueryParams as core arguments that are used to fetch time series data.

The Toolbox controller also gives access to all sub-modules adn their functions.

It is designed to facilitate the collection of data across various types such as stocks, options, or alternative time series by requiring minimal input from the user.

Submodules

The Toolbox controller is composed of the following submodules:

  • technical:
  • quantitative:
  • fundamental:

Parameters:

Name Type Description Default
symbol str

The symbol or ticker of the stock.

required
interval str

The interval of the data. Defaults to '1d'.

required
start_date str

The start date for the data query.

required
end_date str

The end date for the data query.

required
provider str

The provider to use for the data query. Defaults to 'yfinance'.

required
Parameter Notes

The parameters (symbol, interval, start_date, end_date) are the ToolboxQueryParams. They are used for data collection further down the pipeline in other commands. Intended to execute operations on core data sets. This approach enables composable and standardized querying while accommodating data-specific collection logic.

Source code in src/humbldata/toolbox/toolbox_controller.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class Toolbox(ToolboxQueryParams):
    """

    A top-level <context> controller for data analysis tools in `humblDATA`.

    This module serves as the primary controller, routing user-specified
    ToolboxQueryParams as core arguments that are used to fetch time series
    data.

    The `Toolbox` controller also gives access to all sub-modules adn their
    functions.

    It is designed to facilitate the collection of data across various types such as
    stocks, options, or alternative time series by requiring minimal input from the user.

    Submodules
    ----------
    The `Toolbox` controller is composed of the following submodules:

    - `technical`:
    - `quantitative`:
    - `fundamental`:

    Parameters
    ----------
    symbol : str
        The symbol or ticker of the stock.
    interval : str, optional
        The interval of the data. Defaults to '1d'.
    start_date : str
        The start date for the data query.
    end_date : str
        The end date for the data query.
    provider : str, optional
        The provider to use for the data query. Defaults to 'yfinance'.

    Parameter Notes
    -----
    The parameters (`symbol`, `interval`, `start_date`, `end_date`)
    are the `ToolboxQueryParams`. They are used for data collection further
    down the pipeline in other commands. Intended to execute operations on core
    data sets. This approach enables composable and standardized querying while
    accommodating data-specific collection logic.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize the Toolbox module.

        This method does not take any parameters and does not return anything.
        """
        super().__init__(*args, **kwargs)

    @property
    def technical(self):
        """
        The technical submodule of the Toolbox controller.

        Access to all the technical indicators. WHen the Toolbox class is
        instatiated the parameters are initialized with the ToolboxQueryParams
        class, which hold all the fields needed for the context_params, like the
        symbol, interval, start_date, and end_date.
        """
        return Technical(context_params=self)
humbldata.toolbox.toolbox_controller.Toolbox.__init__ ¤
__init__(*args, **kwargs)

Initialize the Toolbox module.

This method does not take any parameters and does not return anything.

Source code in src/humbldata/toolbox/toolbox_controller.py
56
57
58
59
60
61
62
def __init__(self, *args, **kwargs):
    """
    Initialize the Toolbox module.

    This method does not take any parameters and does not return anything.
    """
    super().__init__(*args, **kwargs)
humbldata.toolbox.toolbox_controller.Toolbox.technical property ¤
technical

The technical submodule of the Toolbox controller.

Access to all the technical indicators. WHen the Toolbox class is instatiated the parameters are initialized with the ToolboxQueryParams class, which hold all the fields needed for the context_params, like the symbol, interval, start_date, and end_date.

humbldata.toolbox.fundamental ¤

Context: Toolbox || Category: Fundamental.

A category to group all of the fundamental indicators available in the Toolbox().

Fundamental indicators relies on earnings data, valuation models of companies, balance sheet metrics etc...

humbldata.toolbox.quantitative ¤

Context: Toolbox || Category: Quantitative.

Quantitative indicators rely on statistical transformations of time series data.

humbldata.toolbox.technical ¤

humbldata.toolbox.technical.technical_controller ¤

Context: Toolbox || Category: Technical.

A controller to manage and compile all of the technical indicator models available. This will be passed as a @property to the Toolbox() class, giving access to the technical module and its functions.

humbldata.toolbox.technical.technical_controller.Technical ¤

Module for all technical analysis.

Attributes:

Name Type Description
context_params ToolboxQueryParams

The standard query parameters for toolbox data.

Methods:

Name Description
mandelbrot_channel

Calculate the rescaled range statistics.

Source code in src/humbldata/toolbox/technical/technical_controller.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class Technical:
    """
    Module for all technical analysis.

    Attributes
    ----------
    context_params : ToolboxQueryParams
        The standard query parameters for toolbox data.

    Methods
    -------
    mandelbrot_channel(command_params: MandelbrotChannelQueryParams)
        Calculate the rescaled range statistics.

    """

    def __init__(self, context_params: ToolboxQueryParams):
        self.context_params = context_params

    def mandelbrot_channel(self, **kwargs: MandelbrotChannelQueryParams):
        """
        Calculate the Mandelbrot Channel.

        Parameters
        ----------
        window : str, optional
            The width of the window used for splitting the data into sections for
            detrending. Defaults to "1mo".
        rv_adjustment : bool, optional
            Whether to adjust the calculation for realized volatility. If True, the
            data is filtered to only include observations in the same volatility bucket
            that the stock is currently in. Defaults to True.
        rv_method : str, optional
            The method to calculate the realized volatility. Only need to define
            when rv_adjustment is True. Defaults to "std".
        rs_method : Literal["RS", "RS_min", "RS_max", "RS_mean"], optional
            The method to use for Range/STD calculation. This is either min, max
            or mean of all RS ranges per window. If not defined, just used the
            most recent RS window. Defaults to "RS".
        rv_grouped_mean : bool, optional
            Whether to calculate the mean value of realized volatility over
            multiple window lengths. Defaults to False.
        live_price : bool, optional
            Whether to calculate the ranges using the current live price, or the
            most recent 'close' observation. Defaults to False.
        historical : bool, optional
            Whether to calculate the Historical Mandelbrot Channel (over-time), and
            return a time-series of channels from the start to the end date. If
            False, the Mandelbrot Channel calculation is done aggregating all of the
            data into one observation. If True, then it will enable daily
            observations over-time. Defaults to False.
        chart : bool, optional
            Whether to return a chart object. Defaults to False.
        template : str, optional
            The template/theme to use for the plotly figure. Defaults to "humbl_dark".

        Returns
        -------
        HumblObject
            An object containing the Mandelbrot Channel data and metadata.
        """
        from humbldata.core.standard_models.toolbox.technical.mandelbrot_channel import (
            MandelbrotChannelFetcher,
        )

        # Instantiate the Fetcher with the query parameters
        fetcher = MandelbrotChannelFetcher(
            context_params=self.context_params, command_params=kwargs
        )

        # Use the fetcher to get the data
        return fetcher.fetch_data()
humbldata.toolbox.technical.technical_controller.Technical.mandelbrot_channel ¤
mandelbrot_channel(**kwargs: MandelbrotChannelQueryParams)

Calculate the Mandelbrot Channel.

Parameters:

Name Type Description Default
window str

The width of the window used for splitting the data into sections for detrending. Defaults to "1mo".

required
rv_adjustment bool

Whether to adjust the calculation for realized volatility. If True, the data is filtered to only include observations in the same volatility bucket that the stock is currently in. Defaults to True.

required
rv_method str

The method to calculate the realized volatility. Only need to define when rv_adjustment is True. Defaults to "std".

required
rs_method Literal[RS, RS_min, RS_max, RS_mean]

The method to use for Range/STD calculation. This is either min, max or mean of all RS ranges per window. If not defined, just used the most recent RS window. Defaults to "RS".

required
rv_grouped_mean bool

Whether to calculate the mean value of realized volatility over multiple window lengths. Defaults to False.

required
live_price bool

Whether to calculate the ranges using the current live price, or the most recent 'close' observation. Defaults to False.

required
historical bool

Whether to calculate the Historical Mandelbrot Channel (over-time), and return a time-series of channels from the start to the end date. If False, the Mandelbrot Channel calculation is done aggregating all of the data into one observation. If True, then it will enable daily observations over-time. Defaults to False.

required
chart bool

Whether to return a chart object. Defaults to False.

required
template str

The template/theme to use for the plotly figure. Defaults to "humbl_dark".

required

Returns:

Type Description
HumblObject

An object containing the Mandelbrot Channel data and metadata.

Source code in src/humbldata/toolbox/technical/technical_controller.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def mandelbrot_channel(self, **kwargs: MandelbrotChannelQueryParams):
    """
    Calculate the Mandelbrot Channel.

    Parameters
    ----------
    window : str, optional
        The width of the window used for splitting the data into sections for
        detrending. Defaults to "1mo".
    rv_adjustment : bool, optional
        Whether to adjust the calculation for realized volatility. If True, the
        data is filtered to only include observations in the same volatility bucket
        that the stock is currently in. Defaults to True.
    rv_method : str, optional
        The method to calculate the realized volatility. Only need to define
        when rv_adjustment is True. Defaults to "std".
    rs_method : Literal["RS", "RS_min", "RS_max", "RS_mean"], optional
        The method to use for Range/STD calculation. This is either min, max
        or mean of all RS ranges per window. If not defined, just used the
        most recent RS window. Defaults to "RS".
    rv_grouped_mean : bool, optional
        Whether to calculate the mean value of realized volatility over
        multiple window lengths. Defaults to False.
    live_price : bool, optional
        Whether to calculate the ranges using the current live price, or the
        most recent 'close' observation. Defaults to False.
    historical : bool, optional
        Whether to calculate the Historical Mandelbrot Channel (over-time), and
        return a time-series of channels from the start to the end date. If
        False, the Mandelbrot Channel calculation is done aggregating all of the
        data into one observation. If True, then it will enable daily
        observations over-time. Defaults to False.
    chart : bool, optional
        Whether to return a chart object. Defaults to False.
    template : str, optional
        The template/theme to use for the plotly figure. Defaults to "humbl_dark".

    Returns
    -------
    HumblObject
        An object containing the Mandelbrot Channel data and metadata.
    """
    from humbldata.core.standard_models.toolbox.technical.mandelbrot_channel import (
        MandelbrotChannelFetcher,
    )

    # Instantiate the Fetcher with the query parameters
    fetcher = MandelbrotChannelFetcher(
        context_params=self.context_params, command_params=kwargs
    )

    # Use the fetcher to get the data
    return fetcher.fetch_data()

humbldata.toolbox.technical.mandelbrot_channel ¤

humbldata.toolbox.technical.mandelbrot_channel.model ¤

Context: Toolbox || Category: Technical || Command: calc_mandelbrot_channel.

A command to generate a Mandelbrot Channel for any time series.

humbldata.toolbox.technical.mandelbrot_channel.model.calc_mandelbrot_channel ¤
calc_mandelbrot_channel(data: DataFrame | LazyFrame, window: str = '1m', rv_method: str = 'std', rs_method: Literal['RS', 'RS_mean', 'RS_max', 'RS_min'] = 'RS', *, rv_adjustment: bool = True, rv_grouped_mean: bool = True, live_price: bool = True, **kwargs) -> LazyFrame

Context: Toolbox || Category: Technical || Command: calc_mandelbrot_channel.

This command calculates the Mandelbrot Channel for a given time series, utilizing various parameters to adjust the calculation. The Mandelbrot Channel provides insights into the volatility and price range of a stock over a specified window.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The time series data for which to calculate the Mandelbrot Channel. There needs to be a close and date column.

required
window str

The window size for the calculation, specified as a string. This determines the period over which the channel is calculated.

'1m'
rv_adjustment bool

Adjusts the calculation for realized volatility. If True, filters the data to include only observations within the current volatility bucket of the stock.

True
rv_grouped_mean bool

Determines whether to use the grouped mean in the realized volatility calculation.

True
rv_method str

Specifies the method for calculating realized volatility, applicable only if rv_adjustment is True.

'std'
rs_method Literal['RS', 'RS_mean', 'RS_max', 'RS_min']

Defines the method for calculating the range over standard deviation, affecting the width of the Mandelbrot Channel. Options include RS, RS_mean, RS_min, and RS_max.

'RS'
live_price bool

Indicates whether to incorporate live price data into the calculation, which may extend the calculation time by 1-3 seconds.

True
**kwargs

Additional keyword arguments to pass to the function, if you want to change the behavior or pass parameters to internal functions.

{}

Returns:

Type Description
LazyFrame

A LazyFrame containing the calculated Mandelbrot Channel data for the specified time series.

Notes

The function returns a pl.LazyFrame; remember to call .collect() on the result to obtain a DataFrame. This lazy evaluation strategy postpones the calculation until it is explicitly requested.

Example

To calculate the Mandelbrot Channel for a yearly window with adjustments for realized volatility using the 'yz' method, and incorporating live price data:

mandelbrot_channel = calc_mandelbrot_channel(
    data,
    window="1y",
    rv_adjustment=True,
    rv_method="yz",
    rv_grouped_mean=False,
    rs_method="RS",
    live_price=True
).collect()
Source code in src/humbldata/toolbox/technical/mandelbrot_channel/model.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def calc_mandelbrot_channel(  # noqa: PLR0913
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    rv_method: str = "std",
    rs_method: Literal["RS", "RS_mean", "RS_max", "RS_min"] = "RS",
    *,
    rv_adjustment: bool = True,
    rv_grouped_mean: bool = True,
    live_price: bool = True,
    **kwargs,
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: Technical || **Command: calc_mandelbrot_channel**.

    This command calculates the Mandelbrot Channel for a given time series, utilizing various parameters to adjust the calculation. The Mandelbrot Channel provides insights into the volatility and price range of a stock over a specified window.

    Parameters
    ----------
    data: pl.DataFrame | pl.LazyFrame
        The time series data for which to calculate the Mandelbrot Channel.
        There needs to be a `close` and `date` column.
    window: str, default "1m"
        The window size for the calculation, specified as a string. This
        determines the period over which the channel is calculated.
    rv_adjustment: bool, default True
        Adjusts the calculation for realized volatility. If True, filters the
        data to include only observations within the current volatility bucket
        of the stock.
    rv_grouped_mean: bool, default True
        Determines whether to use the grouped mean in the realized volatility
        calculation.
    rv_method: str, default "std"
        Specifies the method for calculating realized volatility, applicable
        only if `rv_adjustment` is True.
    rs_method: str, default "RS"
        Defines the method for calculating the range over standard deviation,
        affecting the width of the Mandelbrot Channel. Options include RS,
        RS_mean, RS_min, and RS_max.
    live_price: bool, default True
        Indicates whether to incorporate live price data into the calculation,
        which may extend the calculation time by 1-3 seconds.
    **kwargs
        Additional keyword arguments to pass to the function, if you want to
        change the behavior or pass parameters to internal functions.

    Returns
    -------
    pl.LazyFrame
        A LazyFrame containing the calculated Mandelbrot Channel data for the specified time series.

    Notes
    -----
    The function returns a pl.LazyFrame; remember to call `.collect()` on the result to obtain a DataFrame. This lazy evaluation strategy postpones the calculation until it is explicitly requested.

    Example
    -------
    To calculate the Mandelbrot Channel for a yearly window with adjustments for realized volatility using the 'yz' method, and incorporating live price data:

    ```python
    mandelbrot_channel = calc_mandelbrot_channel(
        data,
        window="1y",
        rv_adjustment=True,
        rv_method="yz",
        rv_grouped_mean=False,
        rs_method="RS",
        live_price=True
    ).collect()
    ```
    """
    # Setup ====================================================================
    # window_datetime = _window_format(window, _return_timedelta=True)
    sort_cols = _set_sort_cols(data, "symbol", "date")

    data = data.lazy()
    # Step 1: Collect Price Data -----------------------------------------------
    # Step X: Add window bins --------------------------------------------------
    # We want date grouping, non-overlapping window bins
    data1 = add_window_index(data, window=window)

    # Step X: Calculate Log Returns + Rvol -------------------------------------
    if "log_returns" not in data1.collect_schema().names():
        data2 = log_returns(data1, _column_name="close")
    else:
        data2 = data1

    # Step X: Calculate Log Mean Series ----------------------------------------
    if isinstance(data2, pl.DataFrame | pl.LazyFrame):
        data3 = mean(data2)
    else:
        msg = "A series was passed to `mean()` calculation. Please provide a DataFrame or LazyFrame."
        raise HumblDataError(msg)
    # Step X: Calculate Mean De-trended Series ---------------------------------
    data4 = detrend(
        data3, _detrend_value_col="window_mean", _detrend_col="log_returns"
    )
    # Step X: Calculate Cumulative Deviate Series ------------------------------
    data5 = cum_sum(data4, _column_name="detrended_log_returns")
    # Step X: Calculate Mandelbrot Range ---------------------------------------
    data6 = range_(data5, _column_name="cum_sum")
    # Step X: Calculate Standard Deviation -------------------------------------
    data7 = std(data6, _column_name="cum_sum")
    # Step X: Calculate Range (R) & Standard Deviation (S) ---------------------
    if rv_adjustment:
        # Step 8.1: Calculate Realized Volatility ------------------------------
        data7 = calc_realized_volatility(
            data=data7,
            window=window,
            method=rv_method,
            grouped_mean=rv_grouped_mean,
        )
        # rename col for easy selection
        for col in data7.collect_schema().names():
            if "volatility_pct" in col:
                data7 = data7.rename({col: "realized_volatility"})
        # Step 8.2: Calculate Volatility Bucket Stats --------------------------
        data7 = vol_buckets(data=data7, lo_quantile=0.3, hi_quantile=0.65)
        data7 = vol_filter(
            data7
        )  # removes rows that arent in the same vol bucket

    # Step X: Calculate RS -----------------------------------------------------
    data8 = data7.sort(sort_cols).with_columns(
        (pl.col("cum_sum_range") / pl.col("cum_sum_std")).alias("RS")
    )

    # Step X: Collect Recent Prices --------------------------------------------
    if live_price:
        symbols = (
            data.select("symbol").unique().sort("symbol").collect().to_series()
        )
        recent_prices = get_latest_price(symbols)
    else:
        recent_prices = None

    # Step X: Calculate Rescaled Price Range ----------------------------------
    out = price_range(
        data=data8,
        recent_price_data=recent_prices,
        rs_method=rs_method,
        _rv_adjustment=rv_adjustment,
    )

    return out
humbldata.toolbox.technical.mandelbrot_channel.model.acalc_mandelbrot_channel async ¤
acalc_mandelbrot_channel(data: DataFrame | LazyFrame, window: str = '1m', rv_method: str = 'std', rs_method: Literal['RS', 'RS_mean', 'RS_max', 'RS_min'] = 'RS', *, rv_adjustment: bool = True, rv_grouped_mean: bool = True, live_price: bool = True, **kwargs) -> DataFrame | LazyFrame

Context: Toolbox || Category: Technical || Sub-Category: Mandelbrot Channel || Command: acalc_mandelbrot_channel.

Asynchronous wrapper for calc_mandelbrot_channel. This function allows calc_mandelbrot_channel to be called in an async context.

Notes

This does not make calc_mandelbrot_channel() non-blocking or asynchronous.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/model.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
async def acalc_mandelbrot_channel(  # noqa: PLR0913
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    rv_method: str = "std",
    rs_method: Literal["RS", "RS_mean", "RS_max", "RS_min"] = "RS",
    *,
    rv_adjustment: bool = True,
    rv_grouped_mean: bool = True,
    live_price: bool = True,
    **kwargs,
) -> pl.DataFrame | pl.LazyFrame:
    """
    Context: Toolbox || Category: Technical || Sub-Category: Mandelbrot Channel || **Command: acalc_mandelbrot_channel**.

    Asynchronous wrapper for calc_mandelbrot_channel.
    This function allows calc_mandelbrot_channel to be called in an async context.

    Notes
    -----
    This does not make `calc_mandelbrot_channel()` non-blocking or asynchronous.
    """
    # Directly call the synchronous calc_mandelbrot_channel function

    return calc_mandelbrot_channel(
        data=data,
        window=window,
        rv_adjustment=rv_adjustment,
        rv_method=rv_method,
        rs_method=rs_method,
        rv_grouped_mean=rv_grouped_mean,
        live_price=live_price,
        **kwargs,
    )
humbldata.toolbox.technical.mandelbrot_channel.model.calc_mandelbrot_channel_historical ¤
calc_mandelbrot_channel_historical(data: DataFrame | LazyFrame, window: str = '1m', rv_method: str = 'std', rs_method: Literal['RS', 'RS_mean', 'RS_max', 'RS_min'] = 'RS', *, rv_adjustment: bool = True, rv_grouped_mean: bool = True, live_price: bool = True, **kwargs) -> LazyFrame

Context: Toolbox || Category: Technical || Sub-Category: Mandelbrot Channel || Command: calc_mandelbrot_channel_historical.

This function calculates the Mandelbrot Channel for historical data.

Synchronous wrapper for the asynchronous Mandelbrot Channel historical calculation.

Parameters:

Name Type Description Default
The
required
Please
required
description
required

Returns:

Type Description
LazyFrame

A LazyFrame containing the historical Mandelbrot Channel calculations.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/model.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def calc_mandelbrot_channel_historical(  # noqa: PLR0913
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    rv_method: str = "std",
    rs_method: Literal["RS", "RS_mean", "RS_max", "RS_min"] = "RS",
    *,
    rv_adjustment: bool = True,
    rv_grouped_mean: bool = True,
    live_price: bool = True,
    **kwargs,
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: Technical || Sub-Category: Mandelbrot Channel || **Command: calc_mandelbrot_channel_historical**.

    This function calculates the Mandelbrot Channel for historical data.

    Synchronous wrapper for the asynchronous Mandelbrot Channel historical calculation.

    Parameters
    ----------
    The parameters for this function are the same as those for calc_mandelbrot_channel().
    Please refer to the documentation of calc_mandelbrot_channel() for a detailed
    description of each parameter.

    Returns
    -------
    pl.LazyFrame
        A LazyFrame containing the historical Mandelbrot Channel calculations.
    """
    return run_async(
        _acalc_mandelbrot_channel_historical_engine(
            data=data,
            window=window,
            rv_adjustment=rv_adjustment,
            rv_method=rv_method,
            rs_method=rs_method,
            rv_grouped_mean=rv_grouped_mean,
            live_price=live_price,
            **kwargs,
        )
    )
humbldata.toolbox.technical.mandelbrot_channel.model.calc_mandelbrot_channel_historical_mp ¤
calc_mandelbrot_channel_historical_mp(data: DataFrame | LazyFrame, window: str = '1m', rv_adjustment: bool = True, rv_method: str = 'std', rs_method: Literal['RS', 'RS_mean', 'RS_max', 'RS_min'] = 'RS', *, rv_grouped_mean: bool = True, live_price: bool = True, n_processes: int = 1, **kwargs) -> LazyFrame

Calculate the Mandelbrot Channel historically using multiprocessing.

Parameters:

n_processes : int, optional Number of processes to use. If None, it uses all available cores.

Other parameters are the same as calc_mandelbrot_channel_historical.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/model.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def calc_mandelbrot_channel_historical_mp(
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    rv_adjustment: bool = True,
    rv_method: str = "std",
    rs_method: Literal["RS", "RS_mean", "RS_max", "RS_min"] = "RS",
    *,
    rv_grouped_mean: bool = True,
    live_price: bool = True,
    n_processes: int = 1,
    **kwargs,
) -> pl.LazyFrame:
    """
    Calculate the Mandelbrot Channel historically using multiprocessing.

    Parameters:
    -----------
    n_processes : int, optional
        Number of processes to use. If None, it uses all available cores.

    Other parameters are the same as calc_mandelbrot_channel_historical.
    """
    window_days = _window_format(window, _return_timedelta=True)
    start_date = data.lazy().select(pl.col("date")).min().collect().row(0)[0]
    start_date = start_date + window_days
    end_date = data.lazy().select("date").max().collect().row(0)[0]

    if start_date >= end_date:
        msg = f"You set <historical=True> \n\
        This calculation needs *at least* one window of data. \n\
        The (start date + window) is: {start_date} and the dataset ended: {end_date}. \n\
        Please adjust dates accordingly."
        raise HumblDataError(msg)

    dates = (
        data.lazy()
        .select(pl.col("date"))
        .filter(pl.col("date") >= start_date)
        .unique()
        .sort("date")
        .collect()
        .to_series()
    )

    # Prepare the partial function with all arguments except the date
    calc_func = partial(
        _calc_mandelbrot_for_date,
        data=data,
        window=window,
        rv_adjustment=rv_adjustment,
        rv_method=rv_method,
        rs_method=rs_method,
        rv_grouped_mean=rv_grouped_mean,
        live_price=live_price,
        **kwargs,
    )

    # Use multiprocessing to calculate in parallel
    with multiprocessing.Pool(processes=n_processes) as pool:
        results = pool.map(calc_func, dates)

    # Combine results
    out = pl.concat(results, how="vertical").sort(["symbol", "date"])

    return out.lazy()
humbldata.toolbox.technical.mandelbrot_channel.model.calc_mandelbrot_channel_historical_concurrent ¤
calc_mandelbrot_channel_historical_concurrent(data: DataFrame | LazyFrame, window: str = '1m', rv_method: str = 'std', rs_method: Literal['RS', 'RS_mean', 'RS_max', 'RS_min'] = 'RS', *, rv_adjustment: bool = True, rv_grouped_mean: bool = True, live_price: bool = True, max_workers: int | None = None, use_processes: bool = False, **kwargs) -> LazyFrame

Calculate the Mandelbrot Channel historically using concurrent.futures.

Parameters:

max_workers : int, optional Maximum number of workers to use. If None, it uses the default for ProcessPoolExecutor or ThreadPoolExecutor (usually the number of processors on the machine, multiplied by 5). use_processes : bool, default True If True, use ProcessPoolExecutor, otherwise use ThreadPoolExecutor.

Other parameters are the same as calc_mandelbrot_channel_historical.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/model.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
def calc_mandelbrot_channel_historical_concurrent(
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    rv_method: str = "std",
    rs_method: Literal["RS", "RS_mean", "RS_max", "RS_min"] = "RS",
    *,
    rv_adjustment: bool = True,
    rv_grouped_mean: bool = True,
    live_price: bool = True,
    max_workers: int | None = None,
    use_processes: bool = False,
    **kwargs,
) -> pl.LazyFrame:
    """
    Calculate the Mandelbrot Channel historically using concurrent.futures.

    Parameters:
    -----------
    max_workers : int, optional
        Maximum number of workers to use. If None, it uses the default for ProcessPoolExecutor
        or ThreadPoolExecutor (usually the number of processors on the machine, multiplied by 5).
    use_processes : bool, default True
        If True, use ProcessPoolExecutor, otherwise use ThreadPoolExecutor.

    Other parameters are the same as calc_mandelbrot_channel_historical.
    """
    window_days = _window_format(window, _return_timedelta=True)
    start_date = data.lazy().select(pl.col("date")).min().collect().row(0)[0]
    start_date = start_date + window_days
    end_date = data.lazy().select("date").max().collect().row(0)[0]

    if start_date >= end_date:
        msg = f"You set <historical=True> \n\
        This calculation needs *at least* one window of data. \n\
        The (start date + window) is: {start_date} and the dataset ended: {end_date}. \n\
        Please adjust dates accordingly."
        raise HumblDataError(msg)

    dates = (
        data.lazy()
        .select(pl.col("date"))
        .filter(pl.col("date") >= start_date)
        .unique()
        .sort("date")
        .collect()
        .to_series()
    )

    # Prepare the partial function with all arguments except the date
    calc_func = partial(
        _calc_mandelbrot_for_date,
        data=data,
        window=window,
        rv_adjustment=rv_adjustment,
        rv_method=rv_method,
        rs_method=rs_method,
        rv_grouped_mean=rv_grouped_mean,
        live_price=live_price,
        **kwargs,
    )

    # Choose the appropriate executor
    executor_class = (
        concurrent.futures.ProcessPoolExecutor
        if use_processes
        else concurrent.futures.ThreadPoolExecutor
    )

    # Use concurrent.futures to calculate in parallel
    with executor_class(max_workers=max_workers) as executor:
        futures = [executor.submit(calc_func, date) for date in dates]
        results = [
            future.result()
            for future in concurrent.futures.as_completed(futures)
        ]

    # Combine results
    out = pl.concat(results, how="vertical").sort(["symbol", "date"])

    return out.lazy()
humbldata.toolbox.technical.mandelbrot_channel.view ¤
humbldata.toolbox.technical.mandelbrot_channel.view.create_historical_plot ¤
create_historical_plot(data: DataFrame, symbol: str, template: ChartTemplate = ChartTemplate.plotly) -> Figure

Generate a historical plot for a given symbol from the provided data.

Parameters:

Name Type Description Default
data DataFrame

The dataframe containing historical data including dates, bottom prices, close prices, and top prices.

required
symbol str

The symbol for which the historical plot is to be generated.

required
template ChartTemplate

The template to be used for styling the plot.

plotly

Returns:

Type Description
Figure

A plotly figure object representing the historical data of the given symbol.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/view.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def create_historical_plot(
    data: pl.DataFrame,
    symbol: str,
    template: ChartTemplate = ChartTemplate.plotly,
) -> go.Figure:
    """
    Generate a historical plot for a given symbol from the provided data.

    Parameters
    ----------
    data : pl.DataFrame
        The dataframe containing historical data including dates, bottom prices, close prices, and top prices.
    symbol : str
        The symbol for which the historical plot is to be generated.
    template : ChartTemplate
        The template to be used for styling the plot.

    Returns
    -------
    go.Figure
        A plotly figure object representing the historical data of the given symbol.
    """
    filtered_data = data.filter(pl.col("symbol") == symbol)

    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=filtered_data.select("date").to_series(),
            y=filtered_data.select("bottom_price").to_series(),
            name="Bottom Price",
            line=dict(color="green"),
        )
    )
    fig.add_trace(
        go.Scatter(
            x=filtered_data.select("date").to_series(),
            y=filtered_data.select("recent_price").to_series(),
            name="Recent Price",
            line=dict(color="blue"),
        )
    )
    fig.add_trace(
        go.Scatter(
            x=filtered_data.select("date").to_series(),
            y=filtered_data.select("top_price").to_series(),
            name="Top Price",
            line=dict(color="red"),
        )
    )
    fig.update_layout(
        title=f"Historical Mandelbrot Channel for {symbol}",
        xaxis_title="Date",
        yaxis_title="Price",
        template=template,
    )
    return fig
humbldata.toolbox.technical.mandelbrot_channel.view.create_current_plot ¤
create_current_plot(data: DataFrame, equity_data: DataFrame, symbol: str, template: ChartTemplate = ChartTemplate.plotly) -> Figure

Generate a current plot for a given symbol from the provided data and equity data.

Parameters:

Name Type Description Default
data DataFrame

The dataframe containing historical data including top and bottom prices.

required
equity_data DataFrame

The dataframe containing current equity data including dates and close prices.

required
symbol str

The symbol for which the current plot is to be generated.

required
template ChartTemplate

The template to be used for styling the plot.

plotly

Returns:

Type Description
Figure

A plotly figure object representing the current data of the given symbol.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/view.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def create_current_plot(
    data: pl.DataFrame,
    equity_data: pl.DataFrame,
    symbol: str,
    template: ChartTemplate = ChartTemplate.plotly,
) -> go.Figure:
    """
    Generate a current plot for a given symbol from the provided data and equity data.

    Parameters
    ----------
    data : pl.DataFrame
        The dataframe containing historical data including top and bottom prices.
    equity_data : pl.DataFrame
        The dataframe containing current equity data including dates and close prices.
    symbol : str
        The symbol for which the current plot is to be generated.
    template : ChartTemplate
        The template to be used for styling the plot.

    Returns
    -------
    go.Figure
        A plotly figure object representing the current data of the given symbol.
    """
    filtered_data = data.filter(pl.col("symbol") == symbol)
    equity_data = equity_data.filter(pl.col("symbol") == symbol)
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=equity_data.select("date").to_series(),
            y=equity_data.select("close").to_series(),
            name="Recent Price",
            line=dict(color="blue"),
        )
    )
    fig.add_hline(
        y=filtered_data.select("top_price").row(0)[0],
        line=dict(color="red", width=2),
        name="Top Price",
    )
    fig.add_hline(
        y=filtered_data.select("bottom_price").row(0)[0],
        line=dict(color="green", width=2),
        name="Bottom Price",
    )
    fig.update_layout(
        title=f"Current Mandelbrot Channel for {symbol}",
        xaxis_title="Date",
        yaxis_title="Price",
        template=template,
    )
    return fig
humbldata.toolbox.technical.mandelbrot_channel.view.is_historical_data ¤
is_historical_data(data: DataFrame) -> bool

Check if the provided dataframe contains historical data based on the uniqueness of dates.

Parameters:

Name Type Description Default
data DataFrame

The dataframe to check for historical data presence.

required

Returns:

Type Description
bool

Returns True if the dataframe contains historical data (more than one unique date), otherwise False.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/view.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def is_historical_data(data: pl.DataFrame) -> bool:
    """
    Check if the provided dataframe contains historical data based on the uniqueness of dates.

    Parameters
    ----------
    data : pl.DataFrame
        The dataframe to check for historical data presence.

    Returns
    -------
    bool
        Returns True if the dataframe contains historical data (more than one unique date), otherwise False.
    """
    return data.select("date").to_series().unique().shape[0] > 1
humbldata.toolbox.technical.mandelbrot_channel.view.generate_plot_for_symbol ¤
generate_plot_for_symbol(data: DataFrame, equity_data: DataFrame, symbol: str, template: ChartTemplate = ChartTemplate.plotly) -> Chart

Generate a plot for a specific symbol that is filtered from the original DF.

This function will check if the data provided is a Historical or Current Mandelbrot Channel data. If it is historical, it will generate a historical plot. If it is current, it will generate a current plot.

Parameters:

Name Type Description Default
data DataFrame

The dataframe containing Mandelbrot channel data for all symbols.

required
equity_data DataFrame

The dataframe containing equity data for all symbols.

required
symbol str

The symbol for which to generate the plot.

required
template ChartTemplate

The template/theme to use for the plotly figure. Options are: "humbl_light", "humbl_dark", "plotly_light", "plotly_dark", "ggplot2", "seaborn", "simple_white", "none"

plotly

Returns:

Type Description
Chart

A Chart object containing the generated plot for the specified symbol.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/view.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def generate_plot_for_symbol(
    data: pl.DataFrame,
    equity_data: pl.DataFrame,
    symbol: str,
    template: ChartTemplate = ChartTemplate.plotly,
) -> Chart:
    """
    Generate a plot for a specific symbol that is filtered from the original DF.

    This function will check if the data provided is a Historical or Current
    Mandelbrot Channel data. If it is historical, it will generate a historical
    plot. If it is current, it will generate a current plot.

    Parameters
    ----------
    data : pl.DataFrame
        The dataframe containing Mandelbrot channel data for all symbols.
    equity_data : pl.DataFrame
        The dataframe containing equity data for all symbols.
    symbol : str
        The symbol for which to generate the plot.
    template : ChartTemplate
        The template/theme to use for the plotly figure. Options are:
        "humbl_light", "humbl_dark", "plotly_light", "plotly_dark", "ggplot2", "seaborn", "simple_white", "none"

    Returns
    -------
    Chart
        A Chart object containing the generated plot for the specified symbol.

    """
    if is_historical_data(data):
        out = create_historical_plot(data, symbol, template)
    else:
        out = create_current_plot(data, equity_data, symbol, template)

    return Chart(
        content=out.to_json(), fig=out
    )  # TODO: use to_json() instead of to_plotly_json()
humbldata.toolbox.technical.mandelbrot_channel.view.generate_plots ¤
generate_plots(data: LazyFrame, equity_data: LazyFrame, template: ChartTemplate = ChartTemplate.plotly) -> list[Chart]

Context: Toolbox || Category: Technical || Subcategory: Mandelbrot Channel || Command: generate_plots().

Generate plots for each unique symbol in the given dataframes.

Parameters:

Name Type Description Default
data LazyFrame

The LazyFrame containing the symbols and MandelbrotChannelData

required
equity_data LazyFrame

The LazyFrame containing equity data for the symbols.

required
template ChartTemplate

The template/theme to use for the plotly figure.

plotly

Returns:

Type Description
list[Chart]

A list of Chart objects, each representing a plot for a unique symbol.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/view.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def generate_plots(
    data: pl.LazyFrame,
    equity_data: pl.LazyFrame,
    template: ChartTemplate = ChartTemplate.plotly,
) -> list[Chart]:
    """
    Context: Toolbox || Category: Technical || Subcategory: Mandelbrot Channel || **Command: generate_plots()**.

    Generate plots for each unique symbol in the given dataframes.

    Parameters
    ----------
    data : pl.LazyFrame
        The LazyFrame containing the symbols and MandelbrotChannelData
    equity_data : pl.LazyFrame
        The LazyFrame containing equity data for the symbols.
    template : ChartTemplate
        The template/theme to use for the plotly figure.

    Returns
    -------
    list[Chart]
        A list of Chart objects, each representing a plot for a unique symbol.

    """
    symbols = data.select("symbol").unique().collect().to_series()

    plots = [
        generate_plot_for_symbol(
            data.collect(), equity_data.collect(), symbol, template
        )
        for symbol in symbols
    ]
    return plots
humbldata.toolbox.technical.mandelbrot_channel.helpers ¤

Context: Toolbox || Category: Technical || Sub-Category: MandelBrot Channel || Sub-Category: Helpers.

These Toolbox() helpers are used in various calculations in the toolbox context. Most of the helpers will be mathematical transformations of data. These functions should be DUMB functions.

humbldata.toolbox.technical.mandelbrot_channel.helpers.add_window_index ¤
add_window_index(data: LazyFrame | DataFrame, window: str) -> LazyFrame | DataFrame
Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: **add_window_index**.

Add a column to the dataframe indicating the window grouping for each row in a time series.

Parameters:

Name Type Description Default
data LazyFrame | DataFrame

The input data frame or lazy frame to which the window index will be added.

required
window str

The window size as a string, used to determine the grouping of rows into windows.

required

Returns:

Type Description
LazyFrame | DataFrame

The original data frame or lazy frame with an additional column named "window_index" indicating the window grouping for each row.

Notes
  • This function is essential for calculating the Mandelbrot Channel, where the dataset is split into numerous 'windows', and statistics are calculated for each window.
  • The function adds a dummy symbol column if the data contains only one symbol, to avoid errors in the group_by_dynamic() function.
  • It is utilized within the log_mean() and calc_mandelbrot_channel() functions for window-based calculations.

Examples:

>>> data = pl.DataFrame({"date": ["2021-01-01", "2021-01-02"], "symbol": ["AAPL", "AAPL"], "value": [1, 2]})
>>> window = "1d"
>>> add_window_index(data, window)
shape: (2, 4)
┌────────────┬────────┬───────┬──────────────┐
│ date       ┆ symbol ┆ value ┆ window_index │
│ ---        ┆ ---    ┆ ---   ┆ ---          │
│ date       ┆ str    ┆ i64   ┆ i64          │
╞════════════╪════════╪═══════╪══════════════╡
│ 2021-01-01 ┆ AAPL   ┆ 1     ┆ 0            │
├────────────┼────────┼───────┼──────────────┤
│ 2021-01-02 ┆ AAPL   ┆ 2     ┆ 1            │
└────────────┴────────┴───────┴──────────────┘
Source code in src/humbldata/toolbox/technical/mandelbrot_channel/helpers.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def add_window_index(
    data: pl.LazyFrame | pl.DataFrame, window: str
) -> pl.LazyFrame | pl.DataFrame:
    """
        Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: **add_window_index**.

    Add a column to the dataframe indicating the window grouping for each row in
    a time series.

    Parameters
    ----------
    data : pl.LazyFrame | pl.DataFrame
        The input data frame or lazy frame to which the window index will be
        added.
    window : str
        The window size as a string, used to determine the grouping of rows into
        windows.

    Returns
    -------
    pl.LazyFrame | pl.DataFrame
        The original data frame or lazy frame with an additional column named
        "window_index" indicating
        the window grouping for each row.

    Notes
    -----
    - This function is essential for calculating the Mandelbrot Channel, where
    the dataset is split into
    numerous 'windows', and statistics are calculated for each window.
    - The function adds a dummy `symbol` column if the data contains only one
    symbol, to avoid errors in the `group_by_dynamic()` function.
    - It is utilized within the `log_mean()` and `calc_mandelbrot_channel()`
    functions for window-based calculations.

    Examples
    --------
    >>> data = pl.DataFrame({"date": ["2021-01-01", "2021-01-02"], "symbol": ["AAPL", "AAPL"], "value": [1, 2]})
    >>> window = "1d"
    >>> add_window_index(data, window)
    shape: (2, 4)
    ┌────────────┬────────┬───────┬──────────────┐
    │ date       ┆ symbol ┆ value ┆ window_index │
    │ ---        ┆ ---    ┆ ---   ┆ ---          │
    │ date       ┆ str    ┆ i64   ┆ i64          │
    ╞════════════╪════════╪═══════╪══════════════╡
    │ 2021-01-01 ┆ AAPL   ┆ 1     ┆ 0            │
    ├────────────┼────────┼───────┼──────────────┤
    │ 2021-01-02 ┆ AAPL   ┆ 2     ┆ 1            │
    └────────────┴────────┴───────┴──────────────┘
    """

    def _create_monthly_window_index(col: str, k: int = 1):
        year_diff = pl.col(col).last().dt.year() - pl.col(col).dt.year()
        month_diff = pl.col(col).last().dt.month() - pl.col(col).dt.month()
        day_indicator = pl.col(col).dt.day() > pl.col(col).last().dt.day()
        return (12 * year_diff + month_diff - day_indicator) // k

    # Clean the window into standardized strings (i.e "1month"/"1 month" = "1mo")
    window = _window_format(window, _return_timedelta=False)  # returns `str`

    if "w" in window or "d" in window:
        msg = "The window cannot include 'd' or 'w', the window needs to be larger than 1 month!"
        raise HumblDataError(msg)

    window_monthly = _window_format_monthly(window)

    data = data.with_columns(
        _create_monthly_window_index(col="date", k=window_monthly)
        .alias("window_index")
        .over("symbol")
    )

    return data
humbldata.toolbox.technical.mandelbrot_channel.helpers.vol_buckets ¤
vol_buckets(data: DataFrame | LazyFrame, lo_quantile: float = 0.4, hi_quantile: float = 0.8, _column_name_volatility: str = 'realized_volatility', *, _boundary_group_down: bool = False) -> LazyFrame

Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: vol_buckets.

Splitting data observations into 3 volatility buckets: low, mid and high. The function does this for each symbol present in the data.

Parameters:

Name Type Description Default
data LazyFrame | DataFrame

The input dataframe or lazy frame.

required
lo_quantile float

The lower quantile for bucketing. Default is 0.4.

0.4
hi_quantile float

The higher quantile for bucketing. Default is 0.8.

0.8
_column_name_volatility str

The name of the column to apply volatility bucketing. Default is "realized_volatility".

'realized_volatility'
_boundary_group_down bool

If True, then group boundary values down to the lower bucket, using vol_buckets_alt() If False, then group boundary values up to the higher bucket, using the Polars .qcut() method. Default is False.

False

Returns:

Type Description
LazyFrame

The data with an additional column: vol_bucket

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/helpers.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def vol_buckets(
    data: pl.DataFrame | pl.LazyFrame,
    lo_quantile: float = 0.4,
    hi_quantile: float = 0.8,
    _column_name_volatility: str = "realized_volatility",
    *,
    _boundary_group_down: bool = False,
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: **vol_buckets**.

    Splitting data observations into 3 volatility buckets: low, mid and high.
    The function does this for each `symbol` present in the data.

    Parameters
    ----------
    data : pl.LazyFrame | pl.DataFrame
        The input dataframe or lazy frame.
    lo_quantile : float
        The lower quantile for bucketing. Default is 0.4.
    hi_quantile : float
        The higher quantile for bucketing. Default is 0.8.
    _column_name_volatility : str
        The name of the column to apply volatility bucketing. Default is
        "realized_volatility".
    _boundary_group_down: bool = False
        If True, then group boundary values down to the lower bucket, using
        `vol_buckets_alt()` If False, then group boundary values up to the
        higher bucket, using the Polars `.qcut()` method.
        Default is False.

    Returns
    -------
    pl.LazyFrame
        The `data` with an additional column: `vol_bucket`
    """
    _check_required_columns(data, _column_name_volatility, "symbol")

    if not _boundary_group_down:
        # Grouping Boundary Values in Higher Bucket
        out = data.lazy().with_columns(
            pl.col(_column_name_volatility)
            .qcut(
                [lo_quantile, hi_quantile],
                labels=["low", "mid", "high"],
                left_closed=False,
                allow_duplicates=True,
            )
            .over("symbol")
            .alias("vol_bucket")
            .cast(pl.Utf8)
        )
    else:
        out = vol_buckets_alt(
            data, lo_quantile, hi_quantile, _column_name_volatility
        )

    return out
humbldata.toolbox.technical.mandelbrot_channel.helpers.vol_buckets_alt ¤
vol_buckets_alt(data: DataFrame | LazyFrame, lo_quantile: float = 0.4, hi_quantile: float = 0.8, _column_name_volatility: str = 'realized_volatility') -> LazyFrame

Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: vol_buckets_alt.

This is an alternative implementation of vol_buckets() using expressions, and not using .qcut(). The biggest difference is how the function groups values on the boundaries of quantiles. This function groups boundary values down Splitting data observations into 3 volatility buckets: low, mid and high. The function does this for each symbol present in the data.

Parameters:

Name Type Description Default
data LazyFrame | DataFrame

The input dataframe or lazy frame.

required
lo_quantile float

The lower quantile for bucketing. Default is 0.4.

0.4
hi_quantile float

The higher quantile for bucketing. Default is 0.8.

0.8
_column_name_volatility str

The name of the column to apply volatility bucketing. Default is "realized_volatility".

'realized_volatility'

Returns:

Type Description
LazyFrame

The data with an additional column: vol_bucket

Notes

The biggest difference is how the function groups values on the boundaries of quantiles. This function groups boundary values down to the lower bucket. So, if there is a value that lies on the mid/low border, this function will group it with low, whereas vol_buckets() will group it with mid

This function is also slightly less performant.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/helpers.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def vol_buckets_alt(
    data: pl.DataFrame | pl.LazyFrame,
    lo_quantile: float = 0.4,
    hi_quantile: float = 0.8,
    _column_name_volatility: str = "realized_volatility",
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: **vol_buckets_alt**.

    This is an alternative implementation of `vol_buckets()` using expressions,
    and not using `.qcut()`.
    The biggest difference is how the function groups values on the boundaries
    of quantiles. This function groups boundary values down
    Splitting data observations into 3 volatility buckets: low, mid and high.
    The function does this for each `symbol` present in the data.

    Parameters
    ----------
    data : pl.LazyFrame | pl.DataFrame
        The input dataframe or lazy frame.
    lo_quantile : float
        The lower quantile for bucketing. Default is 0.4.
    hi_quantile : float
        The higher quantile for bucketing. Default is 0.8.
    _column_name_volatility : str
        The name of the column to apply volatility bucketing. Default is "realized_volatility".

    Returns
    -------
    pl.LazyFrame
        The `data` with an additional column: `vol_bucket`

    Notes
    -----
    The biggest difference is how the function groups values on the boundaries
    of quantiles. This function __groups boundary values down__ to the lower bucket.
    So, if there is a value that lies on the mid/low border, this function will
    group it with `low`, whereas `vol_buckets()` will group it with `mid`

    This function is also slightly less performant.
    """
    # Calculate low and high quantiles for each symbol
    low_vol = pl.col(_column_name_volatility).quantile(lo_quantile)
    high_vol = pl.col(_column_name_volatility).quantile(hi_quantile)

    # Determine the volatility bucket for each row using expressions
    vol_bucket = (
        pl.when(pl.col(_column_name_volatility) <= low_vol)
        .then(pl.lit("low"))
        .when(pl.col(_column_name_volatility) <= high_vol)
        .then(pl.lit("mid"))
        .otherwise(pl.lit("high"))
        .alias("vol_bucket")
    )

    # Add the volatility bucket column to the data
    out = data.lazy().with_columns(vol_bucket.over("symbol"))

    return out
humbldata.toolbox.technical.mandelbrot_channel.helpers.vol_filter ¤
vol_filter(data: DataFrame | LazyFrame) -> LazyFrame

Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: vol_filter.

If _rv_adjustment is True, then filter the data to only include rows that are in the same vol_bucket as the latest row for each symbol.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The input dataframe or lazy frame. This should be the output of vol_buckets() function in calc_mandelbrot_channel().

required

Returns:

Type Description
LazyFrame

The data with only observations in the same volatility bucket as the most recent data observation

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/helpers.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def vol_filter(
    data: pl.DataFrame | pl.LazyFrame,
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: **vol_filter**.

    If `_rv_adjustment` is True, then filter the data to only include rows
    that are in the same vol_bucket as the latest row for each symbol.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        The input dataframe or lazy frame. This should be the output of
        `vol_buckets()` function in `calc_mandelbrot_channel()`.

    Returns
    -------
    pl.LazyFrame
        The data with only observations in the same volatility bucket as the
        most recent data observation
    """
    _check_required_columns(data, "vol_bucket", "symbol")

    data = data.lazy().with_columns(
        pl.col("vol_bucket").last().over("symbol").alias("last_vol_bucket")
    )

    out = data.filter(
        (pl.col("vol_bucket") == pl.col("last_vol_bucket")).over("symbol")
    ).drop("last_vol_bucket")

    return out
humbldata.toolbox.technical.mandelbrot_channel.helpers.price_range ¤
price_range(data: LazyFrame | DataFrame, recent_price_data: DataFrame | LazyFrame | None = None, rs_method: Literal['RS', 'RS_mean', 'RS_max', 'RS_min'] = 'RS', _detrended_returns: str = 'detrended_log_returns', _column_name_cum_sum_max: str = 'cum_sum_max', _column_name_cum_sum_min: str = 'cum_sum_min', *, _rv_adjustment: bool = False, _sort: bool = True, **kwargs) -> LazyFrame

Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: price_range.

Calculate the price range for a given dataset using the Mandelbrot method.

This function computes the price range based on the recent price data, cumulative sum max and min, and RS method specified. It supports adjustments for real volatility and sorting of the data based on symbols and dates.

Parameters:

Name Type Description Default
data LazyFrame | DataFrame

The dataset containing the financial data.

required
recent_price_data DataFrame | LazyFrame | None

The dataset containing the most recent price data. If None, the most recent prices are extracted from data.

None
rs_method Literal['RS', 'RS_mean', 'RS_max', 'RS_min']

The RS value to use. Must be one of 'RS', 'RS_mean', 'RS_max', 'RS_min'. RS is the column that is the Range/STD of the detrended returns.

"RS"
_detrended_returns str

The column name for detrended returns in data

"detrended_log_returns"
_column_name_cum_sum_max str

The column name for cumulative sum max in data

"cum_sum_max"
_column_name_cum_sum_min str

The column name for cumulative sum min in data

"cum_sum_min"
_rv_adjustment bool

If True, calculated the std() for all observations (since they have already been filtered by volatility bucket). If False, then calculates the std() for the most recent window_index and uses that to adjust the price range.

False
_sort bool

If True, sorts the data based on symbols and dates.

True
**kwargs

Arbitrary keyword arguments.

{}

Returns:

Type Description
LazyFrame

The dataset with calculated price range, including columns for top and bottom prices.

Raises:

Type Description
HumblDataError

If the RS method specified is not supported.

Examples:

>>> price_range_data = price_range(data, recent_price_data=None, rs_method="RS")
>>> print(price_range_data.columns)
['symbol', 'bottom_price', 'recent_price', 'top_price']
Notes

For rs_method, you should know how this affects the mandelbrot channel that is produced. Selecting RS uses the most recent RS value to calculate the price range, whereas selecting RS_mean, RS_max, or RS_min uses the mean, max, or min of the RS values, respectively.

Source code in src/humbldata/toolbox/technical/mandelbrot_channel/helpers.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
def price_range(
    data: pl.LazyFrame | pl.DataFrame,
    recent_price_data: pl.DataFrame | pl.LazyFrame | None = None,
    rs_method: Literal["RS", "RS_mean", "RS_max", "RS_min"] = "RS",
    _detrended_returns: str = "detrended_log_returns",  # Parameterized detrended_returns column
    _column_name_cum_sum_max: str = "cum_sum_max",
    _column_name_cum_sum_min: str = "cum_sum_min",
    *,
    _rv_adjustment: bool = False,
    _sort: bool = True,
    **kwargs,
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: MandelBrot Channel || Sub-Category: Helpers || Command: **price_range**.

    Calculate the price range for a given dataset using the Mandelbrot method.

    This function computes the price range based on the recent price data,
    cumulative sum max and min, and RS method specified. It supports adjustments
    for real volatility and sorting of the data based on symbols and dates.

    Parameters
    ----------
    data : pl.LazyFrame | pl.DataFrame
        The dataset containing the financial data.
    recent_price_data : pl.DataFrame | pl.LazyFrame | None
        The dataset containing the most recent price data. If None, the most recent prices are extracted from `data`.
    rs_method : Literal["RS", "RS_mean", "RS_max", "RS_min"], default "RS"
        The RS value to use. Must be one of 'RS', 'RS_mean', 'RS_max', 'RS_min'.
        RS is the column that is the Range/STD of the detrended returns.
    _detrended_returns : str, default "detrended_log_returns"
        The column name for detrended returns in `data`
    _column_name_cum_sum_max : str, default "cum_sum_max"
        The column name for cumulative sum max in `data`
    _column_name_cum_sum_min : str, default "cum_sum_min"
        The column name for cumulative sum min in `data`
    _rv_adjustment : bool, default False
        If True, calculated the `std()` for all observations (since they have
        already been filtered by volatility bucket). If False, then calculates
        the `std()` for the most recent `window_index`
        and uses that to adjust the price range.
    _sort : bool, default True
        If True, sorts the data based on symbols and dates.
    **kwargs
        Arbitrary keyword arguments.

    Returns
    -------
    pl.LazyFrame
        The dataset with calculated price range, including columns for top and
        bottom prices.

    Raises
    ------
    HumblDataError
        If the RS method specified is not supported.

    Examples
    --------
    >>> price_range_data = price_range(data, recent_price_data=None, rs_method="RS")
    >>> print(price_range_data.columns)
    ['symbol', 'bottom_price', 'recent_price', 'top_price']

    Notes
    -----
    For `rs_method`, you should know how this affects the mandelbrot channel
    that is produced. Selecting RS uses the most recent RS value to calculate
    the price range, whereas selecting RS_mean, RS_max, or RS_min uses the mean,
    max, or min of the RS values, respectively.
    """
    # Check if RS_method is one of the allowed values
    if rs_method not in RS_METHODS:
        msg = "RS_method must be one of 'RS', 'RS_mean', 'RS_max', 'RS_min'"
        raise HumblDataError(msg)

    if isinstance(data, pl.DataFrame):
        data = data.lazy()

    sort_cols = _set_sort_cols(data, "symbol", "date")
    if _sort:
        data.sort(sort_cols)

    # Define Polars Expressions ================================================
    last_cum_sum_max = (
        pl.col(_column_name_cum_sum_max).last().alias("last_cum_sum_max")
    )
    last_cum_sum_min = (
        pl.col(_column_name_cum_sum_min).last().alias("last_cum_sum_min")
    )
    # Define a conditional expression for std_detrended_returns based on _rv_adjustment
    std_detrended_returns_expr = (
        pl.col(_detrended_returns).std().alias(f"std_{_detrended_returns}")
        if _rv_adjustment
        else pl.col(_detrended_returns)
        .filter(pl.col("window_index") == pl.col("window_index").min())
        .std()
        .alias(f"std_{_detrended_returns}")
    )
    # if rv_adjustment isnt used, then use the most recent window will be used
    # for calculating the price_range
    date_expr = pl.col("date").max()
    # ===========================================================================

    if rs_method == "RS":
        rs_expr = pl.col("RS").last().alias("RS")
    elif rs_method == "RS_mean":
        rs_expr = pl.col("RS").mean().alias("RS_mean")
    elif rs_method == "RS_max":
        rs_expr = pl.col("RS").max().alias("RS_max")
    elif rs_method == "RS_min":
        rs_expr = pl.col("RS").min().alias("RS_min")

    if recent_price_data is None:
        # if no recent_prices_data is passed, then pull the most recent prices from the data
        recent_price_expr = pl.col("close").last().alias("recent_price")
        # Perform a single group_by operation to calculate both STD of detrended returns and RS statistics
        price_range_data = (
            data.group_by("symbol")
            .agg(
                [
                    date_expr,
                    # Conditional STD calculation based on _rv_adjustment
                    std_detrended_returns_expr,
                    # Recent Price Data
                    recent_price_expr,
                    # cum_sum_max/min last
                    last_cum_sum_max,
                    last_cum_sum_min,
                    # RS statistics
                    rs_expr,
                ]
            )
            # Join with recent_price_data on symbol
            .with_columns(
                (
                    pl.col(rs_method)
                    * pl.col("std_detrended_log_returns")
                    * pl.col("recent_price")
                ).alias("price_range")
            )
            .sort("symbol")
        )
    else:
        price_range_data = (
            data.group_by("symbol")
            .agg(
                [
                    date_expr,
                    # Conditional STD calculation based on _rv_adjustment
                    std_detrended_returns_expr,
                    # cum_sum_max/min last
                    last_cum_sum_max,
                    last_cum_sum_min,
                    # RS statistics
                    rs_expr,
                ]
            )
            # Join with recent_price_data on symbol
            .join(recent_price_data.lazy(), on="symbol")
            .with_columns(
                (
                    pl.col(rs_method)
                    * pl.col("std_detrended_log_returns")
                    * pl.col("recent_price")
                ).alias("price_range")
            )
            .sort("symbol")
        )
    # Relative Position Modifier
    out = _price_range_engine(price_range_data)

    return out

humbldata.toolbox.technical.volatility ¤

humbldata.toolbox.technical.volatility.realized_volatility_helpers ¤

Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers.

All of the volatility estimators used in calc_realized_volatility(). These are various methods to calculate the realized volatility of financial data.

humbldata.toolbox.technical.volatility.realized_volatility_helpers.std ¤
std(data: DataFrame | LazyFrame | Series, window: str = '1m', trading_periods=252, _drop_nulls: bool = True, _avg_trading_days: bool = False, _column_name_returns: str = 'log_returns', _sort: bool = True) -> LazyFrame | Series

Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || Command: _std.

This function computes the standard deviation of returns, which is a common measure of volatility.It calculates the rolling standard deviation for a given window size, optionally adjusting for the average number of trading days and scaling the result to an annualized volatility percentage.

Parameters:

Name Type Description Default
data Union[DataFrame, LazyFrame, Series]

The input data containing the returns. It can be a DataFrame, LazyFrame, or Series.

required
window str

The rolling window size for calculating the standard deviation. The default is "1m" (one month).

'1m'
trading_periods int

The number of trading periods in a year, used for annualizing the volatility. The default is 252.

252
_drop_nulls bool

If True, null values will be dropped from the result. The default is True.

True
_avg_trading_days bool

If True, the average number of trading days will be used when calculating the window size. The default is True.

False
_column_name_returns str

The name of the column containing the returns. This parameter is used when data is a DataFrame or LazyFrame. The default is "log_returns".

'log_returns'

Returns:

Type Description
Union[DataFrame, LazyFrame, Series]

The input data structure with an additional column for the rolling standard deviation of returns, or the modified Series with the rolling standard deviation values.

Source code in src/humbldata/toolbox/technical/volatility/realized_volatility_helpers.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def std(
    data: pl.DataFrame | pl.LazyFrame | pl.Series,
    window: str = "1m",
    trading_periods=252,
    _drop_nulls: bool = True,
    _avg_trading_days: bool = False,
    _column_name_returns: str = "log_returns",
    _sort: bool = True,
) -> pl.LazyFrame | pl.Series:
    """
    Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || **Command: _std**.

    This function computes the standard deviation of returns, which is a common
    measure of volatility.It calculates the rolling standard deviation for a
    given window size, optionally adjusting for the average number of trading
    days and scaling the result to an annualized volatility percentage.

    Parameters
    ----------
    data : Union[pl.DataFrame, pl.LazyFrame, pl.Series]
        The input data containing the returns. It can be a DataFrame, LazyFrame,
        or Series.
    window : str, optional
        The rolling window size for calculating the standard deviation.
        The default is "1m" (one month).
    trading_periods : int, optional
        The number of trading periods in a year, used for annualizing the
        volatility. The default is 252.
    _drop_nulls : bool, optional
        If True, null values will be dropped from the result.
        The default is True.
    _avg_trading_days : bool, optional
        If True, the average number of trading days will be used when
        calculating the window size. The default is True.
    _column_name_returns : str, optional
        The name of the column containing the returns. This parameter is used
        when `data` is a DataFrame or LazyFrame. The default is "log_returns".

    Returns
    -------
    Union[pl.DataFrame, pl.LazyFrame, pl.Series]
        The input data structure with an additional column for the rolling
        standard deviation of returns, or the modified Series with the rolling
        standard deviation values.
    """
    window_timedelta = _window_format(
        window, _return_timedelta=True, _avg_trading_days=_avg_trading_days
    )
    if isinstance(data, pl.Series):
        return data.rolling_std(
            window_size=window_timedelta.days, min_periods=1
        )
    sort_cols = _set_sort_cols(data, "symbol", "date")
    if _sort and sort_cols:
        data = data.lazy().sort(sort_cols)
        for col in sort_cols:
            data = data.set_sorted(col)

    # convert window_timedelta to days to use fixed window
    result = data.lazy().with_columns(
        (
            pl.col(_column_name_returns).rolling_std_by(
                window_size=window_timedelta,
                min_periods=2,  # using min_periods=2, bc if min_periods=1, the first value will be 0.
                by="date",
            )
            * math.sqrt(trading_periods)
            * 100
        ).alias(f"std_volatility_pct_{window_timedelta.days}D")
    )
    if _drop_nulls:
        return result.drop_nulls(
            subset=f"std_volatility_pct_{window_timedelta.days}D"
        )
    return result
humbldata.toolbox.technical.volatility.realized_volatility_helpers.parkinson ¤
parkinson(data: DataFrame | LazyFrame, window: str = '1m', _column_name_high: str = 'high', _column_name_low: str = 'low', *, _drop_nulls: bool = True, _avg_trading_days: bool = False, _sort: bool = True) -> LazyFrame

Calculate Parkinson's volatility over a specified window.

Parkinson's volatility is a measure that uses the stock's high and low prices of the day rather than just close to close prices. It is particularly useful for capturing large price movements during the day.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The input data containing the stock prices.

required
window int

The rolling window size for calculating volatility, by default 30.

'1m'
trading_periods int

The number of trading periods in a year, by default 252.

required
_column_name_high str

The name of the column containing the high prices, by default "high".

'high'
_column_name_low str

The name of the column containing the low prices, by default "low".

'low'
_drop_nulls bool

Whether to drop null values from the result, by default True.

True
_avg_trading_days bool

Whether to use the average number of trading days when calculating the window size, by default True.

False

Returns:

Type Description
DataFrame | LazyFrame

The calculated Parkinson's volatility, with an additional column "parkinson_volatility_pct_{window_int}D" indicating the percentage volatility.

Notes

This function requires the input data to have 'high' and 'low' columns to calculate the logarithm of their ratio, which is squared and scaled by a constant to estimate volatility. The result is then annualized and expressed as a percentage.

Usage

If you pass "1m as a window argument and _avg_trading_days=False. The result will be 30. If _avg_trading_days=True, the result will be 21.

Examples:

>>> data = pl.DataFrame({'high': [120, 125], 'low': [115, 120]})
>>> _parkinson(data)
A DataFrame with the calculated Parkinson's volatility.
Source code in src/humbldata/toolbox/technical/volatility/realized_volatility_helpers.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def parkinson(
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    _column_name_high: str = "high",
    _column_name_low: str = "low",
    *,
    _drop_nulls: bool = True,
    _avg_trading_days: bool = False,
    _sort: bool = True,
) -> pl.LazyFrame:
    """
    Calculate Parkinson's volatility over a specified window.

    Parkinson's volatility is a measure that uses the stock's high and low prices
    of the day rather than just close to close prices. It is particularly useful
    for capturing large price movements during the day.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        The input data containing the stock prices.
    window : int, optional
        The rolling window size for calculating volatility, by default 30.
    trading_periods : int, optional
        The number of trading periods in a year, by default 252.
    _column_name_high : str, optional
        The name of the column containing the high prices, by default "high".
    _column_name_low : str, optional
        The name of the column containing the low prices, by default "low".
    _drop_nulls : bool, optional
        Whether to drop null values from the result, by default True.
    _avg_trading_days : bool, optional
        Whether to use the average number of trading days when calculating the
        window size, by default True.

    Returns
    -------
    pl.DataFrame | pl.LazyFrame
        The calculated Parkinson's volatility, with an additional column
        "parkinson_volatility_pct_{window_int}D"
        indicating the percentage volatility.

    Notes
    -----
    This function requires the input data to have 'high' and 'low' columns to
    calculate
    the logarithm of their ratio, which is squared and scaled by a constant to
    estimate
    volatility. The result is then annualized and expressed as a percentage.

    Usage
    -----
    If you pass `"1m` as a `window` argument and  `_avg_trading_days=False`.
    The result will be `30`. If `_avg_trading_days=True`, the result will be
    `21`.

    Examples
    --------
    >>> data = pl.DataFrame({'high': [120, 125], 'low': [115, 120]})
    >>> _parkinson(data)
    A DataFrame with the calculated Parkinson's volatility.
    """
    sort_cols = _set_sort_cols(data, "symbol", "date")
    if _sort and sort_cols:
        data = data.lazy().sort(sort_cols)
        for col in sort_cols:
            data = data.set_sorted(col)

    var1 = 1.0 / (4.0 * math.log(2.0))
    var2 = (
        data.lazy()
        .select((pl.col(_column_name_high) / pl.col(_column_name_low)).log())
        .collect()
        .to_series()
    )
    rs = var1 * var2**2

    window_int: int = _window_format(
        window, _return_timedelta=True, _avg_trading_days=_avg_trading_days
    ).days
    result = data.lazy().with_columns(
        (
            rs.rolling_map(_annual_vol, window_size=window_int, min_periods=1)
            * 100
        ).alias(f"parkinson_volatility_pct_{window_int}D")
    )
    if _drop_nulls:
        return result.drop_nulls(
            subset=f"parkinson_volatility_pct_{window_int}D"
        )

    return result
humbldata.toolbox.technical.volatility.realized_volatility_helpers.garman_klass ¤
garman_klass(data: DataFrame | LazyFrame, window: str = '1m', _column_name_high: str = 'high', _column_name_low: str = 'low', _column_name_open: str = 'open', _column_name_close: str = 'close', _drop_nulls: bool = True, _avg_trading_days: bool = False, _sort: bool = True) -> LazyFrame

Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || Command: _garman_klass.

Calculates the Garman-Klass volatility for a given dataset.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The input data containing the price information.

required
window str

The rolling window size for volatility calculation, by default "1m".

'1m'
_column_name_high str

The name of the column containing the high prices, by default "high".

'high'
_column_name_low str

The name of the column containing the low prices, by default "low".

'low'
_column_name_open str

The name of the column containing the opening prices, by default "open".

'open'
_column_name_close str

The name of the column containing the adjusted closing prices, by default "close".

'close'
_drop_nulls bool

Whether to drop null values from the result, by default True.

True
_avg_trading_days bool

Whether to use the average number of trading days when calculating the window size, by default True.

False

Returns:

Type Description
DataFrame | LazyFrame | Series

The calculated Garman-Klass volatility, with an additional column "volatility_pct" indicating the percentage volatility.

Notes

Garman-Klass volatility extends Parkinson’s volatility by considering the opening and closing prices in addition to the high and low prices. This approach provides a more accurate estimation of volatility, especially in markets with significant activity at the opening and closing of trading sessions.

Source code in src/humbldata/toolbox/technical/volatility/realized_volatility_helpers.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def garman_klass(
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    _column_name_high: str = "high",
    _column_name_low: str = "low",
    _column_name_open: str = "open",
    _column_name_close: str = "close",
    _drop_nulls: bool = True,
    _avg_trading_days: bool = False,
    _sort: bool = True,
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || **Command: _garman_klass**.

    Calculates the Garman-Klass volatility for a given dataset.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        The input data containing the price information.
    window : str, optional
        The rolling window size for volatility calculation, by default "1m".
    _column_name_high : str, optional
        The name of the column containing the high prices, by default "high".
    _column_name_low : str, optional
        The name of the column containing the low prices, by default "low".
    _column_name_open : str, optional
        The name of the column containing the opening prices, by default "open".
    _column_name_close : str, optional
        The name of the column containing the adjusted closing prices, by
        default "close".
    _drop_nulls : bool, optional
        Whether to drop null values from the result, by default True.
    _avg_trading_days : bool, optional
        Whether to use the average number of trading days when calculating the
        window size, by default True.

    Returns
    -------
    pl.DataFrame | pl.LazyFrame | pl.Series
        The calculated Garman-Klass volatility, with an additional column
        "volatility_pct" indicating the percentage volatility.

    Notes
    -----
    Garman-Klass volatility extends Parkinson’s volatility by considering the
    opening and closing prices in addition to the high and low prices. This
    approach provides a more accurate estimation of volatility, especially in
    markets with significant activity at the opening and closing of trading
    sessions.
    """
    sort_cols = _set_sort_cols(data, "symbol", "date")
    if _sort and sort_cols:
        data = data.lazy().sort(sort_cols)
        for col in sort_cols:
            data = data.set_sorted(col)
    log_hi_lo = (
        data.lazy()
        .select((pl.col(_column_name_high) / pl.col(_column_name_low)).log())
        .collect()
        .to_series()
    )
    log_close_open = (
        data.lazy()
        .select((pl.col(_column_name_close) / pl.col(_column_name_open)).log())
        .collect()
        .to_series()
    )
    rs: pl.Series = 0.5 * log_hi_lo**2 - (2 * np.log(2) - 1) * log_close_open**2

    window_int: int = _window_format(
        window, _return_timedelta=True, _avg_trading_days=_avg_trading_days
    ).days
    result = data.lazy().with_columns(
        (
            rs.rolling_map(_annual_vol, window_size=window_int, min_periods=1)
            * 100
        ).alias(f"gk_volatility_pct_{window_int}D")
    )
    if _drop_nulls:
        return result.drop_nulls(subset=f"gk_volatility_pct_{window_int}D")
    return result
humbldata.toolbox.technical.volatility.realized_volatility_helpers.hodges_tompkins ¤
hodges_tompkins(data: DataFrame | LazyFrame | Series, window: str = '1m', trading_periods=252, _column_name_returns: str = 'log_returns', *, _drop_nulls: bool = True, _avg_trading_days: bool = False, _sort: bool = True) -> LazyFrame | Series

Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || Command: _hodges_tompkins.

Hodges-Tompkins volatility is a bias correction for estimation using an overlapping data sample that produces unbiased estimates and a substantial gain in efficiency.

Source code in src/humbldata/toolbox/technical/volatility/realized_volatility_helpers.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
def hodges_tompkins(
    data: pl.DataFrame | pl.LazyFrame | pl.Series,
    window: str = "1m",
    trading_periods=252,
    _column_name_returns: str = "log_returns",
    *,
    _drop_nulls: bool = True,
    _avg_trading_days: bool = False,
    _sort: bool = True,
) -> pl.LazyFrame | pl.Series:
    """
    Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || **Command: _hodges_tompkins**.

    Hodges-Tompkins volatility is a bias correction for estimation using an
    overlapping data sample that produces unbiased estimates and a
    substantial gain in efficiency.
    """
    # When calculating rv_mean, need a different adjustment factor,
    # so window doesn't influence the Volatility_mean
    # RV_MEAN

    # Define Window Size
    window_timedelta = _window_format(
        window, _return_timedelta=True, _avg_trading_days=_avg_trading_days
    )
    # Calculate STD, assigned to `vol`
    if isinstance(data, pl.Series):
        vol = data.rolling_std(window_size=window_timedelta.days, min_periods=1)
    else:
        sort_cols = _set_sort_cols(data, "symbol", "date")
        if _sort and sort_cols:
            data = data.lazy().sort(sort_cols)
            for col in sort_cols:
                data = data.set_sorted(col)
        vol = data.lazy().select(
            pl.col(_column_name_returns).rolling_std_by(
                window_size=window_timedelta, min_periods=1, by="date"
            )
            * np.sqrt(trading_periods)
        )

    # Assign window size to h for adjustment
    h: int = window_timedelta.days

    if isinstance(data, pl.Series):
        count = data.len()
    elif isinstance(data, pl.LazyFrame):
        count = data.collect().shape[0]
    else:
        count = data.shape[0]

    n = (count - h) + 1
    adj_factor = 1.0 / (1.0 - (h / n) + ((h**2 - 1) / (3 * n**2)))

    if isinstance(data, pl.Series):
        return (vol * adj_factor) * 100
    else:
        result = data.lazy().with_columns(
            ((vol.collect() * adj_factor) * 100)
            .to_series()
            .alias(f"ht_volatility_pct_{h}D")
        )
    if _drop_nulls:
        result = result.drop_nulls(subset=f"ht_volatility_pct_{h}D")
    return result
humbldata.toolbox.technical.volatility.realized_volatility_helpers.rogers_satchell ¤
rogers_satchell(data: DataFrame | LazyFrame, window: str = '1m', _column_name_high: str = 'high', _column_name_low: str = 'low', _column_name_open: str = 'open', _column_name_close: str = 'close', _drop_nulls: bool = True, _avg_trading_days: bool = False, _sort: bool = True) -> LazyFrame

Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || Command: _rogers_satchell.

Rogers-Satchell is an estimator for measuring the volatility of securities with an average return not equal to zero. Unlike Parkinson and Garman-Klass estimators, Rogers-Satchell incorporates a drift term (mean return not equal to zero). This function calculates the Rogers-Satchell volatility estimator over a specified window and optionally drops null values from the result.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The input data for which to calculate the Rogers-Satchell volatility estimator. This can be either a DataFrame or a LazyFrame. There need to be OHLC columns present in the data.

required
window str

The window over which to calculate the volatility estimator. The window is specified as a string, such as "1m" for one month.

"1m"
_column_name_high str

The name of the column representing the high prices in the data.

"high"
_column_name_low str

The name of the column representing the low prices in the data.

"low"
_column_name_open str

The name of the column representing the opening prices in the data.

"open"
_column_name_close str

The name of the column representing the adjusted closing prices in the data.

"close"
_drop_nulls bool

Whether to drop null values from the result. If True, rows with null values in the calculated volatility column will be removed from the output.

True
_avg_trading_days bool

Indicates whether to use the average number of trading days per window. This affects how the window size is interpreted. i.e instead of "1mo" returning timedelta(days=31), it will return timedelta(days=21).

True

Returns:

Type Description
DataFrame | LazyFrame

The input data with an additional column containing the calculated Rogers-Satchell volatility estimator. The return type matches the input type (DataFrame or LazyFrame).

Source code in src/humbldata/toolbox/technical/volatility/realized_volatility_helpers.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def rogers_satchell(
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    _column_name_high: str = "high",
    _column_name_low: str = "low",
    _column_name_open: str = "open",
    _column_name_close: str = "close",
    _drop_nulls: bool = True,
    _avg_trading_days: bool = False,
    _sort: bool = True,
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || **Command: _rogers_satchell**.

    Rogers-Satchell is an estimator for measuring the volatility of
    securities with an average return not equal to zero. Unlike Parkinson
    and Garman-Klass estimators, Rogers-Satchell incorporates a drift term
    (mean return not equal to zero). This function calculates the
    Rogers-Satchell volatility estimator over a specified window and optionally
    drops null values from the result.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        The input data for which to calculate the Rogers-Satchell volatility
        estimator. This can be either a DataFrame or a LazyFrame. There need to
        be OHLC columns present in the data.
    window : str, default "1m"
        The window over which to calculate the volatility estimator. The
        window is specified as a string, such as "1m" for one month.
    _column_name_high : str, default "high"
        The name of the column representing the high prices in the data.
    _column_name_low : str, default "low"
        The name of the column representing the low prices in the data.
    _column_name_open : str, default "open"
        The name of the column representing the opening prices in the data.
    _column_name_close : str, default "close"
        The name of the column representing the adjusted closing prices in the
        data.
    _drop_nulls : bool, default True
        Whether to drop null values from the result. If True, rows with null
        values in the calculated volatility column will be removed from the
        output.
    _avg_trading_days : bool, default True
        Indicates whether to use the average number of trading days per window.
        This affects how the window size is interpreted. i.e instead of "1mo"
        returning `timedelta(days=31)`, it will return `timedelta(days=21)`.

    Returns
    -------
    pl.DataFrame | pl.LazyFrame
        The input data with an additional column containing the calculated
        Rogers-Satchell volatility estimator. The return type matches the input
        type (DataFrame or LazyFrame).
    """
    # Check if all required columns are present in the DataFrame
    _check_required_columns(
        data,
        _column_name_high,
        _column_name_low,
        _column_name_open,
        _column_name_close,
    )
    sort_cols = _set_sort_cols(data, "symbol", "date")
    if _sort and sort_cols:
        data = data.lazy().sort(sort_cols)
        for col in sort_cols:
            data = data.set_sorted(col)
    # assign window
    window_int: int = _window_format(
        window=window,
        _return_timedelta=True,
        _avg_trading_days=_avg_trading_days,
    ).days

    data = (
        data.lazy()
        .with_columns(
            [
                (pl.col(_column_name_high) / pl.col(_column_name_open))
                .log()
                .alias("log_ho"),
                (pl.col(_column_name_low) / pl.col(_column_name_open))
                .log()
                .alias("log_lo"),
                (pl.col(_column_name_close) / pl.col(_column_name_open))
                .log()
                .alias("log_co"),
            ]
        )
        .with_columns(
            (
                pl.col("log_ho") * (pl.col("log_ho") - pl.col("log_co"))
                + pl.col("log_lo") * (pl.col("log_lo") - pl.col("log_co"))
            ).alias("rs")
        )
    )
    result = data.lazy().with_columns(
        (
            pl.col("rs").rolling_map(
                _annual_vol, window_size=window_int, min_periods=1
            )
            * 100
        ).alias(f"rs_volatility_pct_{window_int}D")
    )
    if _drop_nulls:
        result = result.drop_nulls(subset=f"rs_volatility_pct_{window_int}D")
    return result
humbldata.toolbox.technical.volatility.realized_volatility_helpers.yang_zhang ¤
yang_zhang(data: DataFrame | LazyFrame, window: str = '1m', trading_periods: int = 252, _column_name_high: str = 'high', _column_name_low: str = 'low', _column_name_open: str = 'open', _column_name_close: str = 'close', _avg_trading_days: bool = False, _drop_nulls: bool = True, _sort: bool = True) -> LazyFrame

Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || Command: _yang_zhang.

Yang-Zhang volatility is the combination of the overnight (close-to-open volatility), a weighted average of the Rogers-Satchell volatility and the day’s open-to-close volatility.

Source code in src/humbldata/toolbox/technical/volatility/realized_volatility_helpers.py
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
def yang_zhang(
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    trading_periods: int = 252,
    _column_name_high: str = "high",
    _column_name_low: str = "low",
    _column_name_open: str = "open",
    _column_name_close: str = "close",
    _avg_trading_days: bool = False,
    _drop_nulls: bool = True,
    _sort: bool = True,
) -> pl.LazyFrame:
    """
    Context: Toolbox || Category: Technical || Sub-Category: Volatility Helpers || **Command: _yang_zhang**.

    Yang-Zhang volatility is the combination of the overnight
    (close-to-open volatility), a weighted average of the Rogers-Satchell
    volatility and the day’s open-to-close volatility.
    """
    # check required columns
    _check_required_columns(
        data,
        _column_name_high,
        _column_name_low,
        _column_name_open,
        _column_name_close,
    )
    sort_cols = _set_sort_cols(data, "symbol", "date")
    if _sort and sort_cols:
        data = data.lazy().sort(sort_cols)
        for col in sort_cols:
            data = data.set_sorted(col)

    # assign window
    window_int: int = _window_format(
        window=window,
        _return_timedelta=True,
        _avg_trading_days=_avg_trading_days,
    ).days

    data = (
        data.lazy()
        .with_columns(
            [
                (pl.col(_column_name_high) / pl.col(_column_name_open))
                .log()
                .alias("log_ho"),
                (pl.col(_column_name_low) / pl.col(_column_name_open))
                .log()
                .alias("log_lo"),
                (pl.col(_column_name_close) / pl.col(_column_name_open))
                .log()
                .alias("log_co"),
                (pl.col(_column_name_open) / pl.col(_column_name_close).shift())
                .log()
                .alias("log_oc"),
                (
                    pl.col(_column_name_close)
                    / pl.col(_column_name_close).shift()
                )
                .log()
                .alias("log_cc"),
            ]
        )
        .with_columns(
            [
                (pl.col("log_oc") ** 2).alias("log_oc_sq"),
                (pl.col("log_cc") ** 2).alias("log_cc_sq"),
                (
                    pl.col("log_ho") * (pl.col("log_ho") - pl.col("log_co"))
                    + pl.col("log_lo") * (pl.col("log_lo") - pl.col("log_co"))
                ).alias("rs"),
            ]
        )
    )

    k = 0.34 / (1.34 + (window_int + 1) / (window_int - 1))
    data = _yang_zhang_engine(data=data, window=window_int)
    result = (
        data.lazy()
        .with_columns(
            (
                (
                    pl.col("open_vol")
                    + k * pl.col("close_vol")
                    + (1 - k) * pl.col("window_rs")
                ).sqrt()
                * np.sqrt(trading_periods)
                * 100
            ).alias(f"yz_volatility_pct_{window_int}D")
        )
        .select(
            pl.exclude(
                [
                    "log_ho",
                    "log_lo",
                    "log_co",
                    "log_oc",
                    "log_cc",
                    "log_oc_sq",
                    "log_cc_sq",
                    "rs",
                    "close_vol",
                    "open_vol",
                    "window_rs",
                ]
            )
        )
    )
    if _drop_nulls:
        return result.drop_nulls(subset=f"yz_volatility_pct_{window_int}D")
    return result
humbldata.toolbox.technical.volatility.realized_volatility_helpers.squared_returns ¤
squared_returns(data: DataFrame | LazyFrame, window: str = '1m', trading_periods: int = 252, _drop_nulls: bool = True, _avg_trading_days: bool = False, _column_name_returns: str = 'log_returns', _sort: bool = True) -> LazyFrame

Calculate squared returns over a rolling window.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The input data containing the price information.

required
window str

The rolling window size for calculating squared returns, by default "1m".

'1m'
trading_periods int

The number of trading periods in a year, used for scaling the result. The default is 252.

252
_drop_nulls bool

Whether to drop null values from the result, by default True.

True
_column_name_returns str

The name of the column containing the price data, by default "close".

'log_returns'

Returns:

Type Description
DataFrame | LazyFrame

The input data structure with an additional column for the rolling squared returns.

Source code in src/humbldata/toolbox/technical/volatility/realized_volatility_helpers.py
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
def squared_returns(
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    trading_periods: int = 252,
    _drop_nulls: bool = True,
    _avg_trading_days: bool = False,
    _column_name_returns: str = "log_returns",
    _sort: bool = True,
) -> pl.LazyFrame:
    """
    Calculate squared returns over a rolling window.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        The input data containing the price information.
    window : str, optional
        The rolling window size for calculating squared returns, by default "1m".
    trading_periods : int, optional
        The number of trading periods in a year, used for scaling the result.
        The default is 252.
    _drop_nulls : bool, optional
        Whether to drop null values from the result, by default True.
    _column_name_returns : str, optional
        The name of the column containing the price data, by default "close".

    Returns
    -------
    pl.DataFrame | pl.LazyFrame
        The input data structure with an additional column for the rolling
        squared returns.
    """
    _check_required_columns(data, _column_name_returns)

    sort_cols = _set_sort_cols(data, "symbol", "date")
    if _sort and sort_cols:
        data = data.lazy().sort(sort_cols)
        for col in sort_cols:
            data = data.set_sorted(col)

    # assign window
    window_int: int = _window_format(
        window=window,
        _return_timedelta=True,
        _avg_trading_days=_avg_trading_days,
    ).days

    data = data.lazy().with_columns(
        ((pl.col(_column_name_returns) * 100) ** 2).alias("sq_log_returns_pct")
    )
    # Calculate rolling squared returns
    result = (
        data.lazy()
        .with_columns(
            pl.col("sq_log_returns_pct")
            .rolling_mean(window_size=window_int, min_periods=1)
            .alias(f"sq_volatility_pct_{window_int}D")
        )
        .drop("sq_log_returns_pct")
    )
    if _drop_nulls:
        result = result.drop_nulls(subset=f"sq_volatility_pct_{window_int}D")
    return result
humbldata.toolbox.technical.volatility.realized_volatility_model ¤

Context: Toolbox || Category: Technical || Command: calc_realized_volatility.

A command to generate Realized Volatility for any time series. A complete set of volatility estimators based on Euan Sinclair's Volatility Trading

humbldata.toolbox.technical.volatility.realized_volatility_model.calc_realized_volatility ¤
calc_realized_volatility(data: DataFrame | LazyFrame, window: str = '1m', method: Literal['std', 'parkinson', 'garman_klass', 'gk', 'hodges_tompkins', 'ht', 'rogers_satchell', 'rs', 'yang_zhang', 'yz', 'squared_returns', 'sq'] = 'std', grouped_mean: list[int] | None = None, _trading_periods: int = 252, _column_name_returns: str = 'log_returns', _column_name_close: str = 'close', _column_name_high: str = 'high', _column_name_low: str = 'low', _column_name_open: str = 'open', *, _sort: bool = True) -> LazyFrame | DataFrame

Context: Toolbox || Category: Technical || Command: calc_realized_volatility.

Calculates the Realized Volatility for a given time series based on the provided standard and extra parameters. This function adds ONE rolling volatility column to the input DataFrame.

Parameters:

Name Type Description Default
data DataFrame | LazyFrame

The time series data for which to calculate the Realized Volatility.

required
window str

The window size for a rolling volatility calculation, default is "1m" (1 month).

'1m'
method Literal['std', 'parkinson', 'garman_klass', 'hodges_tompkins', 'rogers_satchell', 'yang_zhang', 'squared_returns']

The volatility estimator to use. You can also use abbreviations to access the same methods. The abbreviations are: gk for garman_klass, ht for hodges_tompkins, rs for rogers_satchell, yz for yang_zhang, sq for squared_returns.

'std'
grouped_mean list[int] | None

A list of window sizes to use for calculating volatility. If provided, the volatility method will be calculated across these various windows, and then an averaged value of all the windows will be returned. If None, a single window size specified by window parameter will be used.

None
_sort bool

If True, the data will be sorted before calculation. Default is True.

True
_trading_periods int

The number of trading periods in a year, default is 252 (the typical number of trading days in a year).

252
_column_name_returns str

The name of the column containing the returns. Default is "log_returns".

'log_returns'
_column_name_close str

The name of the column containing the close prices. Default is "close".

'close'
_column_name_high str

The name of the column containing the high prices. Default is "high".

'high'
_column_name_low str

The name of the column containing the low prices. Default is "low".

'low'
_column_name_open str

The name of the column containing the open prices. Default is "open".

'open'

Returns:

Type Description
VolatilityData

The calculated Realized Volatility data for the given time series.

Notes
  • Rolling calculations are used to show a time series of recent volatility that captures only a certain number of data points. The window size is used to determine the number of data points to use in the calculation. We do this because when looking at the volatility of a stock, you get a better insight (more granular) into the characteristics of the volatility seeing how 1-month or 3-month rolling volatility looked over time.

  • This function does not accept pl.Series because the methods used to calculate volatility require, high, low, close, open columns for the data. It would be too cumbersome to pass each series needed for the calculation as a separate argument. Therefore, the function only accepts pl.DataFrame or pl.LazyFrame as input.

Source code in src/humbldata/toolbox/technical/volatility/realized_volatility_model.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def calc_realized_volatility(
    data: pl.DataFrame | pl.LazyFrame,
    window: str = "1m",
    method: Literal[  # used to be rvol_method
        "std",
        "parkinson",
        "garman_klass",
        "gk",
        "hodges_tompkins",
        "ht",
        "rogers_satchell",
        "rs",
        "yang_zhang",
        "yz",
        "squared_returns",
        "sq",
    ] = "std",
    grouped_mean: list[int] | None = None,  # used to be rv_mean
    _trading_periods: int = 252,
    _column_name_returns: str = "log_returns",
    _column_name_close: str = "close",
    _column_name_high: str = "high",
    _column_name_low: str = "low",
    _column_name_open: str = "open",
    *,
    _sort: bool = True,
) -> pl.LazyFrame | pl.DataFrame:
    """
    Context: Toolbox || Category: Technical || **Command: calc_realized_volatility**.

    Calculates the Realized Volatility for a given time series based on the
    provided standard and extra parameters. This function adds ONE rolling
    volatility column to the input DataFrame.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        The time series data for which to calculate the Realized Volatility.
    window : str
        The window size for a rolling volatility calculation, default is `"1m"`
        (1 month).
    method : Literal["std", "parkinson", "garman_klass", "hodges_tompkins","rogers_satchell", "yang_zhang", "squared_returns"]
        The volatility estimator to use. You can also use abbreviations to
        access the same methods. The abbreviations are: `gk` for `garman_klass`,
        `ht` for `hodges_tompkins`, `rs` for `rogers_satchell`, `yz` for
        `yang_zhang`, `sq` for `squared_returns`.
    grouped_mean : list[int] | None
        A list of window sizes to use for calculating volatility. If provided,
        the volatility method will be calculated across these various windows,
        and then an averaged value of all the windows will be returned. If `None`,
        a single window size specified by `window` parameter will be used.
    _sort : bool
        If True, the data will be sorted before calculation. Default is True.
    _trading_periods : int
        The number of trading periods in a year, default is 252 (the typical
        number of trading days in a year).
    _column_name_returns : str
        The name of the column containing the returns. Default is "log_returns".
    _column_name_close : str
        The name of the column containing the close prices. Default is "close".
    _column_name_high : str
        The name of the column containing the high prices. Default is "high".
    _column_name_low : str
        The name of the column containing the low prices. Default is "low".
    _column_name_open : str
        The name of the column containing the open prices. Default is "open".

    Returns
    -------
    VolatilityData
        The calculated Realized Volatility data for the given time series.

    Notes
    -----
    - Rolling calculations are used to show a time series of recent volatility
    that captures only a certain number of data points. The window size is
    used to determine the number of data points to use in the calculation. We do
    this because when looking at the volatility of a stock, you get a better
    insight (more granular) into the characteristics of the volatility seeing how 1-month or
    3-month rolling volatility looked over time.

    - This function does not accept `pl.Series` because the methods used to
    calculate volatility require, high, low, close, open columns for the data.
    It would be too cumbersome to pass each series needed for the calculation
    as a separate argument. Therefore, the function only accepts `pl.DataFrame`
    or `pl.LazyFrame` as input.
    """  # noqa: W505
    # Step 1: Get the correct realized volatility function =====================
    func = VOLATILITY_METHODS.get(method)
    if not func:
        msg = f"Volatility method: '{method}' is not supported."
        raise HumblDataError(msg)

    # Step 2: Get the names of the parameters that the function accepts ========
    func_params = inspect.signature(func).parameters

    # Step 3: Filter out the parameters not accepted by the function ===========
    args_to_pass = {
        key: value for key, value in locals().items() if key in func_params
    }

    # Step 4: Calculate Realized Volatility ====================================
    if grouped_mean:
        # calculate volatility over multiple windows and average the result, add to a new column
        print("🚧 WIP!")
    else:
        out = func(**args_to_pass)

    return out

humbldata.core ¤

The core module to contain logic & functions used in controllers.

This module is intended to contain sub-modules and functions that are not directly utilized from the package, but rather used in building the package itself. This means that the core module should not contain any code that is specific to the package's use case, but rather should be generic and reusable in other contexts.

humbldata.core.utils ¤

humbldata core utils.

Utils is used to keep; helpers, descriptions, constants, and other useful tools.

humbldata.core.utils.openbb_helpers ¤

Core Module - OpenBB Helpers.

This module contains functions used to interact with OpenBB, or wrap commands to have specific data outputs.

humbldata.core.utils.openbb_helpers.obb_login ¤
obb_login(pat: str | None = None) -> bool

Log into the OpenBB Hub using a Personal Access Token (PAT).

This function wraps the obb.account.login method to provide a simplified interface for logging into OpenBB Hub. It optionally accepts a PAT. If no PAT is provided, it attempts to use the PAT stored in the environment variable OBB_PAT.

Parameters:

Name Type Description Default
pat str | None

The personal access token for authentication. If None, the token is retrieved from the environment variable OBB_PAT. Default is None.

None

Returns:

Type Description
bool

True if login is successful, False otherwise.

Raises:

Type Description
HumblDataError

If an error occurs during the login process.

Examples:

>>> # obb_login("your_personal_access_token_here")
True
>>> # obb_login()  # Assumes `OBB_PAT` is set in the environment
True
Source code in src/humbldata/core/utils/openbb_helpers.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def obb_login(pat: str | None = None) -> bool:
    """
    Log into the OpenBB Hub using a Personal Access Token (PAT).

    This function wraps the `obb.account.login` method to provide a simplified
    interface for logging into OpenBB Hub. It optionally accepts a PAT. If no PAT
    is provided, it attempts to use the PAT stored in the environment variable
    `OBB_PAT`.

    Parameters
    ----------
    pat : str | None, optional
        The personal access token for authentication. If None, the token is
        retrieved from the environment variable `OBB_PAT`. Default is None.

    Returns
    -------
    bool
        True if login is successful, False otherwise.

    Raises
    ------
    HumblDataError
        If an error occurs during the login process.

    Examples
    --------
    >>> # obb_login("your_personal_access_token_here")
    True

    >>> # obb_login()  # Assumes `OBB_PAT` is set in the environment
    True

    """
    if pat is None:
        pat = Env().OBB_PAT
    try:
        obb.account.login(pat=pat, remember_me=True)
        # obb.account.save()

        # dotenv.set_key(dotenv.find_dotenv(), "OBB_LOGGED_IN", "true")

        return True
    except Exception as e:
        from humbldata.core.standard_models.abstract.warnings import (
            HumblDataWarning,
        )

        # dotenv.set_key(dotenv.find_dotenv(), "OBB_LOGGED_IN", "false")

        warnings.warn(
            "An error occurred while logging into OpenBB. Details below:\n"
            + repr(e),
            category=HumblDataWarning,
            stacklevel=1,
        )
        return False
humbldata.core.utils.openbb_helpers.get_latest_price ¤
get_latest_price(symbol: str | list[str] | Series, provider: OBB_EQUITY_PRICE_QUOTE_PROVIDERS | None = 'yfinance') -> LazyFrame

Context: Core || Category: Utils || Subcategory: OpenBB Helpers || Command: get_latest_price.

Queries the latest stock price data for the given symbol(s) using the specified provider. Defaults to YahooFinance (yfinance) if no provider is specified. Returns a LazyFrame with the stock symbols and their latest prices.

Parameters:

Name Type Description Default
symbol str | list[str] | Series

The stock symbol(s) to query for the latest price. Accepts a single symbol, a list of symbols, or a Polars Series of symbols.

required
provider OBB_EQUITY_PRICE_QUOTE_PROVIDERS

The data provider for fetching stock prices. Defaults is yfinance, in which case a default provider is used.

'yfinance'

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns for the stock symbols ('symbol') and their latest prices ('last_price').

Source code in src/humbldata/core/utils/openbb_helpers.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def get_latest_price(
    symbol: str | list[str] | pl.Series,
    provider: OBB_EQUITY_PRICE_QUOTE_PROVIDERS | None = "yfinance",
) -> pl.LazyFrame:
    """
    Context: Core || Category: Utils || Subcategory: OpenBB Helpers || **Command: get_latest_price**.

    Queries the latest stock price data for the given symbol(s) using the
    specified provider. Defaults to YahooFinance (`yfinance`) if no provider is
    specified. Returns a LazyFrame with the stock symbols and their latest prices.

    Parameters
    ----------
    symbol : str | list[str] | pl.Series
        The stock symbol(s) to query for the latest price. Accepts a single
        symbol, a list of symbols, or a Polars Series of symbols.
    provider : OBB_EQUITY_PRICE_QUOTE_PROVIDERS, optional
        The data provider for fetching stock prices. Defaults is `yfinance`,
        in which case a default provider is used.

    Returns
    -------
    pl.LazyFrame
        A Polars LazyFrame with columns for the stock symbols ('symbol') and
        their latest prices ('last_price').
    """
    logging.getLogger("openbb_terminal.stocks.stocks_model").setLevel(
        logging.CRITICAL
    )

    return (
        obb.equity.price.quote(symbol, provider=provider)
        .to_polars()
        .lazy()
        .select(["symbol", "last_price"])
        .rename({"last_price": "recent_price"})
    )
humbldata.core.utils.openbb_helpers.aget_latest_price async ¤
aget_latest_price(symbols: str | list[str] | Series, provider: OBB_EQUITY_PRICE_QUOTE_PROVIDERS | None = 'yfinance') -> LazyFrame

Asynchronous version of get_latest_price.

Context: Core || Category: Utils || Subcategory: OpenBB Helpers || Command: get_latest_price_async.

Queries the latest stock price data for the given symbol(s) using the specified provider asynchronously. This functions collects the latest prices for ETF's and Equities, but not futures or options. Defaults to YahooFinance (yfinance) if no provider is specified. Returns a LazyFrame with the stock symbols and their latest prices.

Parameters:

Name Type Description Default
symbols str | List[str] | Series

The stock symbol(s) to query for the latest price. Accepts a single symbol, a list of symbols, or a Polars Series of symbols.

required
provider OBB_EQUITY_PRICE_QUOTE_PROVIDERS

The data provider for fetching stock prices. Default is yfinance.

'yfinance'

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns for the stock symbols ('symbol') and their latest prices ('recent_price').

Notes

If you run into an error: RuntimeError: asyncio.run() cannot be called from a running event loop you can use the following code to apply the asyncio event loop to the current thread:

import nest_asyncio
nest_asyncio.apply()

Source code in src/humbldata/core/utils/openbb_helpers.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def aget_latest_price(
    symbols: str | list[str] | pl.Series,
    provider: OBB_EQUITY_PRICE_QUOTE_PROVIDERS | None = "yfinance",
) -> pl.LazyFrame:
    """
    Asynchronous version of get_latest_price.

    Context: Core || Category: Utils || Subcategory: OpenBB Helpers || **Command: get_latest_price_async**.

    Queries the latest stock price data for the given symbol(s) using the
    specified provider asynchronously. This functions collects the latest prices
    for ETF's and Equities, but not futures or options. Defaults to YahooFinance
    (`yfinance`) if no provider is specified. Returns a LazyFrame with the stock
    symbols and their latest prices.

    Parameters
    ----------
    symbols : str | List[str] | pl.Series
        The stock symbol(s) to query for the latest price. Accepts a single
        symbol, a list of symbols, or a Polars Series of symbols.
    provider : OBB_EQUITY_PRICE_QUOTE_PROVIDERS, optional
        The data provider for fetching stock prices. Default is `yfinance`.

    Returns
    -------
    pl.LazyFrame
        A Polars LazyFrame with columns for the stock symbols ('symbol') and
        their latest prices ('recent_price').

    Notes
    -----
    If you run into an error: `RuntimeError: asyncio.run() cannot be called from a running event loop`
    you can use the following code to apply the asyncio event loop to the current thread:
    ```
    import nest_asyncio
    nest_asyncio.apply()
    ```
    """
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None, lambda: obb.equity.price.quote(symbols, provider=provider)
    )
    out = result.to_polars().lazy()
    if {"last_price", "prev_close"}.issubset(out.collect_schema().names()):
        out = out.select(
            [
                pl.when(pl.col("asset_type") == "ETF")
                .then(pl.col("prev_close"))
                .otherwise(pl.col("last_price"))
                .alias("last_price"),
                pl.col("symbol"),
            ]
        )
    elif "last_price" not in out.collect_schema().names():
        out = out.select(
            pl.col("symbol"), pl.col("prev_close").alias("last_price")
        )
    else:
        out = out.select(pl.col("symbol"), pl.col("last_price"))

    return out
humbldata.core.utils.openbb_helpers.aget_last_close async ¤
aget_last_close(symbols: str | list[str] | Series, provider: OBB_EQUITY_PRICE_QUOTE_PROVIDERS = 'yfinance') -> LazyFrame

Context: Core || Category: Utils || Subcategory: OpenBB Helpers || Command: aget_last_close.

Asynchronously retrieves the last closing price for the given stock symbol(s) using OpenBB's equity price quote data.

Parameters:

Name Type Description Default
symbols str | List[str] | Series

The stock symbol(s) to query for the last closing price. Accepts a single symbol, a list of symbols, or a Polars Series of symbols.

required
provider OBB_EQUITY_PRICE_QUOTE_PROVIDERS

The data provider for fetching stock prices. Default is yfinance.

'yfinance'

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns for the stock symbols ('symbol') and their last closing prices ('prev_close').

Notes

This function uses OpenBB's equity price quote data to fetch the last closing price. It returns a lazy frame for efficient processing, especially with large datasets.

Source code in src/humbldata/core/utils/openbb_helpers.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
async def aget_last_close(
    symbols: str | list[str] | pl.Series,
    provider: OBB_EQUITY_PRICE_QUOTE_PROVIDERS = "yfinance",
) -> pl.LazyFrame:
    """
    Context: Core || Category: Utils || Subcategory: OpenBB Helpers || **Command: aget_last_close**.

    Asynchronously retrieves the last closing price for the given stock symbol(s) using OpenBB's equity price quote data.

    Parameters
    ----------
    symbols : str | List[str] | pl.Series
        The stock symbol(s) to query for the last closing price. Accepts a single
        symbol, a list of symbols, or a Polars Series of symbols.
    provider : OBB_EQUITY_PRICE_QUOTE_PROVIDERS, optional
        The data provider for fetching stock prices. Default is `yfinance`.

    Returns
    -------
    pl.LazyFrame
        A Polars LazyFrame with columns for the stock symbols ('symbol') and
        their last closing prices ('prev_close').

    Notes
    -----
    This function uses OpenBB's equity price quote data to fetch the last closing price.
    It returns a lazy frame for efficient processing, especially with large datasets.
    """
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None, lambda: obb.equity.price.quote(symbols, provider=provider)
    )
    out = result.to_polars().lazy()

    return out.select(pl.col("symbol"), pl.col("prev_close"))
humbldata.core.utils.openbb_helpers.get_equity_sector ¤
get_equity_sector(symbols: str | list[str] | Series, provider: OBB_EQUITY_PROFILE_PROVIDERS | None = 'yfinance') -> LazyFrame

Context: Core || Category: Utils || Subcategory: OpenBB Helpers || Command: get_sector.

Retrieves the sector information for the given stock symbol(s) using OpenBB's equity profile data.

Parameters:

Name Type Description Default
symbols str | list[str] | Series

The stock symbol(s) to query for sector information. Accepts a single symbol, a list of symbols, or a Polars Series of symbols.

required
provider str | None

The data provider to use for fetching sector information. If None, the default provider will be used.

'yfinance'

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns for the stock symbols ('symbol') and their corresponding sectors ('sector').

Notes

This function uses OpenBB's equity profile data to fetch sector information. It returns a lazy frame for efficient processing, especially with large datasets.

Source code in src/humbldata/core/utils/openbb_helpers.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def get_equity_sector(
    symbols: str | list[str] | pl.Series,
    provider: OBB_EQUITY_PROFILE_PROVIDERS | None = "yfinance",
) -> pl.LazyFrame:
    """
    Context: Core || Category: Utils || Subcategory: OpenBB Helpers || **Command: get_sector**.

    Retrieves the sector information for the given stock symbol(s) using OpenBB's equity profile data.

    Parameters
    ----------
    symbols : str | list[str] | pl.Series
        The stock symbol(s) to query for sector information. Accepts a single
        symbol, a list of symbols, or a Polars Series of symbols.
    provider : str | None, optional
        The data provider to use for fetching sector information. If None, the default
        provider will be used.

    Returns
    -------
    pl.LazyFrame
        A Polars LazyFrame with columns for the stock symbols ('symbol') and
        their corresponding sectors ('sector').

    Notes
    -----
    This function uses OpenBB's equity profile data to fetch sector information.
    It returns a lazy frame for efficient processing, especially with large datasets.
    """
    try:
        result = obb.equity.profile(symbols, provider=provider)
        return result.to_polars().select(["symbol", "sector"]).lazy()
    except pl.exceptions.ColumnNotFoundError:
        # If an error occurs, return a LazyFrame with symbol and null sector
        if isinstance(symbols, str):
            symbols = [symbols]
        elif isinstance(symbols, pl.Series):
            symbols = symbols.to_list()
        return pl.LazyFrame(
            {"symbol": symbols, "sector": [None] * len(symbols)}
        )
humbldata.core.utils.openbb_helpers.aget_equity_sector async ¤
aget_equity_sector(symbols: str | list[str] | Series, provider: OBB_EQUITY_PROFILE_PROVIDERS | None = 'yfinance') -> LazyFrame

Asynchronous version of get_sector.

Context: Core || Category: Utils || Subcategory: OpenBB Helpers || Command: get_sector_async.

Retrieves the sector information for the given stock symbol(s) using OpenBB's equity profile data asynchronously. If an ETF is passed, it will return a NULL sector for the symbol. The sector returned hasn't been normalized to GICS_SECTORS, it is the raw OpenBB sector output. Sectors are normalized to GICS_SECTORS in the aet_sector_filter function.

Parameters:

Name Type Description Default
symbols str | List[str] | Series

The stock symbol(s) to query for sector information. Accepts a single symbol, a list of symbols, or a Polars Series of symbols.

required
provider str | None

The data provider to use for fetching sector information. If None, the default provider will be used.

'yfinance'

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns for the stock symbols ('symbol') and their corresponding sectors ('sector').

Notes

This function uses OpenBB's equity profile data to fetch sector information. It returns a lazy frame for efficient processing, especially with large datasets.

If you just pass an ETF to the obb.equity.profile function, it will throw return data without the NULL columns (sector column included) and only returns columns where there is data, so we need to handle that edge case. If an ETF is included with an equity, it will return a NULL sector column, so we can select the sector column from the ETF data and return it as a NULL sector for the equity.

Source code in src/humbldata/core/utils/openbb_helpers.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
async def aget_equity_sector(
    symbols: str | list[str] | pl.Series,
    provider: OBB_EQUITY_PROFILE_PROVIDERS | None = "yfinance",
) -> pl.LazyFrame:
    """
    Asynchronous version of get_sector.

    Context: Core || Category: Utils || Subcategory: OpenBB Helpers || **Command: get_sector_async**.

    Retrieves the sector information for the given stock symbol(s) using
    OpenBB's equity profile data asynchronously. If an ETF is passed, it will
    return a NULL sector for the symbol. The sector returned hasn't been
    normalized to GICS_SECTORS, it is the raw OpenBB sector output.
    Sectors are normalized to GICS_SECTORS in the `aet_sector_filter` function.

    Parameters
    ----------
    symbols : str | List[str] | pl.Series
        The stock symbol(s) to query for sector information. Accepts a single
        symbol, a list of symbols, or a Polars Series of symbols.
    provider : str | None, optional
        The data provider to use for fetching sector information. If None, the default
        provider will be used.

    Returns
    -------
    pl.LazyFrame
        A Polars LazyFrame with columns for the stock symbols ('symbol') and
        their corresponding sectors ('sector').

    Notes
    -----
    This function uses OpenBB's equity profile data to fetch sector information.
    It returns a lazy frame for efficient processing, especially with large datasets.

    If you just pass an ETF to the `obb.equity.profile` function, it will throw
    return data without the NULL columns (sector column included) and only
    returns columns where there is data, so we need to handle that edge case.
    If an ETF is included with an equity, it will return a NULL sector column,
    so we can select the sector column from the ETF data and return it as a
    NULL sector for the equity.
    """
    loop = asyncio.get_event_loop()
    try:
        result = await loop.run_in_executor(
            None, lambda: obb.equity.profile(symbols, provider=provider)
        )
        return result.to_polars().select(["symbol", "sector"]).lazy()
    except pl.exceptions.ColumnNotFoundError:
        # If an error occurs, return a LazyFrame with symbol and null sector
        if isinstance(symbols, str):
            symbols = [symbols]
        elif isinstance(symbols, pl.Series):
            symbols = symbols.to_list()
        return pl.LazyFrame(
            {"symbol": symbols, "sector": [None] * len(symbols)}
        ).cast(pl.Utf8)
humbldata.core.utils.openbb_helpers.aget_etf_category async ¤
aget_etf_category(symbols: str | list[str] | Series, provider: OBB_ETF_INFO_PROVIDERS | None = 'yfinance') -> LazyFrame

Asynchronously retrieves the category information for the given ETF symbol(s).

This function uses the obb.etf.info function and selects the category column to get the sector information. This function handles EQUITY symbols that are not ETF's the same way that aget_equity_sector does. The sector returned (under the OpenBB column name category) hasn't been normalized to GICS_SECTORS, it is the raw OpenBB category output. Sectors are normalized to GICS_SECTORS in the aget_sector_filter function.

Parameters:

Name Type Description Default
symbols str | list[str] | Series

The ETF symbol(s) to query for category information.

required
provider OBB_EQUITY_PROFILE_PROVIDERS | None
'yfinance'

Returns:

Type Description
LazyFrame

A Polars LazyFrame with columns for the ETF symbols ('symbol') and their corresponding categories ('category').

Source code in src/humbldata/core/utils/openbb_helpers.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
async def aget_etf_category(
    symbols: str | list[str] | pl.Series,
    provider: OBB_ETF_INFO_PROVIDERS | None = "yfinance",
) -> pl.LazyFrame:
    """
    Asynchronously retrieves the category information for the given ETF symbol(s).

    This function uses the `obb.etf.info` function and selects the `category`
    column to get the sector information. This function handles EQUITY
    symbols that are not ETF's the same way that `aget_equity_sector` does.
    The sector returned (under the OpenBB column name `category`) hasn't been
    normalized to GICS_SECTORS, it is the raw OpenBB category output.
    Sectors are normalized to GICS_SECTORS in the `aget_sector_filter` function.

    Parameters
    ----------
    symbols : str | list[str] | pl.Series
        The ETF symbol(s) to query for category information.
    provider : OBB_EQUITY_PROFILE_PROVIDERS | None, optional

    Returns
    -------
    pl.LazyFrame
        A Polars LazyFrame with columns for the ETF symbols ('symbol') and
        their corresponding categories ('category').
    """
    loop = asyncio.get_event_loop()
    try:
        result = await loop.run_in_executor(
            None, lambda: obb.etf.info(symbols, provider=provider)
        )
        out = result.to_polars().lazy().select(["symbol", "category"])
        # Create a LazyFrame with all input symbols
        all_symbols = pl.LazyFrame({"symbol": symbols})

        # Left join to include all input symbols, filling missing sectors with null
        out = all_symbols.join(out, on="symbol", how="left").with_columns(
            [
                pl.when(pl.col("category").is_null())
                .then(None)
                .otherwise(pl.col("category"))
                .alias("category")
            ]
        )
    except OpenBBError:
        if isinstance(symbols, str):
            symbols = [symbols]
        elif isinstance(symbols, pl.Series):
            symbols = symbols.to_list()
        return pl.LazyFrame(
            {"symbol": symbols, "category": [None] * len(symbols)}
        ).cast(pl.Utf8)
    return out

humbldata.core.utils.constants ¤

A module to contain all project-wide constants.

humbldata.core.utils.logger ¤

humbldata.core.utils.logger.setup_logger ¤
setup_logger(name: str, level: int = logging.INFO) -> Logger

Set up a logger with the specified name and logging level.

Parameters:

Name Type Description Default
name str

The name of the logger.

required
level int

The logging level, by default logging.INFO.

INFO

Returns:

Type Description
Logger

A configured logger instance.

Notes

This function creates a logger with a StreamHandler that outputs to sys.stdout. It uses a formatter that includes timestamp, logger name, log level, and message. If the logger already has handlers, it skips the setup to avoid duplicate logging. The logger is configured not to propagate messages to the root logger.

Examples:

>>> logger = setup_logger("my_logger", logging.DEBUG)
>>> logger.debug("This is a debug message")
2023-05-20 10:30:45,123 - my_logger - DEBUG - This is a debug message
Source code in src/humbldata/core/utils/logger.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def setup_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """
    Set up a logger with the specified name and logging level.

    Parameters
    ----------
    name : str
        The name of the logger.
    level : int, optional
        The logging level, by default logging.INFO.

    Returns
    -------
    logging.Logger
        A configured logger instance.

    Notes
    -----
    This function creates a logger with a StreamHandler that outputs to sys.stdout.
    It uses a formatter that includes timestamp, logger name, log level, and message.
    If the logger already has handlers, it skips the setup to avoid duplicate logging.
    The logger is configured not to propagate messages to the root logger.

    Examples
    --------
    >>> logger = setup_logger("my_logger", logging.DEBUG)
    >>> logger.debug("This is a debug message")
    2023-05-20 10:30:45,123 - my_logger - DEBUG - This is a debug message
    """
    logger = logging.getLogger(name)

    # Check if the logger already has handlers to avoid duplicate logging
    if not logger.handlers:
        logger.setLevel(level)

        # Install coloredlogs
        coloredlogs.install(
            level=level,
            logger=logger,
            fmt="%(levelname)s: %(name)s || %(message)s",
            level_styles={
                "debug": {"color": "green"},
                "info": {"color": "blue"},
                "warning": {"color": "yellow", "bold": True},
                "error": {"color": "red", "bold": True},
                "critical": {
                    "color": "red",
                    "bold": True,
                    "background": "white",
                },
            },
            field_styles={
                "asctime": {"color": "blue"},
                "levelname": {"color": "magenta", "bold": True},
                "name": {"color": "cyan"},
            },
        )

    # Prevent the logger from propagating messages to the root logger
    logger.propagate = False

    return logger
humbldata.core.utils.logger.log_start_end ¤
log_start_end(func: Callable | None = None, *, logger: Logger | None = None) -> Callable

Log the start and end of any function, including time tracking.

This decorator works with both synchronous and asynchronous functions. It logs the start and end of the function execution, as well as the total execution time. If an exception occurs, it logs the exception details.

Parameters:

Name Type Description Default
func Callable | None

The function to be decorated. If None, the decorator can be used with parameters.

None
logger Logger | None

The logger to use. If None, a logger will be created using the function's module name.

None

Returns:

Type Description
Callable

The wrapped function.

Notes
  • For asynchronous functions, the decorator uses an async wrapper.
  • For synchronous functions, it uses a sync wrapper.
  • If a KeyboardInterrupt occurs, it logs the interruption and returns an empty list.
  • If any other exception occurs, it logs the exception and re-raises it.

Examples:

>>> @log_start_end
... def example_function():
...     print("This is an example function")
...
>>> example_function()
START: example_function (sync)
This is an example function
END: example_function (sync) - Total time: 0.0001s
>>> @log_start_end(logger=custom_logger)
... async def async_example():
...     await asyncio.sleep(1)
...
>>> asyncio.run(async_example())
START: async_example (async)
END: async_example (async) - Total time: 1.0012s
Source code in src/humbldata/core/utils/logger.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def log_start_end(
    func: Callable | None = None, *, logger: logging.Logger | None = None
) -> Callable:
    """
    Log the start and end of any function, including time tracking.

    This decorator works with both synchronous and asynchronous functions.
    It logs the start and end of the function execution, as well as the total
    execution time. If an exception occurs, it logs the exception details.

    Parameters
    ----------
    func : Callable | None, optional
        The function to be decorated. If None, the decorator can be used with parameters.
    logger : logging.Logger | None, optional
        The logger to use. If None, a logger will be created using the function's module name.

    Returns
    -------
    Callable
        The wrapped function.

    Notes
    -----
    - For asynchronous functions, the decorator uses an async wrapper.
    - For synchronous functions, it uses a sync wrapper.
    - If a KeyboardInterrupt occurs, it logs the interruption and returns an empty list.
    - If any other exception occurs, it logs the exception and re-raises it.

    Examples
    --------
    >>> @log_start_end
    ... def example_function():
    ...     print("This is an example function")
    ...
    >>> example_function()
    START: example_function (sync)
    This is an example function
    END: example_function (sync) - Total time: 0.0001s

    >>> @log_start_end(logger=custom_logger)
    ... async def async_example():
    ...     await asyncio.sleep(1)
    ...
    >>> asyncio.run(async_example())
    START: async_example (async)
    END: async_example (async) - Total time: 1.0012s
    """
    assert callable(func) or func is None

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            nonlocal logger
            if logger is None:
                logger = logging.getLogger(func.__module__)

            start_time = time.time()
            logger.info(f"START: {func.__name__} (async)")

            try:
                result = await func(*args, **kwargs)
            except KeyboardInterrupt:
                end_time = time.time()
                total_time = end_time - start_time
                logger.info(
                    f"INTERRUPTED: {func.__name__} (async) - Total time: {total_time:.4f}s"
                )
                return []
            except Exception as e:
                end_time = time.time()
                total_time = end_time - start_time
                logger.exception(
                    f"EXCEPTION in {func.__name__} (async) - Total time: {total_time:.4f}s"
                )
                raise
            else:
                end_time = time.time()
                total_time = end_time - start_time
                logger.info(
                    f"END: {func.__name__} (async) - Total time: {total_time:.4f}s"
                )
                return result

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs) -> Any:
            nonlocal logger
            if logger is None:
                logger = logging.getLogger(func.__module__)

            start_time = time.time()
            logger.info(f"START: {func.__name__} (sync)")

            try:
                result = func(*args, **kwargs)
            except KeyboardInterrupt:
                end_time = time.time()
                total_time = end_time - start_time
                logger.info(
                    f"INTERRUPTED: {func.__name__} (sync) - Total time: {total_time:.4f}s"
                )
                return []
            except Exception as e:
                end_time = time.time()
                total_time = end_time - start_time
                logger.exception(
                    f"EXCEPTION in {func.__name__} (sync) - Total time: {total_time:.4f}s"
                )
                raise
            else:
                end_time = time.time()
                total_time = end_time - start_time
                logger.info(
                    f"END: {func.__name__} (sync) - Total time: {total_time:.4f}s"
                )
                return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        else:
            return sync_wrapper

    return decorator(func) if callable(func) else decorator

humbldata.core.utils.descriptions ¤

Common descriptions for model fields.

humbldata.core.utils.core_helpers ¤

A module to contain core helper functions for the program.

humbldata.core.utils.core_helpers.is_debug_mode ¤
is_debug_mode() -> bool

Check if the current system is in debug mode.

Returns:

Type Description
bool

True if the system is in debug mode, False otherwise.

Source code in src/humbldata/core/utils/core_helpers.py
14
15
16
17
18
19
20
21
22
23
def is_debug_mode() -> bool:
    """
    Check if the current system is in debug mode.

    Returns
    -------
    bool
        True if the system is in debug mode, False otherwise.
    """
    return False
humbldata.core.utils.core_helpers.run_async ¤
run_async(coro)

Run an async function in a new thread and return the result.

Source code in src/humbldata/core/utils/core_helpers.py
26
27
28
29
30
def run_async(coro):
    """Run an async function in a new thread and return the result."""
    with ThreadPoolExecutor() as executor:
        future = executor.submit(lambda: asyncio.run(coro))
        return future.result()

humbldata.core.utils.env ¤

The Env Module, to control a single instance of environment variables.

humbldata.core.utils.env.Env ¤

A singleton environment to hold all Environment variables.

Source code in src/humbldata/core/utils/env.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class Env(metaclass=SingletonMeta):
    """A singleton environment to hold all Environment variables."""

    _environ: dict[str, str]

    def __init__(self) -> None:
        env_path = dotenv.find_dotenv()
        dotenv.load_dotenv(Path(env_path))

        self._environ = os.environ.copy()

    @property
    def OBB_PAT(self) -> str | None:  # noqa: N802
        """OpenBB Personal Access Token."""
        return self._environ.get("OBB_PAT", None)

    @property
    def LOGGER_LEVEL(self) -> int:
        """
        Get the global logger level.

        Returns
        -------
        int
            The numeric logging level (default: 20 for INFO).

        Notes
        -----
        Mapping of string levels to numeric values:
        DEBUG: 10, INFO: 20, WARNING: 30, ERROR: 40, CRITICAL: 50
        """
        level_map = {
            "DEBUG": 10,
            "INFO": 20,
            "WARNING": 30,
            "ERROR": 40,
            "CRITICAL": 50,
        }
        return level_map.get(
            self._environ.get("LOGGER_LEVEL", "INFO").upper(), 20
        )

    @property
    def OBB_LOGGED_IN(self) -> bool:
        return self.str2bool(self._environ.get("OBB_LOGGED_IN", False))

    @staticmethod
    def str2bool(value: str | bool) -> bool:
        """Match a value to its boolean correspondent.

        Args:
            value (str): The string value to be converted to a boolean.

        Returns
        -------
            bool: The boolean value corresponding to the input string.

        Raises
        ------
            ValueError: If the input string does not correspond to a boolean
            value.
        """
        if isinstance(value, bool):
            return value
        if value.lower() in {"false", "f", "0", "no", "n"}:
            return False
        if value.lower() in {"true", "t", "1", "yes", "y"}:
            return True
        msg = f"Failed to cast '{value}' to bool."
        raise ValueError(msg)
humbldata.core.utils.env.Env.OBB_PAT property ¤
OBB_PAT: str | None

OpenBB Personal Access Token.

humbldata.core.utils.env.Env.LOGGER_LEVEL property ¤
LOGGER_LEVEL: int

Get the global logger level.

Returns:

Type Description
int

The numeric logging level (default: 20 for INFO).

Notes

Mapping of string levels to numeric values: DEBUG: 10, INFO: 20, WARNING: 30, ERROR: 40, CRITICAL: 50

humbldata.core.utils.env.Env.str2bool staticmethod ¤
str2bool(value: str | bool) -> bool

Match a value to its boolean correspondent.

Args: value (str): The string value to be converted to a boolean.

Returns:

Type Description
bool: The boolean value corresponding to the input string.

Raises:

Type Description
ValueError: If the input string does not correspond to a boolean

value.

Source code in src/humbldata/core/utils/env.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@staticmethod
def str2bool(value: str | bool) -> bool:
    """Match a value to its boolean correspondent.

    Args:
        value (str): The string value to be converted to a boolean.

    Returns
    -------
        bool: The boolean value corresponding to the input string.

    Raises
    ------
        ValueError: If the input string does not correspond to a boolean
        value.
    """
    if isinstance(value, bool):
        return value
    if value.lower() in {"false", "f", "0", "no", "n"}:
        return False
    if value.lower() in {"true", "t", "1", "yes", "y"}:
        return True
    msg = f"Failed to cast '{value}' to bool."
    raise ValueError(msg)

humbldata.core.standard_models ¤

Models to represent core data structures of the Standardization Framework.

humbldata.core.standard_models.abstract ¤

Abstract core DATA MODELS to be inherited by other models.

humbldata.core.standard_models.abstract.humblobject ¤
humbldata.core.standard_models.abstract.humblobject.extract_subclass_dict ¤
extract_subclass_dict(self, attribute_name: str, items: list)

Extract the dictionary representation of the specified attribute.

Parameters:

Name Type Description Default
attribute_name str

The name of the attribute to update in the items list.

required
Source code in src/humbldata/core/standard_models/abstract/humblobject.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def extract_subclass_dict(self, attribute_name: str, items: list):
    """
    Extract the dictionary representation of the specified attribute.

    Parameters
    ----------
    attribute_name : str
        The name of the attribute to update in the items list.
    """
    # Check if the attribute exists and has a value
    attribute_value = getattr(self, attribute_name, None)
    if attribute_value:
        # Assuming the attribute has a method called 'model_dump' to get its dictionary representation
        add_item = attribute_value.model_dump()
        add_item_str = str(add_item)
        if len(add_item_str) > 80:
            add_item_str = add_item_str[:80] + "..."
        for i, item in enumerate(items):
            if item.startswith(f"{attribute_name}:"):
                items[i] = f"{attribute_name}: {add_item_str}"
                break

    return items
humbldata.core.standard_models.abstract.humblobject.HumblObject ¤

Bases: Tagged, Generic[T]

HumblObject is the base class for all dta returned from the Toolbox.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
class HumblObject(Tagged, Generic[T]):
    """HumblObject is the base class for all dta returned from the Toolbox."""

    _user_settings: ClassVar[BaseModel | None] = None
    _system_settings: ClassVar[BaseModel | None] = None

    model_config = ConfigDict(arbitrary_types_allowed=True)

    results: T | None = Field(
        default=None,
        description="Serializable Logical Plan of the pl.LazyFrame results.",
    )
    equity_data: T | None = Field(
        default=None,
        description="Serialized raw data used in the command calculations.",
    )
    provider: str | None = Field(
        default=None,
        description="Provider name.",
    )
    warnings: list[Warning_] | None = Field(
        default=None,
        description="List of warnings.",
    )
    chart: Chart | list[Chart] | None = Field(
        default=None,
        description="Chart object.",
    )
    extra: dict[str, Any] = Field(
        default_factory=dict,
        description="Extra info.",
    )
    context_params: ToolboxQueryParams | PortfolioQueryParams | None = Field(
        default_factory=ToolboxQueryParams,
        title="Context Parameters",
        description="Context parameters.",
    )
    command_params: SerializeAsAny[QueryParams] | None = Field(
        default=QueryParams,
        title="Command Parameters",
        description="Command-specific parameters.",
    )

    # @field_validator("command_params")
    # def validate_command_params(cls, v):
    #     class_name = v.__class__.__name__
    #     if "QueryParams" in class_name:
    #         return v
    #     msg = "Wrong type for 'command_params', must be subclass of QueryParams"
    #     raise TypeError(msg)

    def __repr__(self) -> str:
        """Human readable representation of the object."""
        items = [
            f"{k}: {v}"[:83] + ("..." if len(f"{k}: {v}") > 83 else "")
            for k, v in self.model_dump().items()
        ]

        # Needed to extract subclass dict correctly
        # items = extract_subclass_dict(self, "command_params", items)

        return f"{self.__class__.__name__}\n\n" + "\n".join(items)

    def to_polars(
        self, collect: bool = True, equity_data: bool = False
    ) -> pl.LazyFrame | pl.DataFrame:
        """
        Deserialize the stored results or return the LazyFrame, and optionally collect them into a Polars DataFrame.

        Parameters
        ----------
        collect : bool, optional
            If True, collects the deserialized LazyFrame into a DataFrame.
            Default is True.
        equity_data : bool, optional
            If True, processes equity_data instead of results.
            Default is False.

        Returns
        -------
        pl.LazyFrame | pl.DataFrame
            The results as a Polars LazyFrame or DataFrame,
            depending on the collect parameter.

        Raises
        ------
        HumblDataError
            If no results or equity data are found to process
        """
        data = self.equity_data if equity_data else self.results

        if data is None:
            raise HumblDataError("No data found.")

        if isinstance(data, pl.LazyFrame):
            out = data
        elif isinstance(data, str):
            with io.StringIO(data) as data_io:
                out = pl.LazyFrame.deserialize(data_io, format="json")
        elif isinstance(data, bytes):
            with io.BytesIO(data) as data_io:
                out = pl.LazyFrame.deserialize(data_io, format="binary")
        else:
            raise HumblDataError(
                "Invalid data type. Expected LazyFrame or serialized string."
            )

        if collect:
            out = out.collect()

        return out

    def to_df(
        self, collect: bool = True, equity_data: bool = False
    ) -> pl.LazyFrame | pl.DataFrame:
        """
        Alias for the `to_polars` method.

        Parameters
        ----------
        collect : bool, optional
            If True, collects the deserialized LazyFrame into a DataFrame.
            Default is True.

        Returns
        -------
        pl.LazyFrame | pl.DataFrame
            The deserialized results as a Polars LazyFrame or DataFrame,
            depending on the collect parameter.
        """
        return self.to_polars(collect=collect, equity_data=equity_data)

    def to_pandas(self, equity_data: bool = False) -> pd.DataFrame:
        """
        Convert the results to a Pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The results as a Pandas DataFrame.
        """
        return self.to_polars(collect=True, equity_data=equity_data).to_pandas()

    def to_numpy(self, equity_data: bool = False) -> np.ndarray:
        """
        Convert the results to a NumPy array.

        Returns
        -------
        np.ndarray
            The results as a NumPy array.
        """
        return self.to_polars(collect=True, equity_data=equity_data).to_numpy()

    def to_dict(
        self,
        row_wise: bool = False,
        equity_data: bool = False,
        as_series: bool = True,
    ) -> dict | list[dict]:
        """
        Transform the stored data into a dictionary or a list of dictionaries.

        This method allows for the conversion of the internal data
        representation into a more universally accessible format, either
        aggregating the entire dataset into a single dictionary (column-wise)
        or breaking it down into a list of dictionaries, each representing a
        row in the dataset.

        Parameters
        ----------
        row_wise : bool, optional
            Determines the format of the output. If set to True, the method
            returns a list of dictionaries, with each dictionary representing a
            row and its corresponding data as key-value pairs. If set to False,
            the method returns a single dictionary, with column names as keys
            and lists of column data as values. Default is False.

        equity_data : bool, optional
            A flag to specify whether to use equity-specific data for the
            conversion. This parameter allows for flexibility in handling
            different types of data stored within the object. Default is
            False.
        as_series : bool, optional
            If True, the method returns a pl.Series with values as Series. If
            False, the method returns a dict with values as List[Any].
            Default is True.

        Returns
        -------
        dict | list[dict]
            Depending on the `row_wise` parameter, either a dictionary mapping column names to lists of values (if `row_wise` is False) or a list of dictionaries, each representing a row in the dataset (if `row_wise` is True).
        """
        if row_wise:
            return self.to_polars(
                collect=True, equity_data=equity_data
            ).to_dicts()
        return self.to_polars(collect=True, equity_data=equity_data).to_dict(
            as_series=as_series
        )

    def to_arrow(self, equity_data: bool = False) -> pa.Table:
        """
        Convert the results to an Arrow Table.

        Returns
        -------
        pa.Table
            The results as an Arrow Table.
        """
        return self.to_polars(collect=True, equity_data=equity_data).to_arrow()

    def to_struct(
        self, name: str = "results", equity_data: bool = False
    ) -> pl.Series:
        """
        Convert the results to a struct.

        Parameters
        ----------
        name : str, optional
            The name of the struct. Default is "results".

        Returns
        -------
        pl.Struct
            The results as a struct.
        """
        return self.to_polars(collect=True, equity_data=equity_data).to_struct(
            name=name
        )

    def to_json(
        self, equity_data: bool = False, chart: bool = False
    ) -> str | list[str]:
        """
        Convert the results to a JSON string.

        Parameters
        ----------
        equity_data : bool, optional
            A flag to specify whether to use equity-specific data for the
            conversion. Default is False.
        chart : bool, optional
            If True, return all generated charts as a JSON string instead of
            returning the results. Default is False.

        Returns
        -------
        str
            The results or charts as a JSON string.

        Raises
        ------
        HumblDataError
            If chart is True but no charts are available.
        """
        import json
        from datetime import date, datetime

        from humbldata.core.standard_models.abstract.errors import (
            HumblDataError,
        )

        def json_serial(obj):
            """JSON serializer for objects not serializable by default json code."""
            if isinstance(obj, (datetime, date)):
                return obj.isoformat()
            msg = f"Type {type(obj)} not serializable"
            raise TypeError(msg)

        if chart:
            if self.chart is None:
                msg = f"You set `.to_json(chart=True)` but there were no charts. Make sure `chart=True` in {self.command_params.__class__.__name__}"
                raise HumblDataError(msg)

            if isinstance(self.chart, list):
                return [
                    chart.content
                    for chart in self.chart
                    if chart and chart.content
                ]
            else:
                return self.chart.content
        else:
            data = self.to_polars(
                collect=True, equity_data=equity_data
            ).to_dict(as_series=False)
            return json.dumps(data, default=json_serial)

    def is_empty(self, equity_data: bool = False) -> bool:
        """
        Check if the results are empty.

        Returns
        -------
        bool
            True if the results are empty, False otherwise.
        """
        return self.to_polars(collect=True, equity_data=equity_data).is_empty()

    def show(self) -> None:
        """Show the chart."""
        if isinstance(self.chart, list):
            for chart in self.chart:
                if chart and chart.fig:
                    chart.fig.show()
                else:
                    msg = "Chart object is missing or incomplete."
                    raise HumblDataError(msg)
        elif not self.chart or not self.chart.fig:
            msg = "Chart not found."
            raise HumblDataError(msg)
humbldata.core.standard_models.abstract.humblobject.HumblObject.__repr__ ¤
__repr__() -> str

Human readable representation of the object.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
104
105
106
107
108
109
110
111
112
113
114
def __repr__(self) -> str:
    """Human readable representation of the object."""
    items = [
        f"{k}: {v}"[:83] + ("..." if len(f"{k}: {v}") > 83 else "")
        for k, v in self.model_dump().items()
    ]

    # Needed to extract subclass dict correctly
    # items = extract_subclass_dict(self, "command_params", items)

    return f"{self.__class__.__name__}\n\n" + "\n".join(items)
humbldata.core.standard_models.abstract.humblobject.HumblObject.to_polars ¤
to_polars(collect: bool = True, equity_data: bool = False) -> LazyFrame | DataFrame

Deserialize the stored results or return the LazyFrame, and optionally collect them into a Polars DataFrame.

Parameters:

Name Type Description Default
collect bool

If True, collects the deserialized LazyFrame into a DataFrame. Default is True.

True
equity_data bool

If True, processes equity_data instead of results. Default is False.

False

Returns:

Type Description
LazyFrame | DataFrame

The results as a Polars LazyFrame or DataFrame, depending on the collect parameter.

Raises:

Type Description
HumblDataError

If no results or equity data are found to process

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def to_polars(
    self, collect: bool = True, equity_data: bool = False
) -> pl.LazyFrame | pl.DataFrame:
    """
    Deserialize the stored results or return the LazyFrame, and optionally collect them into a Polars DataFrame.

    Parameters
    ----------
    collect : bool, optional
        If True, collects the deserialized LazyFrame into a DataFrame.
        Default is True.
    equity_data : bool, optional
        If True, processes equity_data instead of results.
        Default is False.

    Returns
    -------
    pl.LazyFrame | pl.DataFrame
        The results as a Polars LazyFrame or DataFrame,
        depending on the collect parameter.

    Raises
    ------
    HumblDataError
        If no results or equity data are found to process
    """
    data = self.equity_data if equity_data else self.results

    if data is None:
        raise HumblDataError("No data found.")

    if isinstance(data, pl.LazyFrame):
        out = data
    elif isinstance(data, str):
        with io.StringIO(data) as data_io:
            out = pl.LazyFrame.deserialize(data_io, format="json")
    elif isinstance(data, bytes):
        with io.BytesIO(data) as data_io:
            out = pl.LazyFrame.deserialize(data_io, format="binary")
    else:
        raise HumblDataError(
            "Invalid data type. Expected LazyFrame or serialized string."
        )

    if collect:
        out = out.collect()

    return out
humbldata.core.standard_models.abstract.humblobject.HumblObject.to_df ¤
to_df(collect: bool = True, equity_data: bool = False) -> LazyFrame | DataFrame

Alias for the to_polars method.

Parameters:

Name Type Description Default
collect bool

If True, collects the deserialized LazyFrame into a DataFrame. Default is True.

True

Returns:

Type Description
LazyFrame | DataFrame

The deserialized results as a Polars LazyFrame or DataFrame, depending on the collect parameter.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def to_df(
    self, collect: bool = True, equity_data: bool = False
) -> pl.LazyFrame | pl.DataFrame:
    """
    Alias for the `to_polars` method.

    Parameters
    ----------
    collect : bool, optional
        If True, collects the deserialized LazyFrame into a DataFrame.
        Default is True.

    Returns
    -------
    pl.LazyFrame | pl.DataFrame
        The deserialized results as a Polars LazyFrame or DataFrame,
        depending on the collect parameter.
    """
    return self.to_polars(collect=collect, equity_data=equity_data)
humbldata.core.standard_models.abstract.humblobject.HumblObject.to_pandas ¤
to_pandas(equity_data: bool = False) -> DataFrame

Convert the results to a Pandas DataFrame.

Returns:

Type Description
DataFrame

The results as a Pandas DataFrame.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
185
186
187
188
189
190
191
192
193
194
def to_pandas(self, equity_data: bool = False) -> pd.DataFrame:
    """
    Convert the results to a Pandas DataFrame.

    Returns
    -------
    pd.DataFrame
        The results as a Pandas DataFrame.
    """
    return self.to_polars(collect=True, equity_data=equity_data).to_pandas()
humbldata.core.standard_models.abstract.humblobject.HumblObject.to_numpy ¤
to_numpy(equity_data: bool = False) -> ndarray

Convert the results to a NumPy array.

Returns:

Type Description
ndarray

The results as a NumPy array.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
196
197
198
199
200
201
202
203
204
205
def to_numpy(self, equity_data: bool = False) -> np.ndarray:
    """
    Convert the results to a NumPy array.

    Returns
    -------
    np.ndarray
        The results as a NumPy array.
    """
    return self.to_polars(collect=True, equity_data=equity_data).to_numpy()
humbldata.core.standard_models.abstract.humblobject.HumblObject.to_dict ¤
to_dict(row_wise: bool = False, equity_data: bool = False, as_series: bool = True) -> dict | list[dict]

Transform the stored data into a dictionary or a list of dictionaries.

This method allows for the conversion of the internal data representation into a more universally accessible format, either aggregating the entire dataset into a single dictionary (column-wise) or breaking it down into a list of dictionaries, each representing a row in the dataset.

Parameters:

Name Type Description Default
row_wise bool

Determines the format of the output. If set to True, the method returns a list of dictionaries, with each dictionary representing a row and its corresponding data as key-value pairs. If set to False, the method returns a single dictionary, with column names as keys and lists of column data as values. Default is False.

False
equity_data bool

A flag to specify whether to use equity-specific data for the conversion. This parameter allows for flexibility in handling different types of data stored within the object. Default is False.

False
as_series bool

If True, the method returns a pl.Series with values as Series. If False, the method returns a dict with values as List[Any]. Default is True.

True

Returns:

Type Description
dict | list[dict]

Depending on the row_wise parameter, either a dictionary mapping column names to lists of values (if row_wise is False) or a list of dictionaries, each representing a row in the dataset (if row_wise is True).

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def to_dict(
    self,
    row_wise: bool = False,
    equity_data: bool = False,
    as_series: bool = True,
) -> dict | list[dict]:
    """
    Transform the stored data into a dictionary or a list of dictionaries.

    This method allows for the conversion of the internal data
    representation into a more universally accessible format, either
    aggregating the entire dataset into a single dictionary (column-wise)
    or breaking it down into a list of dictionaries, each representing a
    row in the dataset.

    Parameters
    ----------
    row_wise : bool, optional
        Determines the format of the output. If set to True, the method
        returns a list of dictionaries, with each dictionary representing a
        row and its corresponding data as key-value pairs. If set to False,
        the method returns a single dictionary, with column names as keys
        and lists of column data as values. Default is False.

    equity_data : bool, optional
        A flag to specify whether to use equity-specific data for the
        conversion. This parameter allows for flexibility in handling
        different types of data stored within the object. Default is
        False.
    as_series : bool, optional
        If True, the method returns a pl.Series with values as Series. If
        False, the method returns a dict with values as List[Any].
        Default is True.

    Returns
    -------
    dict | list[dict]
        Depending on the `row_wise` parameter, either a dictionary mapping column names to lists of values (if `row_wise` is False) or a list of dictionaries, each representing a row in the dataset (if `row_wise` is True).
    """
    if row_wise:
        return self.to_polars(
            collect=True, equity_data=equity_data
        ).to_dicts()
    return self.to_polars(collect=True, equity_data=equity_data).to_dict(
        as_series=as_series
    )
humbldata.core.standard_models.abstract.humblobject.HumblObject.to_arrow ¤
to_arrow(equity_data: bool = False) -> Table

Convert the results to an Arrow Table.

Returns:

Type Description
Table

The results as an Arrow Table.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
254
255
256
257
258
259
260
261
262
263
def to_arrow(self, equity_data: bool = False) -> pa.Table:
    """
    Convert the results to an Arrow Table.

    Returns
    -------
    pa.Table
        The results as an Arrow Table.
    """
    return self.to_polars(collect=True, equity_data=equity_data).to_arrow()
humbldata.core.standard_models.abstract.humblobject.HumblObject.to_struct ¤
to_struct(name: str = 'results', equity_data: bool = False) -> Series

Convert the results to a struct.

Parameters:

Name Type Description Default
name str

The name of the struct. Default is "results".

'results'

Returns:

Type Description
Struct

The results as a struct.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def to_struct(
    self, name: str = "results", equity_data: bool = False
) -> pl.Series:
    """
    Convert the results to a struct.

    Parameters
    ----------
    name : str, optional
        The name of the struct. Default is "results".

    Returns
    -------
    pl.Struct
        The results as a struct.
    """
    return self.to_polars(collect=True, equity_data=equity_data).to_struct(
        name=name
    )
humbldata.core.standard_models.abstract.humblobject.HumblObject.to_json ¤
to_json(equity_data: bool = False, chart: bool = False) -> str | list[str]

Convert the results to a JSON string.

Parameters:

Name Type Description Default
equity_data bool

A flag to specify whether to use equity-specific data for the conversion. Default is False.

False
chart bool

If True, return all generated charts as a JSON string instead of returning the results. Default is False.

False

Returns:

Type Description
str

The results or charts as a JSON string.

Raises:

Type Description
HumblDataError

If chart is True but no charts are available.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
def to_json(
    self, equity_data: bool = False, chart: bool = False
) -> str | list[str]:
    """
    Convert the results to a JSON string.

    Parameters
    ----------
    equity_data : bool, optional
        A flag to specify whether to use equity-specific data for the
        conversion. Default is False.
    chart : bool, optional
        If True, return all generated charts as a JSON string instead of
        returning the results. Default is False.

    Returns
    -------
    str
        The results or charts as a JSON string.

    Raises
    ------
    HumblDataError
        If chart is True but no charts are available.
    """
    import json
    from datetime import date, datetime

    from humbldata.core.standard_models.abstract.errors import (
        HumblDataError,
    )

    def json_serial(obj):
        """JSON serializer for objects not serializable by default json code."""
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        msg = f"Type {type(obj)} not serializable"
        raise TypeError(msg)

    if chart:
        if self.chart is None:
            msg = f"You set `.to_json(chart=True)` but there were no charts. Make sure `chart=True` in {self.command_params.__class__.__name__}"
            raise HumblDataError(msg)

        if isinstance(self.chart, list):
            return [
                chart.content
                for chart in self.chart
                if chart and chart.content
            ]
        else:
            return self.chart.content
    else:
        data = self.to_polars(
            collect=True, equity_data=equity_data
        ).to_dict(as_series=False)
        return json.dumps(data, default=json_serial)
humbldata.core.standard_models.abstract.humblobject.HumblObject.is_empty ¤
is_empty(equity_data: bool = False) -> bool

Check if the results are empty.

Returns:

Type Description
bool

True if the results are empty, False otherwise.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
343
344
345
346
347
348
349
350
351
352
def is_empty(self, equity_data: bool = False) -> bool:
    """
    Check if the results are empty.

    Returns
    -------
    bool
        True if the results are empty, False otherwise.
    """
    return self.to_polars(collect=True, equity_data=equity_data).is_empty()
humbldata.core.standard_models.abstract.humblobject.HumblObject.show ¤
show() -> None

Show the chart.

Source code in src/humbldata/core/standard_models/abstract/humblobject.py
354
355
356
357
358
359
360
361
362
363
364
365
def show(self) -> None:
    """Show the chart."""
    if isinstance(self.chart, list):
        for chart in self.chart:
            if chart and chart.fig:
                chart.fig.show()
            else:
                msg = "Chart object is missing or incomplete."
                raise HumblDataError(msg)
    elif not self.chart or not self.chart.fig:
        msg = "Chart not found."
        raise HumblDataError(msg)
humbldata.core.standard_models.abstract.errors ¤

An ABSTRACT DATA MODEL to be inherited by custom errors.

humbldata.core.standard_models.abstract.errors.HumblDataError ¤

Bases: BaseException

Base Error for HumblData logic.

Source code in src/humbldata/core/standard_models/abstract/errors.py
4
5
6
7
8
9
class HumblDataError(BaseException):
    """Base Error for HumblData logic."""

    def __init__(self, original: str | Exception | None = None):
        self.original = original
        super().__init__(str(original))
humbldata.core.standard_models.abstract.chart ¤
humbldata.core.standard_models.abstract.chart.ChartTemplate ¤

Bases: str, Enum

Chart format.

Available options: - plotly - humbl_light - humbl_dark - plotly_light - plotly_dark - ggplot2 - seaborn - simple_white - presentation - xgridoff - ygridoff - gridon - none

Source code in src/humbldata/core/standard_models/abstract/chart.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ChartTemplate(str, Enum):
    """
    Chart format.

    Available options:
    - plotly
    - humbl_light
    - humbl_dark
    - plotly_light
    - plotly_dark
    - ggplot2
    - seaborn
    - simple_white
    - presentation
    - xgridoff
    - ygridoff
    - gridon
    - none
    """

    plotly = "plotly"
    humbl_light = "humbl_light"
    humbl_dark = "humbl_dark"
    plotly_light = "plotly_light"
    plotly_dark = "plotly_dark"
    ggplot2 = "ggplot2"
    seaborn = "seaborn"
    simple_white = "simple_white"
    presentation = "presentation"
    xgridoff = "xgridoff"
    ygridoff = "ygridoff"
    gridon = "gridon"
    none = "none"
humbldata.core.standard_models.abstract.chart.Chart ¤

Bases: BaseModel

a Chart Object that is returned from a View.

Source code in src/humbldata/core/standard_models/abstract/chart.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class Chart(BaseModel):
    """a Chart Object that is returned from a View."""

    content: str | None = Field(
        default=None,
        description="Raw textual representation of the chart.",
    )
    theme: ChartTemplate | None = Field(
        default=ChartTemplate.plotly,
        description="Complementary attribute to the `content` attribute. It specifies the format of the chart.",
    )
    fig: Any | None = Field(
        default=None,
        description="The figure object.",
        # json_schema_extra={"exclude_from_api": True},
    )
    model_config = ConfigDict(validate_assignment=True)

    def __repr__(self) -> str:
        """Human readable representation of the object."""
        items = [
            f"{k}: {v}"[:83] + ("..." if len(f"{k}: {v}") > 83 else "")
            for k, v in self.model_dump().items()
        ]

        return f"{self.__class__.__name__}\n\n" + "\n".join(items)
humbldata.core.standard_models.abstract.chart.Chart.__repr__ ¤
__repr__() -> str

Human readable representation of the object.

Source code in src/humbldata/core/standard_models/abstract/chart.py
60
61
62
63
64
65
66
67
def __repr__(self) -> str:
    """Human readable representation of the object."""
    items = [
        f"{k}: {v}"[:83] + ("..." if len(f"{k}: {v}") > 83 else "")
        for k, v in self.model_dump().items()
    ]

    return f"{self.__class__.__name__}\n\n" + "\n".join(items)
humbldata.core.standard_models.abstract.singleton ¤

An ABSTRACT DATA MODEL, Singleton, to represent a class that should only have one instance.

humbldata.core.standard_models.abstract.singleton.SingletonMeta ¤

Bases: type, Generic[T]

SingletonMeta is a metaclass that creates a Singleton instance of a class.

Singleton design pattern restricts the instantiation of a class to a single instance. This is useful when exactly one object is needed to coordinate actions across the system.

Source code in src/humbldata/core/standard_models/abstract/singleton.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class SingletonMeta(type, Generic[T]):
    """
    SingletonMeta is a metaclass that creates a Singleton instance of a class.

    Singleton design pattern restricts the instantiation of a class to a single
    instance. This is useful when exactly one object is needed to coordinate
    actions across the system.
    """

    _instances: ClassVar[dict[T, T]] = {}  # type: ignore  # noqa: PGH003

    def __call__(cls, *args, **kwargs) -> T:
        """
        Override the __call__ method.

        If the class exists, otherwise creates a new instance and stores it in
        the _instances dictionary.
        """
        if cls not in cls._instances:
            instance = super().__call__(*args, **kwargs)
            cls._instances[cls] = instance  # type: ignore  # noqa: PGH003

        return cls._instances[cls]  # type: ignore  # noqa: PGH003
humbldata.core.standard_models.abstract.singleton.SingletonMeta.__call__ ¤
__call__(*args, **kwargs) -> T

Override the call method.

If the class exists, otherwise creates a new instance and stores it in the _instances dictionary.

Source code in src/humbldata/core/standard_models/abstract/singleton.py
21
22
23
24
25
26
27
28
29
30
31
32
def __call__(cls, *args, **kwargs) -> T:
    """
    Override the __call__ method.

    If the class exists, otherwise creates a new instance and stores it in
    the _instances dictionary.
    """
    if cls not in cls._instances:
        instance = super().__call__(*args, **kwargs)
        cls._instances[cls] = instance  # type: ignore  # noqa: PGH003

    return cls._instances[cls]  # type: ignore  # noqa: PGH003
humbldata.core.standard_models.abstract.query_params ¤

A wrapper around OpenBB QueryParams Standardized Model to use with humbldata.

humbldata.core.standard_models.abstract.query_params.QueryParams ¤

Bases: QueryParams

An abstract standard_model to represent a base QueryParams Data.

QueryParams model should be used to define the query parameters for a context.category.command call.

This QueryParams model is meant to be inherited and built upon by other standard_models for a specific context.

Examples:

class EquityHistoricalQueryParams(QueryParams):

    symbol: str = Field(description=QUERY_DESCRIPTIONS.get("symbol", ""))
    interval: Optional[str] = Field(
        default="1d",
        description=QUERY_DESCRIPTIONS.get("interval", ""),
    )
    start_date: Optional[dateType] = Field(
        default=None,
        description=QUERY_DESCRIPTIONS.get("start_date", ""),
    )
    end_date: Optional[dateType] = Field(
        default=None,
        description=QUERY_DESCRIPTIONS.get("end_date", ""),
    )

    @field_validator("symbol", mode="before", check_fields=False)
    @classmethod
    def upper_symbol(cls, v: Union[str, List[str], Set[str]]):
        if isinstance(v, str):
            return v.upper()
        return ",".join([symbol.upper() for symbol in list(v)])

This would create a class that would be used to query historical price data for equities from any given command.

This could then be used to create a MandelbrotChannelEquityHistoricalQueryParams that would define what query parameters are needed for the Mandelbrot Channel command.

Source code in src/humbldata/core/standard_models/abstract/query_params.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class QueryParams(OpenBBQueryParams):
    """
    An abstract standard_model to represent a base QueryParams Data.

    QueryParams model should be used to define the query parameters for a
    `context.category.command` call.

    This QueryParams model is meant to be inherited and built upon by other
    standard_models for a specific context.

    Examples
    --------
    ```py
    class EquityHistoricalQueryParams(QueryParams):

        symbol: str = Field(description=QUERY_DESCRIPTIONS.get("symbol", ""))
        interval: Optional[str] = Field(
            default="1d",
            description=QUERY_DESCRIPTIONS.get("interval", ""),
        )
        start_date: Optional[dateType] = Field(
            default=None,
            description=QUERY_DESCRIPTIONS.get("start_date", ""),
        )
        end_date: Optional[dateType] = Field(
            default=None,
            description=QUERY_DESCRIPTIONS.get("end_date", ""),
        )

        @field_validator("symbol", mode="before", check_fields=False)
        @classmethod
        def upper_symbol(cls, v: Union[str, List[str], Set[str]]):
            if isinstance(v, str):
                return v.upper()
            return ",".join([symbol.upper() for symbol in list(v)])
    ```

    This would create a class that would be used to query historical price data
    for equities from any given command.

    This could then be used to create a
    `MandelbrotChannelEquityHistoricalQueryParams` that would define what query
    parameters are needed for the Mandelbrot Channel command.
    """
humbldata.core.standard_models.abstract.tagged ¤

An ABSTRACT DATA MODEL, Tagged, to be inherited by other models as identifier.

humbldata.core.standard_models.abstract.tagged.Tagged ¤

Bases: BaseModel

A class to represent an object tagged with a uuid7.

Source code in src/humbldata/core/standard_models/abstract/tagged.py
 7
 8
 9
10
class Tagged(BaseModel):
    """A class to represent an object tagged with a uuid7."""

    id: str = Field(default_factory=uuid7str, alias="_id")
humbldata.core.standard_models.abstract.data ¤

A wrapper around OpenBB Data Standardized Model to use with humbldata.

humbldata.core.standard_models.abstract.data.Data ¤

Bases: DataFrameModel

An abstract standard_model to represent a base Data Model.

The Data Model should be used to define the data that is being collected and analyzed in a context.category.command call.

This Data model is meant to be inherited and built upon by other standard_models for a specific context.

Example
class EquityHistoricalData(Data):

date: Union[dateType, datetime] = Field(
    description=DATA_DESCRIPTIONS.get("date", "")
)
open: float = Field(description=DATA_DESCRIPTIONS.get("open", ""))
high: float = Field(description=DATA_DESCRIPTIONS.get("high", ""))
low: float = Field(description=DATA_DESCRIPTIONS.get("low", ""))
close: float = Field(description=DATA_DESCRIPTIONS.get("close", ""))
volume: Optional[Union[float, int]] = Field(
    default=None, description=DATA_DESCRIPTIONS.get("volume", "")
)

@field_validator("date", mode="before", check_fields=False)
def date_validate(cls, v):  # pylint: disable=E0213
    v = parser.isoparse(str(v))
    if v.hour == 0 and v.minute == 0:
        return v.date()
    return v
Source code in src/humbldata/core/standard_models/abstract/data.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Data(pa.DataFrameModel):
    """
    An abstract standard_model to represent a base Data Model.

    The Data Model should be used to define the data that is being
    collected and analyzed in a `context.category.command` call.

    This Data model is meant to be inherited and built upon by other
    standard_models for a specific context.

    Example
    -------
    ```py
    class EquityHistoricalData(Data):

    date: Union[dateType, datetime] = Field(
        description=DATA_DESCRIPTIONS.get("date", "")
    )
    open: float = Field(description=DATA_DESCRIPTIONS.get("open", ""))
    high: float = Field(description=DATA_DESCRIPTIONS.get("high", ""))
    low: float = Field(description=DATA_DESCRIPTIONS.get("low", ""))
    close: float = Field(description=DATA_DESCRIPTIONS.get("close", ""))
    volume: Optional[Union[float, int]] = Field(
        default=None, description=DATA_DESCRIPTIONS.get("volume", "")
    )

    @field_validator("date", mode="before", check_fields=False)
    def date_validate(cls, v):  # pylint: disable=E0213
        v = parser.isoparse(str(v))
        if v.hour == 0 and v.minute == 0:
            return v.date()
        return v

    ```
    """

humbldata.core.standard_models.portfolio ¤

Context: Portfolio || Category: Analytics.

This module defines the QueryParams and Data classes for the Portfolio context.

humbldata.core.standard_models.portfolio.analytics ¤
humbldata.core.standard_models.portfolio.analytics.etf_category ¤

UserTable Standard Model.

Context: Portfolio || Category: Analytics || Command: user_table.

This module is used to define the QueryParams and Data model for the UserTable command.

humbldata.core.standard_models.portfolio.analytics.etf_category.ETFCategoryData ¤

Bases: Data

Data model for the etf_category command, a Pandera.Polars Model.

Used for simple validation of ETF category data for the UserTableFetcher internal logic aggregate_user_table_data()

Source code in src/humbldata/core/standard_models/portfolio/analytics/etf_category.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ETFCategoryData(Data):
    """
    Data model for the etf_category command, a Pandera.Polars Model.

    Used for simple validation of ETF category data for the UserTableFetcher
    internal logic `aggregate_user_table_data()`
    """

    symbol: str = pa.Field(
        default=None,
        title="Symbol",
        description=QUERY_DESCRIPTIONS.get("symbol", ""),
    )
    category: pl.Utf8 | None = pa.Field(
        default=None,
        title="Category/Sector",
        description=QUERY_DESCRIPTIONS.get("category", ""),
        nullable=True,
    )
humbldata.core.standard_models.portfolio.analytics.user_table ¤

UserTable Standard Model.

Context: Portfolio || Category: Analytics || Command: user_table.

This module is used to define the QueryParams and Data model for the UserTable command.

humbldata.core.standard_models.portfolio.analytics.user_table.UserTableQueryParams ¤

Bases: QueryParams

QueryParams model for the UserTable command, a Pydantic v2 model.

Parameters:

Name Type Description Default
symbols str | list[str] | set[str]

The symbol or ticker of the stock(s). Can be a single symbol, a comma-separated string, or a list/set of symbols. Default is "AAPL". Examples: "AAPL", "AAPL,MSFT", ["AAPL", "MSFT"] All inputs will be converted to uppercase.

required
Notes

The symbols input will be processed to ensure all symbols are uppercase and properly formatted, regardless of the input format.

Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class UserTableQueryParams(QueryParams):
    """
    QueryParams model for the UserTable command, a Pydantic v2 model.

    Parameters
    ----------
    symbols : str | list[str] | set[str]
        The symbol or ticker of the stock(s). Can be a single symbol, a comma-separated string,
        or a list/set of symbols. Default is "AAPL".
        Examples: "AAPL", "AAPL,MSFT", ["AAPL", "MSFT"]
        All inputs will be converted to uppercase.

    Notes
    -----
    The `symbols` input will be processed to ensure all symbols are uppercase
    and properly formatted, regardless of the input format.
    """

    symbols: str | list[str] | set[str] = pa.Field(
        default="AAPL",
        title="Symbol",
        description=QUERY_DESCRIPTIONS.get("symbol", ""),
    )

    @field_validator("symbols", mode="before", check_fields=False)
    @classmethod
    def upper_symbol(cls, v: str | list[str] | set[str]) -> str | list[str]:
        """
        Convert the stock symbol to uppercase.

        Parameters
        ----------
        v : Union[str, List[str], Set[str]]
            The stock symbol or collection of symbols to be converted.

        Returns
        -------
        Union[str, List[str]]
            The uppercase stock symbol or a comma-separated string of uppercase
            symbols.
        """
        # Handle empty inputs
        if not v:
            return []
        # If v is a string, split it by commas into a list. Otherwise, ensure it's a list.
        v = v.split(",") if isinstance(v, str) else v

        # Trim whitespace and check if all elements in the list are strings
        if not all(isinstance(item.strip(), str) for item in v):
            msg = "Every element in `symbol` list must be a `str`"
            raise ValueError(msg)

        # Convert all elements to uppercase, trim whitespace, and join them with a comma
        return [symbol.strip().upper() for symbol in v]
humbldata.core.standard_models.portfolio.analytics.user_table.UserTableQueryParams.upper_symbol classmethod ¤
upper_symbol(v: str | list[str] | set[str]) -> str | list[str]

Convert the stock symbol to uppercase.

Parameters:

Name Type Description Default
v Union[str, List[str], Set[str]]

The stock symbol or collection of symbols to be converted.

required

Returns:

Type Description
Union[str, List[str]]

The uppercase stock symbol or a comma-separated string of uppercase symbols.

Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@field_validator("symbols", mode="before", check_fields=False)
@classmethod
def upper_symbol(cls, v: str | list[str] | set[str]) -> str | list[str]:
    """
    Convert the stock symbol to uppercase.

    Parameters
    ----------
    v : Union[str, List[str], Set[str]]
        The stock symbol or collection of symbols to be converted.

    Returns
    -------
    Union[str, List[str]]
        The uppercase stock symbol or a comma-separated string of uppercase
        symbols.
    """
    # Handle empty inputs
    if not v:
        return []
    # If v is a string, split it by commas into a list. Otherwise, ensure it's a list.
    v = v.split(",") if isinstance(v, str) else v

    # Trim whitespace and check if all elements in the list are strings
    if not all(isinstance(item.strip(), str) for item in v):
        msg = "Every element in `symbol` list must be a `str`"
        raise ValueError(msg)

    # Convert all elements to uppercase, trim whitespace, and join them with a comma
    return [symbol.strip().upper() for symbol in v]
humbldata.core.standard_models.portfolio.analytics.user_table.UserTableData ¤

Bases: Data

Data model for the user_table command, a Pandera.Polars Model.

This Data model is used to validate data in the .transform_data() method of the UserTableFetcher class.

Attributes:

Name Type Description
symbol Utf8

The stock symbol.

last_price Float64

The last known price of the stock.

buy_price Float64

The recommended buy price for the stock.

sell_price Float64

The recommended sell price for the stock.

ud_pct Utf8

The upside/downside percentage.

ud_ratio Float64

The upside/downside ratio.

asset_class Utf8

The asset class of the stock.

sector Utf8

The sector of the stock.

humbl_suggestion Utf8 | None

The suggestion provided by HUMBL.

Methods:

Name Description
None
Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class UserTableData(Data):
    """
    Data model for the user_table command, a Pandera.Polars Model.

    This Data model is used to validate data in the `.transform_data()` method of the `UserTableFetcher` class.

    Attributes
    ----------
    symbol : pl.Utf8
        The stock symbol.
    last_price : pl.Float64
        The last known price of the stock.
    buy_price : pl.Float64
        The recommended buy price for the stock.
    sell_price : pl.Float64
        The recommended sell price for the stock.
    ud_pct : pl.Utf8
        The upside/downside percentage.
    ud_ratio : pl.Float64
        The upside/downside ratio.
    asset_class : pl.Utf8
        The asset class of the stock.
    sector : pl.Utf8
        The sector of the stock.
    humbl_suggestion : pl.Utf8 | None
        The suggestion provided by HUMBL.

    Methods
    -------
    None

    """

    symbol: pl.Utf8 = pa.Field(
        default=None,
        title="Symbol",
        description=DATA_DESCRIPTIONS.get("symbol", ""),
        alias="(symbols|symbol)",
        regex=True,
    )
    last_price: pl.Float64 = pa.Field(
        default=None,
        title="Last Price",
        description=DATA_DESCRIPTIONS.get("last_price", ""),
    )
    buy_price: pl.Float64 = pa.Field(
        default=None,
        title="Buy Price",
        description=DATA_DESCRIPTIONS.get("buy_price", ""),
    )
    sell_price: pl.Float64 = pa.Field(
        default=None,
        title="Sell Price",
        description=DATA_DESCRIPTIONS.get("sell_price", ""),
    )
    ud_pct: pl.Utf8 = pa.Field(
        default=None,
        title="Upside/Downside Percentage",
        description=DATA_DESCRIPTIONS.get("ud_pct", ""),
    )
    ud_ratio: pl.Float64 = pa.Field(
        default=None,
        title="Upside/Downside Ratio",
        description=DATA_DESCRIPTIONS.get("ud_ratio", ""),
    )
    asset_class: pl.Utf8 = pa.Field(
        default=None,
        title="Asset Class",
        description=DATA_DESCRIPTIONS.get("asset_class", ""),
    )
    sector: pl.Utf8 = pa.Field(
        default=None,
        title="Sector",
        description=DATA_DESCRIPTIONS.get("sector", ""),
        nullable=True,
    )
    humbl_suggestion: pl.Utf8 | None = pa.Field(
        default=None,
        title="humblSuggestion",
        description=QUERY_DESCRIPTIONS.get("humbl_suggestion", ""),
    )
humbldata.core.standard_models.portfolio.analytics.user_table.UserTableFetcher ¤

Fetcher for the UserTable command.

Parameters:

Name Type Description Default
context_params PortfolioQueryParams

The context parameters for the Portfolio query.

required
command_params UserTableQueryParams

The command-specific parameters for the UserTable query.

required

Attributes:

Name Type Description
context_params PortfolioQueryParams

Stores the context parameters passed during initialization.

command_params UserTableQueryParams

Stores the command-specific parameters passed during initialization.

data DataFrame

The raw data extracted from the data provider, before transformation.

Methods:

Name Description
transform_query

Transform the command-specific parameters into a query.

extract_data

Extracts the data from the provider and returns it as a Polars DataFrame.

transform_data

Transforms the command-specific data according to the UserTable logic.

fetch_data

Execute TET Pattern.

Returns:

Type Description
HumblObject

results : UserTableData Serializable results. provider : Literal['fmp', 'intrinio', 'polygon', 'tiingo', 'yfinance'] Provider name. warnings : Optional[List[Warning_]] List of warnings. chart : Optional[Chart] Chart object. context_params : PortfolioQueryParams Context-specific parameters. command_params : UserTableQueryParams Command-specific parameters.

Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
class UserTableFetcher:
    """
    Fetcher for the UserTable command.

    Parameters
    ----------
    context_params : PortfolioQueryParams
        The context parameters for the Portfolio query.
    command_params : UserTableQueryParams
        The command-specific parameters for the UserTable query.

    Attributes
    ----------
    context_params : PortfolioQueryParams
        Stores the context parameters passed during initialization.
    command_params : UserTableQueryParams
        Stores the command-specific parameters passed during initialization.
    data : pl.DataFrame
        The raw data extracted from the data provider, before transformation.

    Methods
    -------
    transform_query()
        Transform the command-specific parameters into a query.
    extract_data()
        Extracts the data from the provider and returns it as a Polars DataFrame.
    transform_data()
        Transforms the command-specific data according to the UserTable logic.
    fetch_data()
        Execute TET Pattern.

    Returns
    -------
    HumblObject
        results : UserTableData
            Serializable results.
        provider : Literal['fmp', 'intrinio', 'polygon', 'tiingo', 'yfinance']
            Provider name.
        warnings : Optional[List[Warning_]]
            List of warnings.
        chart : Optional[Chart]
            Chart object.
        context_params : PortfolioQueryParams
            Context-specific parameters.
        command_params : UserTableQueryParams
            Command-specific parameters.
    """

    def __init__(
        self,
        context_params: PortfolioQueryParams,
        command_params: UserTableQueryParams,
    ):
        """
        Initialize the UserTableFetcher with context and command parameters.

        Parameters
        ----------
        context_params : PortfolioQueryParams
            The context parameters for the Portfolio query.
        command_params : UserTableQueryParams
            The command-specific parameters for the UserTable query.
        """
        self.context_params = context_params
        self.command_params = command_params

    def transform_query(self):
        """
        Transform the command-specific parameters into a query.

        If command_params is not provided, it initializes a default UserTableQueryParams object.
        """
        if not self.command_params:
            self.command_params = None
            # Set Default Arguments
            self.command_params: UserTableQueryParams = UserTableQueryParams()
        else:
            self.command_params: UserTableQueryParams = UserTableQueryParams(
                **self.command_params
            )

    async def extract_data(self):
        """
        Extract the data from the provider and returns it as a Polars DataFrame.

        Returns
        -------
        pl.DataFrame
            The extracted data as a Polars DataFrame.

        """
        self.etf_data = await aget_etf_category(self.context_params.symbols)

        # Dates are automatically selected based on membership
        self.toolbox = Toolbox(
            symbols=self.context_params.symbols,
            membership=self.context_params.membership,
            interval="1d",
        )
        self.mandelbrot = self.toolbox.technical.mandelbrot_channel().to_polars(
            collect=False
        )
        return self

    async def transform_data(self):
        """
        Transform the command-specific data according to the user_table logic.

        Returns
        -------
        pl.DataFrame
            The transformed data as a Polars DataFrame
        """
        # Implement data transformation logic here
        transformed_data: pl.LazyFrame = await user_table_engine(
            symbols=self.context_params.symbols,
            etf_data=self.etf_data,
            mandelbrot_data=self.mandelbrot,
            toolbox=self.toolbox,
        )
        self.transformed_data = UserTableData(transformed_data.collect()).lazy()
        self.transformed_data = self.transformed_data.with_columns(
            pl.col(pl.Float64).round(2)
        )
        return self

    @log_start_end(logger=logger)
    async def fetch_data(self):
        """
        Execute TET Pattern.

        This method executes the query transformation, data fetching and
        transformation process by first calling `transform_query` to prepare the query parameters, then
        extracting the raw data using `extract_data` method, and finally
        transforming the raw data using `transform_data` method.

        Returns
        -------
        HumblObject
            The HumblObject containing the transformed data and metadata.
        """
        logger.debug("Running .transform_query()")
        self.transform_query()
        logger.debug("Running .extract_data()")
        await self.extract_data()
        logger.debug("Running .transform_data()")
        await self.transform_data()

        return HumblObject(
            results=self.transformed_data,
            provider=self.context_params.provider,
            warnings=None,
            chart=None,
            context_params=self.context_params,
            command_params=self.command_params,
        )
humbldata.core.standard_models.portfolio.analytics.user_table.UserTableFetcher.__init__ ¤
__init__(context_params: PortfolioQueryParams, command_params: UserTableQueryParams)

Initialize the UserTableFetcher with context and command parameters.

Parameters:

Name Type Description Default
context_params PortfolioQueryParams

The context parameters for the Portfolio query.

required
command_params UserTableQueryParams

The command-specific parameters for the UserTable query.

required
Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def __init__(
    self,
    context_params: PortfolioQueryParams,
    command_params: UserTableQueryParams,
):
    """
    Initialize the UserTableFetcher with context and command parameters.

    Parameters
    ----------
    context_params : PortfolioQueryParams
        The context parameters for the Portfolio query.
    command_params : UserTableQueryParams
        The command-specific parameters for the UserTable query.
    """
    self.context_params = context_params
    self.command_params = command_params
humbldata.core.standard_models.portfolio.analytics.user_table.UserTableFetcher.transform_query ¤
transform_query()

Transform the command-specific parameters into a query.

If command_params is not provided, it initializes a default UserTableQueryParams object.

Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def transform_query(self):
    """
    Transform the command-specific parameters into a query.

    If command_params is not provided, it initializes a default UserTableQueryParams object.
    """
    if not self.command_params:
        self.command_params = None
        # Set Default Arguments
        self.command_params: UserTableQueryParams = UserTableQueryParams()
    else:
        self.command_params: UserTableQueryParams = UserTableQueryParams(
            **self.command_params
        )
humbldata.core.standard_models.portfolio.analytics.user_table.UserTableFetcher.extract_data async ¤
extract_data()

Extract the data from the provider and returns it as a Polars DataFrame.

Returns:

Type Description
DataFrame

The extracted data as a Polars DataFrame.

Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
async def extract_data(self):
    """
    Extract the data from the provider and returns it as a Polars DataFrame.

    Returns
    -------
    pl.DataFrame
        The extracted data as a Polars DataFrame.

    """
    self.etf_data = await aget_etf_category(self.context_params.symbols)

    # Dates are automatically selected based on membership
    self.toolbox = Toolbox(
        symbols=self.context_params.symbols,
        membership=self.context_params.membership,
        interval="1d",
    )
    self.mandelbrot = self.toolbox.technical.mandelbrot_channel().to_polars(
        collect=False
    )
    return self
humbldata.core.standard_models.portfolio.analytics.user_table.UserTableFetcher.transform_data async ¤
transform_data()

Transform the command-specific data according to the user_table logic.

Returns:

Type Description
DataFrame

The transformed data as a Polars DataFrame

Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
async def transform_data(self):
    """
    Transform the command-specific data according to the user_table logic.

    Returns
    -------
    pl.DataFrame
        The transformed data as a Polars DataFrame
    """
    # Implement data transformation logic here
    transformed_data: pl.LazyFrame = await user_table_engine(
        symbols=self.context_params.symbols,
        etf_data=self.etf_data,
        mandelbrot_data=self.mandelbrot,
        toolbox=self.toolbox,
    )
    self.transformed_data = UserTableData(transformed_data.collect()).lazy()
    self.transformed_data = self.transformed_data.with_columns(
        pl.col(pl.Float64).round(2)
    )
    return self
humbldata.core.standard_models.portfolio.analytics.user_table.UserTableFetcher.fetch_data async ¤
fetch_data()

Execute TET Pattern.

This method executes the query transformation, data fetching and transformation process by first calling transform_query to prepare the query parameters, then extracting the raw data using extract_data method, and finally transforming the raw data using transform_data method.

Returns:

Type Description
HumblObject

The HumblObject containing the transformed data and metadata.

Source code in src/humbldata/core/standard_models/portfolio/analytics/user_table.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
@log_start_end(logger=logger)
async def fetch_data(self):
    """
    Execute TET Pattern.

    This method executes the query transformation, data fetching and
    transformation process by first calling `transform_query` to prepare the query parameters, then
    extracting the raw data using `extract_data` method, and finally
    transforming the raw data using `transform_data` method.

    Returns
    -------
    HumblObject
        The HumblObject containing the transformed data and metadata.
    """
    logger.debug("Running .transform_query()")
    self.transform_query()
    logger.debug("Running .extract_data()")
    await self.extract_data()
    logger.debug("Running .transform_data()")
    await self.transform_data()

    return HumblObject(
        results=self.transformed_data,
        provider=self.context_params.provider,
        warnings=None,
        chart=None,
        context_params=self.context_params,
        command_params=self.command_params,
    )
humbldata.core.standard_models.portfolio.PortfolioQueryParams ¤

Bases: QueryParams

Query parameters for the PortfolioController.

This class defines the query parameters used by the PortfolioController.

Parameters:

Name Type Description Default
symbol str or list of str

The stock symbol(s) to query. Default is "AAPL".

required
provider OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS

The data provider for historical price data. Default is "yahoo".

required
membership Literal['anonymous', 'peon', 'premium', 'power', 'permanent', 'admin']

The membership level of the user accessing the data. Default is "anonymous".

required

Attributes:

Name Type Description
symbol str or list of str

The stock symbol(s) to query.

provider OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS

The data provider for historical price data.

membership Literal['anonymous', 'peon', 'premium', 'power', 'permanent', 'admin']

The membership level of the user.

Source code in src/humbldata/core/standard_models/portfolio/__init__.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class PortfolioQueryParams(QueryParams):
    """
    Query parameters for the PortfolioController.

    This class defines the query parameters used by the PortfolioController.

    Parameters
    ----------
    symbol : str or list of str
        The stock symbol(s) to query. Default is "AAPL".
    provider : OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS
        The data provider for historical price data. Default is "yahoo".
    membership : Literal["anonymous", "peon", "premium", "power", "permanent", "admin"]
        The membership level of the user accessing the data. Default is "anonymous".

    Attributes
    ----------
    symbol : str or list of str
        The stock symbol(s) to query.
    provider : OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS
        The data provider for historical price data.
    membership : Literal["anonymous", "peon", "premium", "power", "permanent", "admin"]
        The membership level of the user.
    """

    symbols: str | list[str] = Field(
        default=["AAPL"],
        title="Symbols",
        description=QUERY_DESCRIPTIONS.get("symbols", ""),
    )
    provider: OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS = Field(
        default="yfinance",
        title="Provider",
        description=QUERY_DESCRIPTIONS.get("provider", ""),
    )
    membership: Literal[
        "anonymous", "peon", "premium", "power", "permanent", "admin"
    ] = Field(
        default="anonymous",
        title="Membership",
        description=QUERY_DESCRIPTIONS.get("membership", ""),
    )

    @field_validator("symbols", mode="before", check_fields=False)
    @classmethod
    def upper_symbol(cls, v: str | list[str] | set[str]) -> list[str]:
        """
        Convert the stock symbols to uppercase and remove empty strings.

        Parameters
        ----------
        v : Union[str, List[str], Set[str]]
            The stock symbol or collection of symbols to be converted.

        Returns
        -------
        List[str]
            A list of uppercase stock symbols with empty strings removed.
        """
        # Handle empty inputs
        if not v:
            return []

        # If v is a string, split it by commas into a list. Otherwise, ensure it's a list.
        v = v.split(",") if isinstance(v, str) else list(v)

        # Convert all elements to uppercase, trim whitespace, and remove empty strings
        valid_symbols = [
            symbol.strip().upper() for symbol in v if symbol.strip()
        ]

        if not valid_symbols:
            msg = "At least one valid symbol (str) must be provided"
            raise ValueError(msg)

        return valid_symbols
humbldata.core.standard_models.portfolio.PortfolioQueryParams.upper_symbol classmethod ¤
upper_symbol(v: str | list[str] | set[str]) -> list[str]

Convert the stock symbols to uppercase and remove empty strings.

Parameters:

Name Type Description Default
v Union[str, List[str], Set[str]]

The stock symbol or collection of symbols to be converted.

required

Returns:

Type Description
List[str]

A list of uppercase stock symbols with empty strings removed.

Source code in src/humbldata/core/standard_models/portfolio/__init__.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@field_validator("symbols", mode="before", check_fields=False)
@classmethod
def upper_symbol(cls, v: str | list[str] | set[str]) -> list[str]:
    """
    Convert the stock symbols to uppercase and remove empty strings.

    Parameters
    ----------
    v : Union[str, List[str], Set[str]]
        The stock symbol or collection of symbols to be converted.

    Returns
    -------
    List[str]
        A list of uppercase stock symbols with empty strings removed.
    """
    # Handle empty inputs
    if not v:
        return []

    # If v is a string, split it by commas into a list. Otherwise, ensure it's a list.
    v = v.split(",") if isinstance(v, str) else list(v)

    # Convert all elements to uppercase, trim whitespace, and remove empty strings
    valid_symbols = [
        symbol.strip().upper() for symbol in v if symbol.strip()
    ]

    if not valid_symbols:
        msg = "At least one valid symbol (str) must be provided"
        raise ValueError(msg)

    return valid_symbols
humbldata.core.standard_models.portfolio.PortfolioData ¤

Bases: Data

The Data for the PortfolioController.

Source code in src/humbldata/core/standard_models/portfolio/__init__.py
 96
 97
 98
 99
100
101
102
class PortfolioData(Data):
    """
    The Data for the PortfolioController.
    """

    # Add your data model fields here
    pass

humbldata.core.standard_models.toolbox ¤

Context: Toolbox || Category: Standardized Framework Model.

This module defines the QueryParams and Data classes for the Toolbox context. THis is where all of the context(s) of your project go. The STANDARD MODELS for categories and subsequent commands are nested here.

Classes:

Name Description
ToolboxQueryParams

Query parameters for the ToolboxController.

ToolboxData

A Pydantic model that defines the data returned by the ToolboxController.

Attributes:

Name Type Description
symbol str

The symbol/ticker of the stock.

interval Optional[str]

The interval of the data. Defaults to '1d'.

start_date str

The start date of the data.

end_date str

The end date of the data.

humbldata.core.standard_models.toolbox.technical ¤

Context: Toolbox || Category: Technical.

humbldata.core.standard_models.toolbox.technical.realized_volatility ¤

Volatility Standard Model.

Context: Toolbox || Category: Technical || Command: Volatility.

This module is used to define the QueryParams and Data model for the Volatility command.

humbldata.core.standard_models.toolbox.technical.realized_volatility.RealizedVolatilityQueryParams ¤

Bases: QueryParams

QueryParams for the Realized Volatility command.

Source code in src/humbldata/core/standard_models/toolbox/technical/realized_volatility.py
21
22
23
24
class RealizedVolatilityQueryParams(QueryParams):
    """
    QueryParams for the Realized Volatility command.
    """
humbldata.core.standard_models.toolbox.technical.realized_volatility.RealizedVolatilityData ¤

Bases: Data

Data model for the Realized Volatility command.

Source code in src/humbldata/core/standard_models/toolbox/technical/realized_volatility.py
27
28
29
30
class RealizedVolatilityData(Data):
    """
    Data model for the Realized Volatility command.
    """
humbldata.core.standard_models.toolbox.technical.realized_volatility.RealizedVolatilityFetcher ¤

Bases: RealizedVolatilityQueryParams

Fetcher for the Realized Volatility command.

Source code in src/humbldata/core/standard_models/toolbox/technical/realized_volatility.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class RealizedVolatilityFetcher(RealizedVolatilityQueryParams):
    """
    Fetcher for the Realized Volatility command.
    """

    data_list: ClassVar[list[RealizedVolatilityData]] = []

    def __init__(
        self,
        context_params: ToolboxQueryParams,
        command_params: RealizedVolatilityQueryParams,
    ):
        self._context_params = context_params
        self._command_params = command_params

    def transform_query(self):
        """Transform the params to the command-specific query."""

    def extract_data(self):
        """Extract the data from the provider."""
        # Assuming 'obb' is a predefined object in your context
        df = (
            obb.equity.price.historical(
                symbol=self.context_params.symbol,
                start_date=str(self.context_params.start_date),
                end_date=str(self.context_params.end_date),
                provider=self.command_params.provider,
                verbose=not self.command_params.kwargs.get("silent", False),
                **self.command_params.kwargs,
            )
            .to_df()
            .reset_index()
        )
        return df

    def transform_data(self):
        """Transform the command-specific data."""
        # Placeholder for data transformation logic

    def fetch_data(self):
        """Execute the TET pattern."""
        # Call the methods in the desired order
        query = self.transform_query()
        raw_data = (
            self.extract_data()
        )  # This should use 'query' to fetch the data
        transformed_data = (
            self.transform_data()
        )  # This should transform 'raw_data'

        # Validate with VolatilityData, unpack dict into pydantic row by row
        return transformed_data
humbldata.core.standard_models.toolbox.technical.realized_volatility.RealizedVolatilityFetcher.transform_query ¤
transform_query()

Transform the params to the command-specific query.

Source code in src/humbldata/core/standard_models/toolbox/technical/realized_volatility.py
48
49
def transform_query(self):
    """Transform the params to the command-specific query."""
humbldata.core.standard_models.toolbox.technical.realized_volatility.RealizedVolatilityFetcher.extract_data ¤
extract_data()

Extract the data from the provider.

Source code in src/humbldata/core/standard_models/toolbox/technical/realized_volatility.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def extract_data(self):
    """Extract the data from the provider."""
    # Assuming 'obb' is a predefined object in your context
    df = (
        obb.equity.price.historical(
            symbol=self.context_params.symbol,
            start_date=str(self.context_params.start_date),
            end_date=str(self.context_params.end_date),
            provider=self.command_params.provider,
            verbose=not self.command_params.kwargs.get("silent", False),
            **self.command_params.kwargs,
        )
        .to_df()
        .reset_index()
    )
    return df
humbldata.core.standard_models.toolbox.technical.realized_volatility.RealizedVolatilityFetcher.transform_data ¤
transform_data()

Transform the command-specific data.

Source code in src/humbldata/core/standard_models/toolbox/technical/realized_volatility.py
68
69
def transform_data(self):
    """Transform the command-specific data."""
humbldata.core.standard_models.toolbox.technical.realized_volatility.RealizedVolatilityFetcher.fetch_data ¤
fetch_data()

Execute the TET pattern.

Source code in src/humbldata/core/standard_models/toolbox/technical/realized_volatility.py
72
73
74
75
76
77
78
79
80
81
82
83
84
def fetch_data(self):
    """Execute the TET pattern."""
    # Call the methods in the desired order
    query = self.transform_query()
    raw_data = (
        self.extract_data()
    )  # This should use 'query' to fetch the data
    transformed_data = (
        self.transform_data()
    )  # This should transform 'raw_data'

    # Validate with VolatilityData, unpack dict into pydantic row by row
    return transformed_data
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel ¤

Mandelbrot Channel Standard Model.

Context: Toolbox || Category: Technical || Command: Mandelbrot Channel.

This module is used to define the QueryParams and Data model for the Mandelbrot Channel command.

humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelQueryParams ¤

Bases: QueryParams

QueryParams model for the Mandelbrot Channel command, a Pydantic v2 model.

Parameters:

Name Type Description Default
window str

The width of the window used for splitting the data into sections for detrending. Defaults to "1m".

required
rv_adjustment bool

Whether to adjust the calculation for realized volatility. If True, the data is filtered to only include observations in the same volatility bucket that the stock is currently in. Defaults to True.

required
rv_method str

The method to calculate the realized volatility. Only need to define when rv_adjustment is True. Defaults to "std".

required
rs_method Literal['RS', 'RS_min', 'RS_max', 'RS_mean']

The method to use for Range/STD calculation. This is either, min, max or mean of all RS ranges per window. If not defined, just used the most recent RS window. Defaults to "RS".

required
rv_grouped_mean bool

Whether to calculate the mean value of realized volatility over multiple window lengths. Defaults to False.

required
live_price bool

Whether to calculate the ranges using the current live price, or the most recent 'close' observation. Defaults to False.

required
historical bool

Whether to calculate the Historical Mandelbrot Channel (over-time), and return a time-series of channels from the start to the end date. If False, the Mandelbrot Channel calculation is done aggregating all of the data into one observation. If True, then it will enable daily observations over-time. Defaults to False.

required
chart bool

Whether to return a chart object. Defaults to False.

required
Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class MandelbrotChannelQueryParams(QueryParams):
    """
    QueryParams model for the Mandelbrot Channel command, a Pydantic v2 model.

    Parameters
    ----------
    window : str
        The width of the window used for splitting the data into sections for
        detrending. Defaults to "1m".
    rv_adjustment : bool
        Whether to adjust the calculation for realized volatility. If True, the
        data is filtered
        to only include observations in the same volatility bucket that the
        stock is currently in. Defaults to True.
    rv_method : str
        The method to calculate the realized volatility. Only need to define
        when rv_adjustment is True. Defaults to "std".
    rs_method : Literal["RS", "RS_min", "RS_max", "RS_mean"]
        The method to use for Range/STD calculation. This is either, min, max
        or mean of all RS ranges
        per window. If not defined, just used the most recent RS window.
        Defaults to "RS".
    rv_grouped_mean : bool
        Whether to calculate the mean value of realized volatility over
        multiple window lengths. Defaults to False.
    live_price : bool
        Whether to calculate the ranges using the current live price, or the
        most recent 'close' observation. Defaults to False.
    historical : bool
        Whether to calculate the Historical Mandelbrot Channel (over-time), and
        return a time-series of channels from the start to the end date. If
        False, the Mandelbrot Channel calculation is done aggregating all of the
        data into one observation. If True, then it will enable daily
        observations over-time. Defaults to False.
    chart : bool
        Whether to return a chart object. Defaults to False.
    """

    window: str = Field(
        default="1mo",
        title="Window",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("window", ""),
    )
    rv_adjustment: bool = Field(
        default=True,
        title="Realized Volatility Adjustment",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("rv_adjustment", ""),
    )
    rv_method: Literal[
        "std",
        "parkinson",
        "garman_klass",
        "gk",
        "hodges_tompkins",
        "ht",
        "rogers_satchell",
        "rs",
        "yang_zhang",
        "yz",
        "squared_returns",
        "sq",
    ] = Field(
        default="std",
        title="Realized Volatility Method",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("rv_method", ""),
    )
    rs_method: Literal["RS", "RS_min", "RS_max", "RS_mean"] = Field(
        default="RS",
        title="R/S Method",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("rs_method", ""),
    )
    rv_grouped_mean: bool = Field(
        default=False,
        title="Realized Volatility Grouped Mean",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("rv_grouped_mean", ""),
    )
    live_price: bool = Field(
        default=False,
        title="Live Price",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("live_price", ""),
    )
    historical: bool = Field(
        default=False,
        title="Historical Mandelbrot Channel",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("historical", ""),
    )
    chart: bool = Field(
        default=False,
        title="Results Chart",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("chart", ""),
    )
    template: Literal[
        "humbl_dark",
        "humbl_light",
        "ggplot2",
        "seaborn",
        "simple_white",
        "plotly",
        "plotly_white",
        "plotly_dark",
        "presentation",
        "xgridoff",
        "ygridoff",
        "gridon",
        "none",
    ] = Field(
        default="humbl_dark",
        title="Plotly Template",
        description=MANDELBROT_QUERY_DESCRIPTIONS.get("template", ""),
    )

    @field_validator("window", mode="after", check_fields=False)
    @classmethod
    def window_format(cls, v: str) -> str:
        """
        Format the window string into a standardized format.

        Parameters
        ----------
        v : str
            The window size as a string.

        Returns
        -------
        str
            The window string in a standardized format.

        Raises
        ------
        ValueError
            If the input is not a string.
        """
        if isinstance(v, str):
            return _window_format(v, _return_timedelta=False)
        else:
            msg = "Window must be a string."
            raise ValueError(msg)
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelQueryParams.window_format classmethod ¤
window_format(v: str) -> str

Format the window string into a standardized format.

Parameters:

Name Type Description Default
v str

The window size as a string.

required

Returns:

Type Description
str

The window string in a standardized format.

Raises:

Type Description
ValueError

If the input is not a string.

Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@field_validator("window", mode="after", check_fields=False)
@classmethod
def window_format(cls, v: str) -> str:
    """
    Format the window string into a standardized format.

    Parameters
    ----------
    v : str
        The window size as a string.

    Returns
    -------
    str
        The window string in a standardized format.

    Raises
    ------
    ValueError
        If the input is not a string.
    """
    if isinstance(v, str):
        return _window_format(v, _return_timedelta=False)
    else:
        msg = "Window must be a string."
        raise ValueError(msg)
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelData ¤

Bases: Data

Data model for the Mandelbrot Channel command, a Pandera.Polars Model.

Parameters:

Name Type Description Default
date Union[date, datetime]

The date of the data point. Defaults to None.

required
symbol str

The stock symbol. Defaults to None.

required
bottom_price float

The bottom price in the Mandelbrot Channel. Defaults to None.

required
recent_price float

The most recent price within the Mandelbrot Channel. Defaults to None.

required
top_price float

The top price in the Mandelbrot Channel. Defaults to None.

required
Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
class MandelbrotChannelData(Data):
    """
    Data model for the Mandelbrot Channel command, a Pandera.Polars Model.

    Parameters
    ----------
    date : Union[dt.date, dt.datetime], optional
        The date of the data point. Defaults to None.
    symbol : str, optional
        The stock symbol. Defaults to None.
    bottom_price : float, optional
        The bottom price in the Mandelbrot Channel. Defaults to None.
    recent_price : float, optional
        The most recent price within the Mandelbrot Channel. Defaults to None.
    top_price : float, optional
        The top price in the Mandelbrot Channel. Defaults to None.
    """

    date: pl.Date = pa.Field(
        default=None,
        title="Date",
        description="The date of the data point.",
    )
    symbol: str = pa.Field(
        default=None,
        title="Symbol",
        description="The stock symbol.",
    )
    bottom_price: float = pa.Field(
        default=None,
        title="Bottom Price",
        description="The bottom price in the Mandelbrot Channel.",
    )
    recent_price: float = pa.Field(
        default=None,
        title="Recent Price",
        description="The most recent price within the Mandelbrot Channel.",
        alias="(close_price|recent_price|last_price)",
        regex=True,
    )
    top_price: float = pa.Field(
        default=None,
        title="Top Price",
        description="The top price in the Mandelbrot Channel.",
    )
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelFetcher ¤

Fetcher for the Mandelbrot Channel command.

Parameters:

Name Type Description Default
context_params ToolboxQueryParams

The context parameters for the toolbox query.

required
command_params MandelbrotChannelQueryParams

The command-specific parameters for the Mandelbrot Channel query.

required

Attributes:

Name Type Description
context_params ToolboxQueryParams

Stores the context parameters passed during initialization.

command_params MandelbrotChannelQueryParams

Stores the command-specific parameters passed during initialization.

equity_historical_data DataFrame

The raw data extracted from the data provider, before transformation.

Methods:

Name Description
transform_query

Transform the command-specific parameters into a query.

extract_data

Extracts the data from the provider and returns it as a Polars DataFrame.

transform_data

Transforms the command-specific data according to the Mandelbrot Channel logic.

fetch_data

Execute TET Pattern.

Returns:

Type Description
HumblObject

results : MandelbrotChannelData Serializable results. provider : Literal['fmp', 'intrinio', 'polygon', 'tiingo', 'yfinance'] Provider name. warnings : Optional[List[Warning_]] List of warnings. chart : Optional[Chart] Chart object. context_params : ToolboxQueryParams Context-specific parameters. command_params : MandelbrotChannelQueryParams Command-specific parameters.

Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class MandelbrotChannelFetcher:
    """
    Fetcher for the Mandelbrot Channel command.

    Parameters
    ----------
    context_params : ToolboxQueryParams
        The context parameters for the toolbox query.
    command_params : MandelbrotChannelQueryParams
        The command-specific parameters for the Mandelbrot Channel query.

    Attributes
    ----------
    context_params : ToolboxQueryParams
        Stores the context parameters passed during initialization.
    command_params : MandelbrotChannelQueryParams
        Stores the command-specific parameters passed during initialization.
    equity_historical_data : pl.DataFrame
        The raw data extracted from the data provider, before transformation.

    Methods
    -------
    transform_query()
        Transform the command-specific parameters into a query.
    extract_data()
        Extracts the data from the provider and returns it as a Polars DataFrame.
    transform_data()
        Transforms the command-specific data according to the Mandelbrot Channel logic.
    fetch_data()
        Execute TET Pattern.

    Returns
    -------
    HumblObject
        results : MandelbrotChannelData
            Serializable results.
        provider : Literal['fmp', 'intrinio', 'polygon', 'tiingo', 'yfinance']
            Provider name.
        warnings : Optional[List[Warning_]]
            List of warnings.
        chart : Optional[Chart]
            Chart object.
        context_params : ToolboxQueryParams
            Context-specific parameters.
        command_params : MandelbrotChannelQueryParams
            Command-specific parameters.

    """

    def __init__(
        self,
        context_params: ToolboxQueryParams,
        command_params: MandelbrotChannelQueryParams,
    ):
        """
        Initialize the MandelbrotChannelFetcher with context and command parameters.

        Parameters
        ----------
        context_params : ToolboxQueryParams
            The context parameters for the toolbox query.
        command_params : MandelbrotChannelQueryParams
            The command-specific parameters for the Mandelbrot Channel query.
        """
        self.context_params = context_params
        self.command_params = command_params

    def transform_query(self):
        """
        Transform the command-specific parameters into a query.

        If command_params is not provided, it initializes a default MandelbrotChannelQueryParams object.
        """
        if not self.command_params:
            self.command_params = None
            # Set Default Arguments
            self.command_params: MandelbrotChannelQueryParams = (
                MandelbrotChannelQueryParams()
            )
        else:
            self.command_params: MandelbrotChannelQueryParams = (
                MandelbrotChannelQueryParams(**self.command_params)
            )

    def extract_data(self):
        """
        Extract the data from the provider and returns it as a Polars DataFrame.

        Drops unnecessary columns like dividends and stock splits from the data.

        Returns
        -------
        pl.DataFrame
            The extracted data as a Polars DataFrame.
        """
        self.equity_historical_data: pl.LazyFrame = (
            obb.equity.price.historical(
                symbol=self.context_params.symbols,
                start_date=self.context_params.start_date,
                end_date=self.context_params.end_date,
                provider=self.context_params.provider,
                adjustment="splits_and_dividends",
                # add kwargs
            )
            .to_polars()
            .lazy()
        ).drop(["dividend", "split_ratio"])  # TODO: drop `capital_gains` col

        if len(self.context_params.symbols) == 1:
            self.equity_historical_data = (
                self.equity_historical_data.with_columns(
                    symbol=pl.lit(self.context_params.symbols[0])
                )
            )
        return self

    def transform_data(self):
        """
        Transform the command-specific data according to the Mandelbrot Channel logic.

        Returns
        -------
        pl.DataFrame
            The transformed data as a Polars DataFrame
        """
        if self.command_params.historical is False:
            transformed_data = calc_mandelbrot_channel(
                data=self.equity_historical_data,
                window=self.command_params.window,
                rv_adjustment=self.command_params.rv_adjustment,
                rv_method=self.command_params.rv_method,
                rv_grouped_mean=self.command_params.rv_grouped_mean,
                rs_method=self.command_params.rs_method,
                live_price=self.command_params.live_price,
            )
        else:
            transformed_data = calc_mandelbrot_channel_historical_concurrent(
                data=self.equity_historical_data,
                window=self.command_params.window,
                rv_adjustment=self.command_params.rv_adjustment,
                rv_method=self.command_params.rv_method,
                rv_grouped_mean=self.command_params.rv_grouped_mean,
                rs_method=self.command_params.rs_method,
                live_price=self.command_params.live_price,
                use_processes=False,
            )

        self.transformed_data = MandelbrotChannelData(
            transformed_data.collect().drop_nulls()  ## HOTFIX - need to trace where coming from w/ unequal data
        ).lazy()

        if self.command_params.chart:
            self.chart = generate_plots(
                self.transformed_data,
                self.equity_historical_data,
                template=self.command_params.template,
            )
        else:
            self.chart = None

        self.transformed_data = self.transformed_data.serialize(format="binary")
        return self

    @log_start_end(logger=logger)
    def fetch_data(self):
        """
        Execute TET Pattern.

        This method executes the query transformation, data fetching and
        transformation process by first calling `transform_query` to prepare the query parameters, then
        extracting the raw data using `extract_data` method, and finally
        transforming the raw data using `transform_data` method.

        Returns
        -------
        pl.DataFrame
            The transformed data as a Polars DataFrame, ready for further analysis
            or visualization.
        """
        self.transform_query()
        self.extract_data()
        self.transform_data()

        if not hasattr(self.context_params, "warnings"):
            self.context_params.warnings = []

        return HumblObject(
            results=self.transformed_data,
            provider=self.context_params.provider,
            equity_data=self.equity_historical_data.serialize(),
            warnings=self.context_params.warnings,
            chart=self.chart,
            context_params=self.context_params,
            command_params=self.command_params,
        )
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelFetcher.__init__ ¤
__init__(context_params: ToolboxQueryParams, command_params: MandelbrotChannelQueryParams)

Initialize the MandelbrotChannelFetcher with context and command parameters.

Parameters:

Name Type Description Default
context_params ToolboxQueryParams

The context parameters for the toolbox query.

required
command_params MandelbrotChannelQueryParams

The command-specific parameters for the Mandelbrot Channel query.

required
Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def __init__(
    self,
    context_params: ToolboxQueryParams,
    command_params: MandelbrotChannelQueryParams,
):
    """
    Initialize the MandelbrotChannelFetcher with context and command parameters.

    Parameters
    ----------
    context_params : ToolboxQueryParams
        The context parameters for the toolbox query.
    command_params : MandelbrotChannelQueryParams
        The command-specific parameters for the Mandelbrot Channel query.
    """
    self.context_params = context_params
    self.command_params = command_params
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelFetcher.transform_query ¤
transform_query()

Transform the command-specific parameters into a query.

If command_params is not provided, it initializes a default MandelbrotChannelQueryParams object.

Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def transform_query(self):
    """
    Transform the command-specific parameters into a query.

    If command_params is not provided, it initializes a default MandelbrotChannelQueryParams object.
    """
    if not self.command_params:
        self.command_params = None
        # Set Default Arguments
        self.command_params: MandelbrotChannelQueryParams = (
            MandelbrotChannelQueryParams()
        )
    else:
        self.command_params: MandelbrotChannelQueryParams = (
            MandelbrotChannelQueryParams(**self.command_params)
        )
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelFetcher.extract_data ¤
extract_data()

Extract the data from the provider and returns it as a Polars DataFrame.

Drops unnecessary columns like dividends and stock splits from the data.

Returns:

Type Description
DataFrame

The extracted data as a Polars DataFrame.

Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def extract_data(self):
    """
    Extract the data from the provider and returns it as a Polars DataFrame.

    Drops unnecessary columns like dividends and stock splits from the data.

    Returns
    -------
    pl.DataFrame
        The extracted data as a Polars DataFrame.
    """
    self.equity_historical_data: pl.LazyFrame = (
        obb.equity.price.historical(
            symbol=self.context_params.symbols,
            start_date=self.context_params.start_date,
            end_date=self.context_params.end_date,
            provider=self.context_params.provider,
            adjustment="splits_and_dividends",
            # add kwargs
        )
        .to_polars()
        .lazy()
    ).drop(["dividend", "split_ratio"])  # TODO: drop `capital_gains` col

    if len(self.context_params.symbols) == 1:
        self.equity_historical_data = (
            self.equity_historical_data.with_columns(
                symbol=pl.lit(self.context_params.symbols[0])
            )
        )
    return self
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelFetcher.transform_data ¤
transform_data()

Transform the command-specific data according to the Mandelbrot Channel logic.

Returns:

Type Description
DataFrame

The transformed data as a Polars DataFrame

Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def transform_data(self):
    """
    Transform the command-specific data according to the Mandelbrot Channel logic.

    Returns
    -------
    pl.DataFrame
        The transformed data as a Polars DataFrame
    """
    if self.command_params.historical is False:
        transformed_data = calc_mandelbrot_channel(
            data=self.equity_historical_data,
            window=self.command_params.window,
            rv_adjustment=self.command_params.rv_adjustment,
            rv_method=self.command_params.rv_method,
            rv_grouped_mean=self.command_params.rv_grouped_mean,
            rs_method=self.command_params.rs_method,
            live_price=self.command_params.live_price,
        )
    else:
        transformed_data = calc_mandelbrot_channel_historical_concurrent(
            data=self.equity_historical_data,
            window=self.command_params.window,
            rv_adjustment=self.command_params.rv_adjustment,
            rv_method=self.command_params.rv_method,
            rv_grouped_mean=self.command_params.rv_grouped_mean,
            rs_method=self.command_params.rs_method,
            live_price=self.command_params.live_price,
            use_processes=False,
        )

    self.transformed_data = MandelbrotChannelData(
        transformed_data.collect().drop_nulls()  ## HOTFIX - need to trace where coming from w/ unequal data
    ).lazy()

    if self.command_params.chart:
        self.chart = generate_plots(
            self.transformed_data,
            self.equity_historical_data,
            template=self.command_params.template,
        )
    else:
        self.chart = None

    self.transformed_data = self.transformed_data.serialize(format="binary")
    return self
humbldata.core.standard_models.toolbox.technical.mandelbrot_channel.MandelbrotChannelFetcher.fetch_data ¤
fetch_data()

Execute TET Pattern.

This method executes the query transformation, data fetching and transformation process by first calling transform_query to prepare the query parameters, then extracting the raw data using extract_data method, and finally transforming the raw data using transform_data method.

Returns:

Type Description
DataFrame

The transformed data as a Polars DataFrame, ready for further analysis or visualization.

Source code in src/humbldata/core/standard_models/toolbox/technical/mandelbrot_channel.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
@log_start_end(logger=logger)
def fetch_data(self):
    """
    Execute TET Pattern.

    This method executes the query transformation, data fetching and
    transformation process by first calling `transform_query` to prepare the query parameters, then
    extracting the raw data using `extract_data` method, and finally
    transforming the raw data using `transform_data` method.

    Returns
    -------
    pl.DataFrame
        The transformed data as a Polars DataFrame, ready for further analysis
        or visualization.
    """
    self.transform_query()
    self.extract_data()
    self.transform_data()

    if not hasattr(self.context_params, "warnings"):
        self.context_params.warnings = []

    return HumblObject(
        results=self.transformed_data,
        provider=self.context_params.provider,
        equity_data=self.equity_historical_data.serialize(),
        warnings=self.context_params.warnings,
        chart=self.chart,
        context_params=self.context_params,
        command_params=self.command_params,
    )
humbldata.core.standard_models.toolbox.ToolboxQueryParams ¤

Bases: QueryParams

Query parameters for the ToolboxController.

This class defines the query parameters used by the ToolboxController, including the stock symbol, data interval, start date, and end date. It also includes a method to ensure the stock symbol is in uppercase. If no dates constraints are given, it will collect the MAX amount of data available.

Parameters:

Name Type Description Default
symbol str | list[str] | set[str]

The symbol or ticker of the stock. You can pass multiple tickers like: "AAPL", "AAPL, MSFT" or ["AAPL", "MSFT"]. The input will be converted to uppercase.

""
interval str | None

The interval of the data. Can be None.

"1d"
start_date str

The start date for the data query.

""
end_date str

The end date for the data query.

""
provider OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS

The data provider to be used for the query.

"yfinance"
membership str

The membership level of the user.

"anonymous"

Methods:

Name Description
upper_symbol

A Pydantic @field_validator() that converts the stock symbol to uppercase. If a list or set of symbols is provided, each symbol in the collection is converted to uppercase and returned as a comma-separated string.

validate_interval

A Pydantic @field_validator() that validates the interval format. Ensures the interval is a number followed by one of 's', 'm', 'h', 'd', 'W', 'M', 'Q', 'Y'.

validate_date_format

A Pydantic @field_validator() that validates the date format to ensure it is YYYY-MM-DD.

validate_start_date

A Pydantic @model_validator() that validates and adjusts the start date based on membership level.

Raises:

Type Description
ValueError

If the symbol parameter is a list and not all elements are strings, or if symbol is not a string, list, or set. If the interval format is invalid. If the date format is invalid.

Notes

A Pydantic v2 Model

Source code in src/humbldata/core/standard_models/toolbox/__init__.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
class ToolboxQueryParams(QueryParams):
    """
    Query parameters for the ToolboxController.

    This class defines the query parameters used by the ToolboxController,
    including the stock symbol, data interval, start date, and end date. It also
    includes a method to ensure the stock symbol is in uppercase.
    If no dates constraints are given, it will collect the MAX amount of data
    available.

    Parameters
    ----------
    symbol : str | list[str] | set[str], default=""
        The symbol or ticker of the stock. You can pass multiple tickers like:
        "AAPL", "AAPL, MSFT" or ["AAPL", "MSFT"]. The input will be converted
        to uppercase.
    interval : str | None, default="1d"
        The interval of the data. Can be None.
    start_date : str, default=""
        The start date for the data query.
    end_date : str, default=""
        The end date for the data query.
    provider : OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS, default="yfinance"
        The data provider to be used for the query.
    membership : str, default="anonymous"
        The membership level of the user.

    Methods
    -------
    upper_symbol(cls, v: Union[str, list[str], set[str]]) -> Union[str, list[str]]
        A Pydantic `@field_validator()` that converts the stock symbol to
        uppercase. If a list or set of symbols is provided, each symbol in the
        collection is converted to uppercase and returned as a comma-separated
        string.
    validate_interval(cls, v: str) -> str
        A Pydantic `@field_validator()` that validates the interval format.
        Ensures the interval is a number followed by one of 's', 'm', 'h', 'd', 'W', 'M', 'Q', 'Y'.
    validate_date_format(cls, v: str | date) -> date
        A Pydantic `@field_validator()` that validates the date format to ensure it is YYYY-MM-DD.
    validate_start_date(self) -> 'ToolboxQueryParams'
        A Pydantic `@model_validator()` that validates and adjusts the start date based on membership level.

    Raises
    ------
    ValueError
        If the `symbol` parameter is a list and not all elements are strings, or
        if `symbol` is not a string, list, or set.
        If the `interval` format is invalid.
        If the `date` format is invalid.

    Notes
    -----
    A Pydantic v2 Model

    """

    symbols: str | list[str] = Field(
        default=["AAPL"],
        title="Symbols/Tickers",
        description=QUERY_DESCRIPTIONS.get("symbols", ""),
    )
    interval: str | None = Field(
        default="1d",
        title="Interval",
        description=QUERY_DESCRIPTIONS.get("interval", ""),
    )
    start_date: dt.date | str = Field(
        default_factory=lambda: dt.date(1950, 1, 1),
        title="Start Date",
        description="The starting date for the data query.",
    )
    end_date: dt.date | str = Field(
        default_factory=lambda: dt.datetime.now(
            tz=pytz.timezone("America/New_York")
        ).date(),
        title="End Date",
        description="The ending date for the data query.",
    )
    provider: OBB_EQUITY_PRICE_HISTORICAL_PROVIDERS = Field(
        default="yfinance",
        title="Provider",
        description=QUERY_DESCRIPTIONS.get("provider", ""),
    )
    membership: Literal[
        "anonymous", "peon", "premium", "power", "permanent", "admin"
    ] = Field(
        default="anonymous",
        title="Membership",
        description=QUERY_DESCRIPTIONS.get("membership", ""),
    )

    @field_validator("symbols", mode="before", check_fields=False)
    @classmethod
    def upper_symbol(cls, v: str | list[str] | set[str]) -> list[str]:
        """
        Convert the stock symbols to uppercase and remove empty strings.

        Parameters
        ----------
        v : Union[str, List[str], Set[str]]
            The stock symbol or collection of symbols to be converted.

        Returns
        -------
        List[str]
            A list of uppercase stock symbols with empty strings removed.
        """
        # Handle empty inputs
        if not v:
            return []

        # If v is a string, split it by commas into a list. Otherwise, ensure it's a list.
        v = v.split(",") if isinstance(v, str) else list(v)

        # Convert all elements to uppercase, trim whitespace, and remove empty strings
        valid_symbols = [
            symbol.strip().upper() for symbol in v if symbol.strip()
        ]

        if not valid_symbols:
            msg = "At least one valid symbol (str) must be provided"
            raise ValueError(msg)

        return valid_symbols

    @field_validator("interval", mode="before", check_fields=False)
    @classmethod
    def validate_interval(cls, v: str) -> str:
        """
        Validate the interval format.

        Parameters
        ----------
        v : str
            The interval string to be validated.

        Returns
        -------
        str
            The validated interval string.

        Raises
        ------
        ValueError
            If the interval format is invalid.
        """
        if not re.match(r"^\d*[smhdWMQY]$", v):
            msg = "Invalid interval format. Must be a number followed by one of 's', 'm', 'h', 'd', 'W', 'M', 'Q', 'Y'."
            raise ValueError(msg)
        return v

    @field_validator("start_date", "end_date", mode="before")
    @classmethod
    def validate_date_format(cls, v: str | dt.date) -> dt.date:
        """
        Validate and convert the input date to a datetime.date object.

        This method accepts either a string in 'YYYY-MM-DD' format or a datetime.date object.
        It converts the input to a datetime.date object, ensuring it's in the correct format.

        Parameters
        ----------
        v : str | dt.date
            The input date to validate and convert.

        Returns
        -------
        dt.date
            The validated and converted date.

        Raises
        ------
        ValueError
            If the input string is not in the correct format.
        TypeError
            If the input is neither a string nor a datetime.date object.
        """
        if isinstance(v, str):
            try:
                date = datetime.strptime(v, "%Y-%m-%d").replace(
                    tzinfo=pytz.timezone("America/New_York")
                )
            except ValueError as e:
                msg = f"Invalid date format. Must be YYYY-MM-DD: {e}"
                raise ValueError(msg) from e
        elif isinstance(v, dt.date):
            date = datetime.combine(v, datetime.min.time()).replace(
                tzinfo=pytz.timezone("America/New_York")
            )
        else:
            msg = f"Expected str or date, got {type(v)}"
            raise TypeError(msg)

        # Check if the date is in the correct format
        if date.strftime("%Y-%m-%d") != date.strftime("%Y-%m-%d"):
            msg = "Date must be in YYYY-MM-DD format"
            raise ValueError(msg)
        if date.date() < dt.date(1950, 1, 1):
            msg = "Date must be after 1950-01-01"
            raise ValueError(msg)

        return date.date()

    @model_validator(mode="after")
    def validate_start_date(self) -> "ToolboxQueryParams":
        end_date: dt.date = self.end_date  # type: ignore  # noqa: PGH003 the date has already been converted to date

        start_date_mapping = {
            "anonymous": (end_date - timedelta(days=365), "1Y"),
            "peon": (end_date - timedelta(days=730), "2Y"),
            "premium": (end_date - timedelta(days=1825), "5Y"),
            "power": (end_date - timedelta(days=7300), "20Y"),
            "permanent": (end_date - timedelta(days=10680), "30Y"),
            "admin": (
                datetime(
                    1950, 1, 1, tzinfo=pytz.timezone("America/New_York")
                ).date(),
                "All",
            ),
        }

        allowed_start_date, data_length = start_date_mapping.get(
            self.membership.lower(), (end_date - timedelta(days=365), "1Y")
        )

        if self.start_date < allowed_start_date:  # type: ignore  # noqa: PGH003 the date has already been converted to date
            logger.warning(
                f"Start date adjusted to {allowed_start_date} based on {self.membership} membership ({data_length} of data)."
            )
            self.start_date = allowed_start_date
            if not hasattr(self, "warnings"):
                self.warnings = []
            self.warnings.append(
                HumblDataWarning(
                    category="ToolboxQueryParams",
                    message=f"Start date adjusted to {allowed_start_date} based on {self.membership} membership ({data_length} of data).",
                )
            )

        return self
humbldata.core.standard_models.toolbox.ToolboxQueryParams.upper_symbol classmethod ¤
upper_symbol(v: str | list[str] | set[str]) -> list[str]

Convert the stock symbols to uppercase and remove empty strings.

Parameters:

Name Type Description Default
v Union[str, List[str], Set[str]]

The stock symbol or collection of symbols to be converted.

required

Returns:

Type Description
List[str]

A list of uppercase stock symbols with empty strings removed.

Source code in src/humbldata/core/standard_models/toolbox/__init__.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@field_validator("symbols", mode="before", check_fields=False)
@classmethod
def upper_symbol(cls, v: str | list[str] | set[str]) -> list[str]:
    """
    Convert the stock symbols to uppercase and remove empty strings.

    Parameters
    ----------
    v : Union[str, List[str], Set[str]]
        The stock symbol or collection of symbols to be converted.

    Returns
    -------
    List[str]
        A list of uppercase stock symbols with empty strings removed.
    """
    # Handle empty inputs
    if not v:
        return []

    # If v is a string, split it by commas into a list. Otherwise, ensure it's a list.
    v = v.split(",") if isinstance(v, str) else list(v)

    # Convert all elements to uppercase, trim whitespace, and remove empty strings
    valid_symbols = [
        symbol.strip().upper() for symbol in v if symbol.strip()
    ]

    if not valid_symbols:
        msg = "At least one valid symbol (str) must be provided"
        raise ValueError(msg)

    return valid_symbols
humbldata.core.standard_models.toolbox.ToolboxQueryParams.validate_interval classmethod ¤
validate_interval(v: str) -> str

Validate the interval format.

Parameters:

Name Type Description Default
v str

The interval string to be validated.

required

Returns:

Type Description
str

The validated interval string.

Raises:

Type Description
ValueError

If the interval format is invalid.

Source code in src/humbldata/core/standard_models/toolbox/__init__.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@field_validator("interval", mode="before", check_fields=False)
@classmethod
def validate_interval(cls, v: str) -> str:
    """
    Validate the interval format.

    Parameters
    ----------
    v : str
        The interval string to be validated.

    Returns
    -------
    str
        The validated interval string.

    Raises
    ------
    ValueError
        If the interval format is invalid.
    """
    if not re.match(r"^\d*[smhdWMQY]$", v):
        msg = "Invalid interval format. Must be a number followed by one of 's', 'm', 'h', 'd', 'W', 'M', 'Q', 'Y'."
        raise ValueError(msg)
    return v
humbldata.core.standard_models.toolbox.ToolboxQueryParams.validate_date_format classmethod ¤
validate_date_format(v: str | date) -> date

Validate and convert the input date to a datetime.date object.

This method accepts either a string in 'YYYY-MM-DD' format or a datetime.date object. It converts the input to a datetime.date object, ensuring it's in the correct format.

Parameters:

Name Type Description Default
v str | date

The input date to validate and convert.

required

Returns:

Type Description
date

The validated and converted date.

Raises:

Type Description
ValueError

If the input string is not in the correct format.

TypeError

If the input is neither a string nor a datetime.date object.

Source code in src/humbldata/core/standard_models/toolbox/__init__.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
@field_validator("start_date", "end_date", mode="before")
@classmethod
def validate_date_format(cls, v: str | dt.date) -> dt.date:
    """
    Validate and convert the input date to a datetime.date object.

    This method accepts either a string in 'YYYY-MM-DD' format or a datetime.date object.
    It converts the input to a datetime.date object, ensuring it's in the correct format.

    Parameters
    ----------
    v : str | dt.date
        The input date to validate and convert.

    Returns
    -------
    dt.date
        The validated and converted date.

    Raises
    ------
    ValueError
        If the input string is not in the correct format.
    TypeError
        If the input is neither a string nor a datetime.date object.
    """
    if isinstance(v, str):
        try:
            date = datetime.strptime(v, "%Y-%m-%d").replace(
                tzinfo=pytz.timezone("America/New_York")
            )
        except ValueError as e:
            msg = f"Invalid date format. Must be YYYY-MM-DD: {e}"
            raise ValueError(msg) from e
    elif isinstance(v, dt.date):
        date = datetime.combine(v, datetime.min.time()).replace(
            tzinfo=pytz.timezone("America/New_York")
        )
    else:
        msg = f"Expected str or date, got {type(v)}"
        raise TypeError(msg)

    # Check if the date is in the correct format
    if date.strftime("%Y-%m-%d") != date.strftime("%Y-%m-%d"):
        msg = "Date must be in YYYY-MM-DD format"
        raise ValueError(msg)
    if date.date() < dt.date(1950, 1, 1):
        msg = "Date must be after 1950-01-01"
        raise ValueError(msg)

    return date.date()
humbldata.core.standard_models.toolbox.ToolboxData ¤

Bases: Data

The Data for the ToolboxController.

WIP: I'm thinking that this is the final layer around which the HumblDataObject will be returned to the user, with all necessary information about the query, command, data and charts that they should want. This HumblDataObject will return values in json/dict format, with methods to allow transformation into polars_df, pandas_df, a list, a dict...

Source code in src/humbldata/core/standard_models/toolbox/__init__.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
class ToolboxData(Data):
    """
    The Data for the ToolboxController.

    WIP: I'm thinking that this is the final layer around which the
    HumblDataObject will be returned to the user, with all necessary information
    about the query, command, data and charts that they should want.
    This HumblDataObject will return values in json/dict format, with methods
    to allow transformation into polars_df, pandas_df, a list, a dict...
    """

    date: pl.Date = pa.Field(
        default=None,
        title="Date",
        description=DATA_DESCRIPTIONS.get("date", ""),
    )
    open: float = pa.Field(
        default=None,
        title="Open",
        description=DATA_DESCRIPTIONS.get("open", ""),
    )
    high: float = pa.Field(
        default=None,
        title="High",
        description=DATA_DESCRIPTIONS.get("high", ""),
    )
    low: float = pa.Field(
        default=None,
        title="Low",
        description=DATA_DESCRIPTIONS.get("low", ""),
    )
    close: float = pa.Field(
        default=None,
        title="Close",
        description=DATA_DESCRIPTIONS.get("close", ""),
    )
    volume: int = pa.Field(
        default=None,
        title="Volume",
        description=DATA_DESCRIPTIONS.get("volume", ""),
    )