"""Annual features from SEC sources."""
import logging
import multiprocessing as mp
from typing import Literal
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.engine import Engine
from sqlalchemy.exc import NoResultFound
from .... import backend, utils
from ... import api, sql
from .. import _raw
logging.basicConfig(
format="%(asctime)s | %(levelname)s | %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
class IndustryAnnual:
"""Methods for gathering industry-averaged annual data from SEC
features.
The class variable :attr:`finagg.sec.feat.Annual.industry` is an
instance of this feature set implementation and is the most popular
interface for calling feature methods.
Examples:
You can aggregate this feature set using a ticker or an industry code
directly.
>>> df1 = finagg.sec.feat.annual.industry.from_refined(ticker="MSFT").head(5)
>>> df2 = finagg.sec.feat.annual.industry.from_refined(code=73).head(5)
>>> pd.testing.assert_frame_equal(df1, df2, rtol=1e-4)
"""
@classmethod
def from_refined(
cls,
/,
*,
ticker: None | str = None,
code: None | str = None,
level: Literal[2, 3, 4] = 2,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> pd.DataFrame:
"""Get annual features from the feature store,
aggregated for an entire industry.
The industry can be chosen according to a company or
by an industry code directly. If a company is provided,
the first ``level`` digits of the company's SIC code
are used as the industry code.
Args:
ticker: Company ticker. Look up the industry associated
with this company. Mutually exclusive with ``code``.
code: Industry SIC code to use for industry lookup.
Mutually exclusive with ``ticker``.
level: Industry level to aggregate features at.
The industry used according to ``ticker`` or ``code``
is subsampled according to this value. Options include:
- 2 = major group (e.g., furniture and fixtures)
- 3 = industry group (e.g., office furniture)
- 4 = industry (e.g., wood office furniture)
start: The start date of the observation period. Defaults to the
first recorded date.
end: The end date of the observation period. Defaults to the
last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
Annual data dataframe with each tag as a
separate column. Sorted by filing date.
Raises:
`ValueError`: If neither a ``ticker`` nor a ``code`` is provided.
`NoResultFound`: If there are no rows for ``ticker`` or ``code``
in the refined SQL table.
Examples:
>>> df = finagg.sec.feat.annual.industry.from_refined(ticker="AAPL").head(5)
>>> df["mean"] # doctest: +SKIP
name AssetCoverageRatio DebtEquityRatio EarningsPerShareBasic ...
fy filed ...
2010 2011-02-24 1.577924 1.112079 3.630000 ...
2011 2012-02-24 1.998633 0.886148 1.851429 ...
2012 2013-02-26 1.997748 1.413677 3.750833 ...
2013 2014-03-03 2.232804 1.176639 4.475000 ...
2014 2015-03-17 2.019007 1.080055 2.742143 ...
>>> df["std"] # doctest: +SKIP
name AssetCoverageRatio DebtEquityRatio EarningsPerShareBasic ...
fy filed ...
2010 2011-02-24 0.320778 0.149843 3.436292 ...
2011 2012-02-24 1.371792 0.331815 3.837792 ...
2012 2013-02-26 1.480323 1.085814 3.975254 ...
2013 2014-03-03 1.787131 0.575039 7.236695 ...
2014 2015-03-17 1.303284 0.458704 3.015323 ...
"""
start = start or "1776-07-04"
end = end or utils.today
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.submissions.name):
sql.submissions.create(engine)
if not sa.inspect(engine).has_table(sql.annual.name):
sql.annual.create(engine)
with engine.begin() as conn:
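# Resolve the industry code: look up the company's SIC code via the
# submissions table when a ticker is given, or use the provided code
# directly. Either way, only the first ``level`` digits are kept.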
if ticker:
(sic,) = conn.execute(
sa.select(sql.submissions.c.sic).where(
sql.submissions.c.ticker == ticker
)
).one()
code = str(sic)[:level]
elif code:
code = str(code)[:level]
else:
raise ValueError("Must provide a `ticker` or `code`.")
df = pd.DataFrame(
conn.execute(
sql.annual.select()
.join(
sql.submissions,
(sql.submissions.c.cik == sql.annual.c.cik)
& (sql.submissions.c.sic.startswith(code)),
)
.where(sql.annual.c.filed >= start, sql.annual.c.filed <= end)
)
)
if not len(df.index):
raise NoResultFound(f"No industry annual rows found for industry {code}.")
df = df.drop(columns=["cik"])
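# Melt to long format so each (fy, filed, tag) value can be aggregated
# across companies, align filing dates within each fiscal year to the
# latest filing, then pivot back into per-tag industry mean and std columns.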
df = df.melt(["fy", "filed"], var_name="name", value_name="value").set_index(
["fy"]
)
df["filed"] = df.groupby(["fy"])["filed"].max()
return (
df.reset_index() # type: ignore[return-value]
.set_index(["fy", "filed"])
.groupby(["fy", "filed", "name"])
.agg(["mean", "std"])
.reset_index()
.pivot(index=["fy", "filed"], columns="name")["value"]
.sort_index()
.dropna()
)
class NormalizedAnnual:
"""Annual features from SEC EDGAR data normalized according to industry
averages and standard deviations.
The class variable :attr:`finagg.sec.feat.Annual.normalized` is an
instance of this feature set implementation and is the most popular
interface for calling feature methods.
Examples:
It doesn't matter which data source you use to gather features.
They both return equivalent dataframes.
>>> df1 = finagg.sec.feat.annual.normalized.from_other_refined("AAPL").head(5)
>>> df2 = finagg.sec.feat.annual.normalized.from_refined("AAPL").head(5)
>>> pd.testing.assert_frame_equal(df1, df2, rtol=1e-4)
"""
@classmethod
def from_other_refined(
cls,
ticker: str,
/,
*,
level: Literal[2, 3, 4] = 2,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> pd.DataFrame:
"""Get features from other feature SQL tables.
Args:
ticker: Company ticker.
level: Industry level to aggregate relative features at.
The industry used according to ``ticker`` is subsampled
according to this value. Options include:
- 2 = major group (e.g., furniture and fixtures)
- 3 = industry group (e.g., office furniture)
- 4 = industry (e.g., wood office furniture)
start: The start date of the observation period. Defaults to the
first recorded date.
end: The end date of the observation period. Defaults to the
last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
Relative annual data dataframe with each tag as a
separate column. Sorted by filing date.
Examples:
>>> finagg.sec.feat.annual.normalized.from_other_refined("AAPL").head(5) # doctest: +SKIP
NORM(LOG_CHANGE(Assets)) NORM(LOG_CHANGE(AssetsCurrent)) NORM(LOG_CHANGE(CommonStockSharesOutstanding)) ...
fy filed ...
2010 2010-10-27 -0.707107 0.707107 0.707107 ...
2011 2011-10-26 0.549435 0.485644 0.574215 ...
2012 2012-10-31 2.160612 -0.609935 0.802452 ...
2013 2013-10-30 0.756833 0.336215 -0.148804 ...
2014 2014-10-27 0.244212 1.384738 2.836717 ...
"""
start = start or "1776-07-04"
end = end or utils.today
engine = engine or backend.engine
company_df = Annual.from_refined(
ticker, start=start, end=end, engine=engine
).reset_index(["filed"])
filed = company_df["filed"]
industry_df = IndustryAnnual.from_refined(
ticker=ticker, level=level, start=start, end=end, engine=engine
).reset_index(["filed"])
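# Industry-normalize the company's features as z-scores:
# (company value - industry mean) / industry std. Missing normalized
# values in the function columns are treated as zero deviation.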
company_df = (company_df - industry_df["mean"]) / industry_df["std"]
company_df["filed"] = filed
func_cols = utils.get_func_cols(sql.annual)
company_df[func_cols] = company_df[func_cols].fillna(value=0.0)
company_df = (
company_df.reset_index()
.set_index(["fy", "filed"])
.rename(lambda x: f"NORM({x})", axis=1)
)
company_df = (
company_df.ffill()
.dropna()
.reset_index()
.drop_duplicates("filed")
.set_index(["fy", "filed"])
.sort_index()
)
company_df = utils.resolve_col_order(
sql.normalized_annual, company_df, extra_ignore=["filed"]
)
return company_df
@classmethod
def from_refined(
cls,
ticker: str,
/,
*,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> pd.DataFrame:
"""Get features from the features SQL table.
This is the preferred method for accessing features for
offline analysis (assuming data in the local SQL table
is current).
Args:
ticker: Company ticker.
start: The start date of the observation period. Defaults to the
first recorded date.
end: The end date of the observation period. Defaults to the
last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
Annual data dataframe with each tag as a
separate column. Sorted by filing date.
Raises:
`NoResultFound`: If there are no rows for ``ticker`` in the
refined SQL table.
Examples:
>>> finagg.sec.feat.annual.normalized.from_refined("AAPL").head(5) # doctest: +SKIP
NORM(LOG_CHANGE(Assets)) NORM(LOG_CHANGE(AssetsCurrent)) NORM(LOG_CHANGE(CommonStockSharesOutstanding)) ...
fy filed ...
2010 2010-10-27 -0.707107 0.707107 0.707107 ...
2011 2011-10-26 0.549435 0.485644 0.574215 ...
2012 2012-10-31 2.160612 -0.609935 0.802452 ...
2013 2013-10-30 0.756833 0.336215 -0.148804 ...
2014 2014-10-27 0.244212 1.384738 2.836717 ...
"""
start = start or "1776-07-04"
end = end or utils.today
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.submissions.name):
sql.submissions.create(engine)
if not sa.inspect(engine).has_table(sql.normalized_annual.name):
sql.normalized_annual.create(engine)
with engine.begin() as conn:
df = pd.DataFrame(
conn.execute(
sql.normalized_annual.select()
.join(
sql.submissions,
(sql.submissions.c.cik == sql.normalized_annual.c.cik)
& (sql.submissions.c.ticker == ticker),
)
.where(
sql.normalized_annual.c.filed >= start,
sql.normalized_annual.c.filed <= end,
)
)
)
if not len(df.index):
raise NoResultFound(
f"No industry-normalized annual rows found for {ticker}."
)
df = df.drop(columns=["cik"]).set_index(["fy", "filed"]).sort_index()
return df
@classmethod
def get_candidate_ticker_set(
cls,
lb: int = 1,
*,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> set[str]:
"""Get all unique tickers in the annual SQL table that MAY BE
ELIGIBLE to be in the feature's SQL table.
This is just an alias for :meth:`finagg.sec.feat.Annual.get_ticker_set`.
Args:
lb: Minimum number of rows required to include a ticker in the
returned set.
start: The start date of the observation period to include when
searching for tickers. Defaults to the first recorded date.
end: The end date of the observation period to include when
searching for tickers. Defaults to the last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
All unique tickers that may be valid for creating
industry-normalized annual features that also have at least
``lb`` rows for each tag used for constructing the features.
Examples:
>>> "AAPL" in finagg.sec.feat.annual.normalized.get_candidate_ticker_set() # doctest: +SKIP
True
"""
return Annual.get_ticker_set(lb=lb, start=start, end=end, engine=engine)
@classmethod
def get_ticker_set(
cls,
lb: int = 1,
*,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> set[str]:
"""Get all unique tickers in the feature's SQL table.
Args:
lb: Minimum number of rows required to include a ticker in the
returned set.
start: The start date of the observation period to include when
searching for tickers. Defaults to the first recorded date.
end: The end date of the observation period to include when
searching for tickers. Defaults to the last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
All unique tickers that have all the columns required for
creating industry-normalized annual features and at least
``lb`` rows.
Examples:
>>> "AAPL" in finagg.sec.feat.annual.normalized.get_ticker_set() # doctest: +SKIP
True
"""
start = start or "1776-07-04"
end = end or utils.today
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.submissions.name):
sql.submissions.create(engine)
if not sa.inspect(engine).has_table(sql.normalized_annual.name):
sql.normalized_annual.create(engine)
with engine.begin() as conn:
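# Count normalized annual filings per CIK and keep tickers with at
# least ``lb`` rows in the observation window.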
tickers = (
conn.execute(
sa.select(sql.submissions.c.ticker)
.join(
sql.normalized_annual,
sql.normalized_annual.c.cik == sql.submissions.c.cik,
)
.where(
sql.normalized_annual.c.filed >= start,
sql.normalized_annual.c.filed <= end,
)
.group_by(sql.normalized_annual.c.cik)
.having(sa.func.count(sql.normalized_annual.c.filed) >= lb)
)
.scalars()
.all()
)
return set(tickers)
@classmethod
def get_tickers_sorted_by(
cls,
column: str,
/,
*,
ascending: bool = True,
year: int = -1,
engine: None | Engine = None,
) -> list[str]:
"""Get all tickers in the feature's SQL table sorted by a particular
column.
Args:
column: Feature column to sort by.
ascending: Whether to return results in ascending order according
to the values in ``column``.
year: Year to select from. Defaults to the most recent year that
has data available.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
Tickers sorted by a feature column for a particular year.
Examples:
>>> ts = finagg.sec.feat.annual.normalized.get_tickers_sorted_by(
... "NORM(EarningsPerShareBasic)",
... year=2020,
... )
>>> "AMD" == ts[0] # doctest: +SKIP
True
"""
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.submissions.name):
sql.submissions.create(engine)
if not sa.inspect(engine).has_table(sql.normalized_annual.name):
sql.normalized_annual.create(engine)
with engine.begin() as conn:
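# A ``year`` of -1 means use the most recent fiscal year available in
# the normalized annual table.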
if year == -1:
(max_year,) = conn.execute(
sa.select(sa.func.max(sql.normalized_annual.c.fy))
).one()
year = int(max_year)
tickers = (
conn.execute(
sa.select(sql.submissions.c.ticker)
.join(
sql.normalized_annual,
sql.normalized_annual.c.cik == sql.submissions.c.cik,
)
.where(
sql.normalized_annual.c.fy == year,
)
.order_by(sql.normalized_annual.c[column])
)
.scalars()
.all()
)
if not ascending:
return list(reversed(tickers))
return list(tickers)
@classmethod
def install(
cls,
tickers: None | set[str] = None,
*,
processes: int = mp.cpu_count() - 1,
engine: None | Engine = None,
recreate_tables: bool = False,
) -> int:
"""Install data associated with ``tickers`` by pulling data from the
annual SQL tables, transforming them into normalized features, and
then writing to the refined annual normalized SQL table.
Tables associated with this method are created if they don't already
exist.
Args:
tickers: Set of tickers to install features for. Defaults to all
the candidate tickers from
:meth:`NormalizedAnnual.get_candidate_ticker_set`.
processes: Number of background processes to run in parallel
when processing ``tickers``.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
recreate_tables: Whether to drop and recreate tables, wiping all
previously installed data.
Returns:
Number of rows written to the feature's SQL table.
"""
tickers = tickers or cls.get_candidate_ticker_set(engine=engine)
if not tickers:
logger.info(
"Skipping finagg.sec.feat.annual.normalized installation because no"
" tickers were provided or no tickers were found with prerequisite data"
" (i.e., finagg.sec.feat.annual data)"
)
return 0
engine = engine or backend.engine
if recreate_tables or not sa.inspect(engine).has_table(
sql.normalized_annual.name
):
sql.normalized_annual.drop(engine, checkfirst=True)
sql.normalized_annual.create(engine)
return utils._install(
cls.from_other_refined,
cls.to_refined,
logger,
list(tickers),
engine,
desc="Installing refined SEC industry-normalized annual data",
processes=processes,
)
@classmethod
def to_refined(
cls,
ticker: str,
df: pd.DataFrame,
/,
*,
engine: None | Engine = None,
) -> int:
"""Write the dataframe to the feature store for ``ticker``.
Args:
ticker: Company ticker.
df: Dataframe to store completely as rows in a local SQL
table.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
Number of rows written to the SQL table.
"""
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.normalized_annual.name):
sql.normalized_annual.create(engine)
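# Flatten the (fy, filed) index back into columns and attach the
# company's CIK (looked up from its ticker) before inserting the rows.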
df = df.reset_index(["fy", "filed"])
df["cik"] = sql.get_cik(ticker, engine=engine)
with engine.begin() as conn:
conn.execute(sql.normalized_annual.insert(), df.to_dict(orient="records")) # type: ignore[arg-type]
return len(df.index)
class Annual:
"""Methods for gathering annual features from SEC EDGAR data.
The module variable :data:`finagg.sec.feat.annual` is an instance of
this feature set implementation and is the most popular interface for
calling feature methods.
Examples:
It doesn't matter which data source you use to gather features.
They all return equivalent dataframes.
>>> df1 = finagg.sec.feat.annual.from_api("AAPL").head(5)
>>> df2 = finagg.sec.feat.annual.from_raw("AAPL").head(5)
>>> df3 = finagg.sec.feat.annual.from_refined("AAPL").head(5)
>>> pd.testing.assert_frame_equal(df1, df2, rtol=1e-4)
>>> pd.testing.assert_frame_equal(df1, df3, rtol=1e-4)
"""
industry = IndustryAnnual()
"""Annual features aggregated for an entire industry.
The most popular way for accessing the :class:`finagg.sec.feat.IndustryAnnual`
feature set.
:meta hide-value:
"""
normalized = NormalizedAnnual()
"""A company's annual features normalized by its industry.
The most popular way for accessing the :class:`finagg.sec.feat.NormalizedAnnual`
feature set.
:meta hide-value:
"""
@classmethod
def _normalize(cls, df: pd.DataFrame, /) -> pd.DataFrame:
"""Normalize annual features columns."""
df = api.compute_financial_ratios(df)
df = df.replace([-np.inf, np.inf], np.nan).ffill()
df = utils.resolve_func_cols(sql.annual, df, drop=True, inplace=True)
df.columns = df.columns.rename(None)
df = utils.resolve_col_order(sql.annual, df, extra_ignore=["filed"])
return df.dropna()
@classmethod
def from_api(
cls, ticker: str, /, *, start: None | str = None, end: None | str = None
) -> pd.DataFrame:
"""Get annual features directly from the SEC API.
Args:
ticker: Company ticker.
start: The start date of the observation period. Defaults to the
first recorded date.
end: The end date of the observation period. Defaults to the
last recorded date.
Returns:
Annual data dataframe with each tag as a
separate column. Sorted by filing date.
Examples:
>>> finagg.sec.feat.annual.from_api("AAPL").head(5) # doctest: +SKIP
LOG_CHANGE(Assets) LOG_CHANGE(AssetsCurrent) LOG_CHANGE(CommonStockSharesOutstanding) ...
fy filed ...
2010 2010-10-27 -0.089864 -0.023676 0.012840 ...
2011 2011-10-26 0.272493 0.278241 0.017805 ...
2012 2012-10-31 0.896033 0.076422 0.014423 ...
2013 2013-10-30 0.414064 0.248046 0.010630 ...
2014 2014-10-27 0.161871 0.239927 1.902394 ...
"""
df = api.company_concept.get_multiple_original(
api.popular_concepts, ticker=ticker, form="10-K", start=start, end=end
)
df = api.group_and_pivot_filings(df, form="10-K")
return cls._normalize(df)
@classmethod
def from_raw(
cls,
ticker: str,
/,
*,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> pd.DataFrame:
"""Get annual features from a local SEC SQL table.
Not all data series are published at the same rate or
time. Missing rows for less-frequent annual publications
are forward filled.
Args:
ticker: Company ticker.
start: The start date of the observation period. Defaults to the
first recorded date.
end: The end date of the observation period. Defaults to the
last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
Annual data dataframe with each tag as a
separate column. Sorted by filing date.
Examples:
>>> finagg.sec.feat.annual.from_raw("AAPL").head(5) # doctest: +SKIP
LOG_CHANGE(Assets) LOG_CHANGE(AssetsCurrent) LOG_CHANGE(CommonStockSharesOutstanding) ...
fy filed ...
2010 2010-10-27 -0.089864 -0.023676 0.012840 ...
2011 2011-10-26 0.272493 0.278241 0.017805 ...
2012 2012-10-31 0.896033 0.076422 0.014423 ...
2013 2013-10-30 0.414064 0.248046 0.010630 ...
2014 2014-10-27 0.161871 0.239927 1.902394 ...
"""
df = _raw.Tags.group_and_pivot_from_raw(
ticker,
[concept["tag"] for concept in api.popular_concepts],
form="10-K",
start=start,
end=end,
engine=engine,
)
return cls._normalize(df)
@classmethod
def from_refined(
cls,
ticker: str,
/,
*,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> pd.DataFrame:
"""Get features from the refined SQL table.
This is the preferred method for accessing features for
offline analysis (assuming data in the local SQL table
is current).
Args:
ticker: Company ticker.
start: The start date of the observation period. Defaults to the
first recorded date.
end: The end date of the observation period. Defaults to the
last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
Annual data dataframe with each tag as a
separate column. Sorted by filing date.
Raises:
`NoResultFound`: If there are no rows for ``ticker`` in the
refined SQL table.
Examples:
>>> finagg.sec.feat.annual.from_refined("AAPL").head(5) # doctest: +SKIP
LOG_CHANGE(Assets) LOG_CHANGE(AssetsCurrent) LOG_CHANGE(CommonStockSharesOutstanding) ...
fy filed ...
2010 2010-10-27 -0.089864 -0.023676 0.012840 ...
2011 2011-10-26 0.272493 0.278241 0.017805 ...
2012 2012-10-31 0.896033 0.076422 0.014423 ...
2013 2013-10-30 0.414064 0.248046 0.010630 ...
2014 2014-10-27 0.161871 0.239927 1.902394 ...
"""
start = start or "1776-07-04"
end = end or utils.today
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.submissions.name):
sql.submissions.create(engine)
if not sa.inspect(engine).has_table(sql.annual.name):
sql.annual.create(engine)
with engine.begin() as conn:
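# Select this company's refined annual rows by joining on CIK through
# the submissions table and filtering by filing date.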
df = pd.DataFrame(
conn.execute(
sql.annual.select()
.join(
sql.submissions,
(sql.submissions.c.cik == sql.annual.c.cik)
& (sql.submissions.c.ticker == ticker),
)
.where(
sql.annual.c.filed >= start,
sql.annual.c.filed <= end,
)
)
)
if not len(df.index):
raise NoResultFound(f"No annual rows found for {ticker}.")
df = df.drop(columns=["cik"]).set_index(["fy", "filed"]).sort_index()
return df
@classmethod
def get_candidate_ticker_set(
cls,
lb: int = 1,
*,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> set[str]:
"""Get all unique tickers in the raw SQL table that MAY BE ELIGIBLE
to be in the feature's SQL table.
Args:
lb: Minimum number of rows required to include a ticker in the
returned set.
start: The start date of the observation period to include when
searching for tickers. Defaults to the first recorded date.
end: The end date of the observation period to include when
searching for tickers. Defaults to the last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
All unique tickers that may be valid for creating annual features
that also have at least ``lb`` rows for each tag used for
constructing the features.
Examples:
>>> "AAPL" in finagg.sec.feat.annual.get_candidate_ticker_set() # doctest: +SKIP
True
"""
start = start or "1776-07-04"
end = end or utils.today
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.submissions.name):
sql.submissions.create(engine)
if not sa.inspect(engine).has_table(sql.tags.name):
sql.tags.create(engine)
with engine.begin() as conn:
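# For each company, count 10-K rows per popular concept tag using a
# summed CASE expression, and keep only tickers where every required
# tag appears at least ``lb`` times within the date window.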
tickers = (
conn.execute(
sa.select(
sql.submissions.c.ticker,
*[
sa.func.sum(
sa.case(
{concept["tag"]: 1}, value=sql.tags.c.tag, else_=0
)
).label(concept["tag"])
for concept in api.popular_concepts
],
)
.join(sql.tags, sql.tags.c.cik == sql.submissions.c.cik)
.where(
sql.tags.c.form == "10-K",
sql.tags.c.filed >= start,
sql.tags.c.filed <= end,
)
.group_by(sql.tags.c.cik)
.having(
*[
sa.text(f"{concept['tag']} >= {lb}")
for concept in api.popular_concepts
]
)
)
.scalars()
.all()
)
return set(tickers)
@classmethod
def get_ticker_set(
cls,
lb: int = 1,
*,
start: None | str = None,
end: None | str = None,
engine: None | Engine = None,
) -> set[str]:
"""Get all unique tickers in the feature's SQL table.
Args:
lb: Minimum number of rows required to include a ticker in the
returned set.
start: The start date of the observation period to include when
searching for tickers. Defaults to the first recorded date.
end: The end date of the observation period to include when
searching for tickers. Defaults to the last recorded date.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
All unique tickers that have all the columns required for
creating annual features and at least ``lb`` rows.
Examples:
>>> "AAPL" in finagg.sec.feat.annual.get_ticker_set() # doctest: +SKIP
True
"""
start = start or "1776-07-04"
end = end or utils.today
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.submissions.name):
sql.submissions.create(engine)
if not sa.inspect(engine).has_table(sql.annual.name):
sql.annual.create(engine)
with engine.begin() as conn:
tickers = (
conn.execute(
sa.select(sql.submissions.c.ticker)
.join(sql.annual, sql.annual.c.cik == sql.submissions.c.cik)
.where(
sql.annual.c.filed >= start,
sql.annual.c.filed <= end,
)
.group_by(sql.annual.c.cik)
.having(sa.func.count(sql.annual.c.filed) >= lb)
)
.scalars()
.all()
)
return set(tickers)
@classmethod
def install(
cls,
tickers: None | set[str] = None,
*,
processes: int = mp.cpu_count() - 1,
engine: None | Engine = None,
recreate_tables: bool = False,
) -> int:
"""Install data associated with ``tickers`` by pulling data from the
raw SQL tables, transforming them into annual features, and then
writing to the refined annual SQL table.
Tables associated with this method are created if they don't already
exist.
Args:
tickers: Set of tickers to install features for. Defaults to all
the candidate tickers from
:meth:`Annual.get_candidate_ticker_set`.
processes: Number of background processes to run in parallel
when processing ``tickers``.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
recreate_tables: Whether to drop and recreate tables, wiping all
previously installed data.
Returns:
Number of rows written to the feature's SQL table.
"""
tickers = tickers or cls.get_candidate_ticker_set(engine=engine)
if not tickers:
logger.info(
"Skipping finagg.sec.feat.annual installation because no tickers were"
" provided or no tickers were found with prerequisite data (i.e.,"
" finagg.sec.feat.submissions and finagg.sec.feat.tags data)"
)
return 0
engine = engine or backend.engine
if recreate_tables or not sa.inspect(engine).has_table(sql.annual.name):
sql.annual.drop(engine, checkfirst=True)
sql.annual.create(engine)
return utils._install(
cls.from_raw,
cls.to_refined,
logger,
list(tickers),
engine,
desc="Installing refined SEC annual data",
processes=processes,
)
@classmethod
def to_refined(
cls,
ticker: str,
df: pd.DataFrame,
/,
*,
engine: None | Engine = None,
) -> int:
"""Write the given dataframe to the refined feature table
while using the ticker ``ticker``.
Args:
ticker: Company ticker.
df: Dataframe to store as rows in a local SQL table.
engine: Feature store database engine. Defaults to the engine
at :data:`finagg.backend.engine`.
Returns:
Number of rows written to the SQL table.
"""
engine = engine or backend.engine
if not sa.inspect(engine).has_table(sql.annual.name):
sql.annual.create(engine)
df = df.reset_index(["fy", "filed"])
df["cik"] = sql.get_cik(ticker, engine=engine)
with engine.begin() as conn:
conn.execute(sql.annual.insert(), df.to_dict(orient="records")) # type: ignore[arg-type]
return len(df.index)