OnDemandFeatureViews & online async feature retrieval & passing unknown entity values --> raises TypeError when it should not #4473

Open
job-almekinders opened this issue Sep 2, 2024 · 5 comments

Comments

@job-almekinders
Contributor

job-almekinders commented Sep 2, 2024

Context

Factors to Consider

  1. I call the get_online_features_async method on the FeatureStore.
  2. I am passing an "entity value" that is not materialized in the online store. For example, only driver_id 1 and 2 are materialized in the online store. However, I’m passing 99999.
  3. I’m using an OnDemandFeatureView.

Expected Behavior

When only points 1 and 2 are true, I get a response that has None values for all the features. This functionality works as expected.

When all three points are true, I would also expect to get a response that has None values for all the features. However, this is not the case.

Current Behavior

When only points 1 and 2 are true, the response has None values for all the features, as expected.

When all three points are true, that is, when OnDemandFeatureViews (ODFVs) are also involved, the following error is raised instead: TypeError: Couldn't infer value type from empty value.

Steps to Reproduce

This branch contains an additional integration test named test_async_online_retrieval_with_event_timestamps_null_only, which acts as a minimal failing example.

The error is raised when the python_values_to_proto_values method is called in the sdk/python/feast/utils.py file, which is invoked by the get_online_features_async method in the sdk/python/feast/infra/online_stores/online_store.py file (these locations are marked with # breakpoint() in the linked branch).

Furthermore, this error only occurs when every entity value passed is unknown. For example, passing only the unknown entity value 99999 fails, whereas passing the known entity value 1 together with the unknown value 99999 succeeds.
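
The difference between the two cases can be illustrated with a toy model of value-type inference. This is a sketch under assumptions, not Feast's code: `infer_value_type` is a hypothetical helper that takes the type from the first non-null value, and it raises the same message as the reported error when no such value exists.

```python
from typing import Any, Optional, Sequence


def infer_value_type(values: Sequence[Optional[Any]]) -> type:
    """Hypothetical helper: infer a column type from its first non-null value."""
    sample = next((v for v in values if v is not None), None)
    if sample is None:
        # Nothing to inspect: every value in the column is null.
        raise TypeError("Couldn't infer value type from empty value")
    return type(sample)


# Known entity 1 plus unknown entity 99999: one real value is enough.
mixed_type = infer_value_type([1.1, None])

# Only the unknown entity 99999: the column is entirely null.
try:
    infer_value_type([None])
    only_unknown_error = None
except TypeError as exc:
    only_unknown_error = str(exc)
```

In this model a single known entity in the request is enough to give the column a type, which would match the behaviour described above.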

You can run this test by creating a virtual environment and running this command in the shell:

PYTHONPATH='.' \
  FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.postgres_repo_configuration \
  PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.postgres \
  python -m pytest --integration sdk/python/tests/integration/online_store/test_universal_online.py::test_async_online_retrieval_with_event_timestamps_null_only

Another way to reproduce:

docker-compose.yml

---
version: "3"
services:
  offline_store:
    image: postgres:16-alpine
    container_name: offline_store
    ports:
      - "6543:5432"
    environment:
      - POSTGRES_DB=offline_store
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - ./postgres_init:/docker-entrypoint-initdb.d
  online_store:
    image: postgres:16-alpine
    container_name: online_store
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=online_store
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres

feature_store.yml

project: feast_tryout
provider: local
registry:
  registry_type: sql
  path: postgresql+psycopg2://postgres:postgres@0.0.0.0:5432/online_store
  cache_ttl_seconds: 60
online_store:
  type: postgres
  host: 0.0.0.0
  port: 5432
  database: online_store
  db_schema: online
  user: postgres
  password: postgres
offline_store:
  type: postgres
  host: 0.0.0.0
  port: 6543
  database: offline_store
  db_schema: offline
  user: postgres
  password: postgres
entity_key_serialization_version: 2

Insert into offline store (postgres)
postgres_init/create-offline-store-database.sql

CREATE SCHEMA offline;

CREATE TABLE offline.features (
  "ENTITY_ID"       VARCHAR,
  "EVENT_TIMESTAMP" TIMESTAMP,
  "ENTITY_FLOAT"    FLOAT
);

INSERT INTO offline.features
SELECT *
FROM (
  VALUES ('11111111', '2024-01-01 13:00:00' :: TIMESTAMP, 1.1),
         ('11111111', '2024-01-01 14:00:00' :: TIMESTAMP, 1.11),
         ('11111111', '2024-01-01 15:00:00' :: TIMESTAMP, 1.111),
         ('22222222', '2024-01-01 13:00:00' :: TIMESTAMP, 2.2),
         ('22222222', '2024-01-01 14:00:00' :: TIMESTAMP, 2.22),
         ('33333333', '2024-01-01 13:00:00' :: TIMESTAMP, 3.3),
         ('44444444', '2024-01-02 22:00:00' :: TIMESTAMP, 4.4)
  ) AS t;

bootstrap.py

from datetime import timedelta
from typing import Any

import pandas as pd
from feast import (
    Entity,
    FeatureService,
    FeatureStore,
    FeatureView,
    Field,
    RequestSource,
    ValueType,
)
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource as PostgresSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64

feature_store = FeatureStore()

features_entity = Entity(
    name="entity_id",
    join_keys=["ENTITY_ID"],
    value_type=ValueType.STRING,
)

features_source = PostgresSource(
    name="features",
    timestamp_field="EVENT_TIMESTAMP",
    table="offline.features",
)

features_feature_view = FeatureView(
    name="features_feature_view",
    entities=[features_entity],
    ttl=timedelta(days=0),
    schema=[Field(name="ENTITY_FLOAT", dtype=Float32)],
    online=True,
    source=features_source,
)

request_source = RequestSource(
    name="request_feature",
    schema=[Field(name="REQUEST_FLOAT", dtype=Float32)],
)


@on_demand_feature_view(
    sources=[features_feature_view, request_source],
    schema=[
        Field(name="ENTITY_FLOAT_TRANSFORMED_PANDAS", dtype=Float64),
        Field(name="ENTITY_FLOAT_PLUS_REQUEST_SOURCE", dtype=Float64),
    ],
    mode="pandas",
)
def odfv_pandas(input: pd.DataFrame) -> pd.DataFrame:
    output = pd.DataFrame()
    output["ENTITY_FLOAT_TRANSFORMED_PANDAS"] = input["ENTITY_FLOAT"] * 2
    output["ENTITY_FLOAT_PLUS_REQUEST_SOURCE"] = (
        input["ENTITY_FLOAT"] * input["REQUEST_FLOAT"]
    )
    return output


@on_demand_feature_view(
    sources=[features_feature_view, request_source],
    schema=[Field(name="ENTITY_FLOAT_TRANSFORMED_PYTHON", dtype=Float64)],
    mode="python",
)
def odfv_python(input: dict[str, Any]) -> dict[str, Any]:
    output = {}
    output["ENTITY_FLOAT_TRANSFORMED_PYTHON"] = [
        value * 2 if value is not None else None for value in input["ENTITY_FLOAT"]
    ]

    output["ENTITY_FLOAT_PLUS_REQUEST_SOURCE_PYTHON"] = [
        (e + r) if e is not None and r is not None else None
        for e, r in zip(input["ENTITY_FLOAT"], input["REQUEST_FLOAT"])
    ]
    return output


features_feature_service_pandas = FeatureService(
    name="features_feature_service_pandas",
    features=[features_feature_view, odfv_pandas],
)

features_feature_service_python = FeatureService(
    name="features_feature_service_python",
    features=[features_feature_view, odfv_python],
)

feature_store.apply(
    [
        features_entity,
        features_source,
        features_feature_view,
        odfv_pandas,
        odfv_python,
        features_feature_service_pandas,
        features_feature_service_python,
    ]
)

materialize

from datetime import datetime

from feast import FeatureStore

feature_store = FeatureStore()
feature_store.materialize(
    start_date=datetime(1900, 1, 1),
    end_date=datetime(9999, 1, 1),
    feature_views=["features_feature_view"],
)
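
For reference, here is a rough sketch of which values the online store should hold after this step, assuming materialization keeps only the most recent row per entity key. The rows are copied from the SQL above; `latest_per_entity` is a hypothetical helper written for illustration, not a Feast function.

```python
from datetime import datetime
from typing import Dict, List, Tuple

# (ENTITY_ID, EVENT_TIMESTAMP, ENTITY_FLOAT) rows from the offline table.
ROWS: List[Tuple[str, datetime, float]] = [
    ("11111111", datetime(2024, 1, 1, 13), 1.1),
    ("11111111", datetime(2024, 1, 1, 14), 1.11),
    ("11111111", datetime(2024, 1, 1, 15), 1.111),
    ("22222222", datetime(2024, 1, 1, 13), 2.2),
    ("22222222", datetime(2024, 1, 1, 14), 2.22),
    ("33333333", datetime(2024, 1, 1, 13), 3.3),
    ("44444444", datetime(2024, 1, 2, 22), 4.4),
]


def latest_per_entity(rows: List[Tuple[str, datetime, float]]) -> Dict[str, float]:
    """Hypothetical helper: keep the newest value for each entity ID."""
    latest: Dict[str, Tuple[datetime, float]] = {}
    for entity_id, timestamp, value in rows:
        if entity_id not in latest or timestamp > latest[entity_id][0]:
            latest[entity_id] = (timestamp, value)
    return {entity_id: value for entity_id, (_, value) in latest.items()}


online_values = latest_per_entity(ROWS)
```

Under that assumption the online store contains four entity IDs, and the ID "1" used in the inference script is not one of them.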

inference

import pandas as pd
from feast import FeatureStore

feature_store = FeatureStore()
feature_service_pandas = feature_store.get_feature_service(
    name="features_feature_service_pandas"
)
feature_service_python = feature_store.get_feature_service(
    name="features_feature_service_python"
)

entity_rows = [
    # This entity ID is not in the offline or online store
    {"ENTITY_ID": "1", "REQUEST_FLOAT": 1.0},
]
entity_df = pd.DataFrame(entity_rows)
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)

# This works.
print("offline with pandas")
offline_features = feature_store.get_historical_features(
    entity_df=entity_df,
    features=feature_service_pandas,
).to_df()
print(list(offline_features.to_dict().keys()))

## This doesn't work, raises the error
# print("online with pandas")
# online_features = feature_store.get_online_features(
#     entity_rows=entity_rows,
#     features=feature_service_pandas,
# ).to_dict()
# print(list(online_features.keys()))

## This doesn't work, raises the error
# print("online with python")
# online_features = feature_store.get_online_features(
#     entity_rows=entity_rows,
#     features=feature_service_python,
# ).to_dict()
# print(list(online_features.keys()))

Specifications

  • Version: 0.36.0
  • Platform: macOS - M1
  • Subsystem: Sonoma 14.1.1

Possible Solution

I’m not entirely sure why ValueType.UNKNOWN is passed to the feature_type argument of the python_values_to_proto_values method. If we were to pass another value, I believe the method would succeed, as the if statement that raises the error would not be triggered.
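
To make the idea concrete, here is a minimal sketch of resolving the type from the declared ODFV schema before falling back to inference. Everything in it is an assumption for illustration: `ToyValueType` is a stand-in for `ValueType`, and `resolve_value_type` and `ODFV_SCHEMA` are hypothetical names, not part of Feast.

```python
from enum import Enum
from typing import Any, Dict, Optional, Sequence


class ToyValueType(Enum):
    """Hypothetical stand-in for feast.ValueType, reduced to two members."""

    UNKNOWN = 0
    DOUBLE = 1


# Declared output types, mirroring the Float64 fields of odfv_pandas.
ODFV_SCHEMA: Dict[str, ToyValueType] = {
    "ENTITY_FLOAT_TRANSFORMED_PANDAS": ToyValueType.DOUBLE,
    "ENTITY_FLOAT_PLUS_REQUEST_SOURCE": ToyValueType.DOUBLE,
}


def resolve_value_type(
    name: str,
    values: Sequence[Optional[Any]],
    schema: Dict[str, ToyValueType],
) -> ToyValueType:
    """Prefer the declared type; only inspect the values when none is declared."""
    declared = schema.get(name, ToyValueType.UNKNOWN)
    if declared is not ToyValueType.UNKNOWN:
        return declared
    # No declared type: at least one non-null value is needed to infer one.
    if all(v is None for v in values):
        raise TypeError("Couldn't infer value type from empty value")
    return ToyValueType.DOUBLE  # toy inference: this sketch only models floats


# An all-null column no longer fails, because the schema supplies the type.
resolved = resolve_value_type("ENTITY_FLOAT_TRANSFORMED_PANDAS", [None], ODFV_SCHEMA)
```

Whether the real code path has the ODFV schema available at that point is something I have not verified.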

@job-almekinders
Contributor Author

Hey @tokoko, I see that you were looking into this. Did you already manage to learn something about this issue?

No worries if you haven't had the time of course! Wouldn't want to pressure you :)

@job-almekinders
Contributor Author

job-almekinders commented Sep 4, 2024

I've updated the description with an alternative way to reproduce the issue. Let me know whether this helps!

franciscojavierarceo self-assigned this Sep 9, 2024
@franciscojavierarceo
Member

Yeah, I get why this could happen. I'll try to make a patch for it. Should be reasonable.

@franciscojavierarceo
Member

We'll maybe log skipping the odfv

@job-almekinders
Contributor Author

> Yeah, I get why this could happen. I'll try to make a patch for it. Should be reasonable.

Nice, good to hear! Let me know if I can assist with anything.

> We'll maybe log skipping the odfv

What do you mean by this comment? I'm not sure I understand. @franciscojavierarceo
