Skip to content
This repository has been archived by the owner on Nov 10, 2023. It is now read-only.

Item to Item func #14

Merged
merged 10 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"mktestdocs>=0.1.0",
"interrogate>=1.2.0",
"pre-commit>=2.15.0",
"pyarrow>=6.0.1",
] + all_dep_packages

docs_packages = [
Expand Down
15 changes: 15 additions & 0 deletions tests/test_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import pytest
from valves import item_item_counts, sessionize, bayes_average


@pytest.mark.parametrize("func", [item_item_counts, sessionize])
def test_dataf_error_occurs(func):
    """These functions don't have required arguments."""
    # A non-dataframe input (here an int) must raise ValueError from the
    # dispatcher's fall-through error path.
    with pytest.raises(ValueError):
        func(1)


def test_dataf_error_bayes_average():
    """bayes_average has required arguments."""
    # Even when the required keyword arguments are supplied, a non-dataframe
    # first argument must still raise ValueError from the dispatcher.
    with pytest.raises(ValueError):
        bayes_average(1, group_cols="a", target_col="b", C=10)
60 changes: 60 additions & 0 deletions tests/test_item_item_counts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pandas as pd
import polars as pl
import dask.dataframe as dd

from valves.polars import item_item_counts as ii_pl
from valves.pandas import item_item_counts as ii_pd
from valves.dask import item_item_counts as ii_dd
from valves import item_item_counts

# Two users with overlapping item interactions: user 1 saw items {1, 2, 3}
# and user 2 saw items {2, 3, 4}, so items 2 and 3 are shared by both users.
going_in = [
    {"user": 1, "item": 1},
    {"user": 1, "item": 2},
    {"user": 1, "item": 3},
    {"user": 2, "item": 2},
    {"user": 2, "item": 3},
    {"user": 2, "item": 4},
]

# Expected output for `going_in`: one row per ordered pair of distinct items
# that share at least one user. `n_item`/`n_item_rec` are the unique-user
# counts of each item in the pair and `n_both` the number of users who
# interacted with both items.
going_out = [
    {"item": 1, "item_rec": 2, "n_item": 1, "n_item_rec": 2, "n_both": 1},
    {"item": 1, "item_rec": 3, "n_item": 1, "n_item_rec": 2, "n_both": 1},
    {"item": 2, "item_rec": 1, "n_item": 2, "n_item_rec": 1, "n_both": 1},
    {"item": 2, "item_rec": 3, "n_item": 2, "n_item_rec": 2, "n_both": 2},
    {"item": 3, "item_rec": 1, "n_item": 2, "n_item_rec": 1, "n_both": 1},
    {"item": 3, "item_rec": 2, "n_item": 2, "n_item_rec": 2, "n_both": 2},
    {"item": 2, "item_rec": 4, "n_item": 2, "n_item_rec": 1, "n_both": 1},
    {"item": 3, "item_rec": 4, "n_item": 2, "n_item_rec": 1, "n_both": 1},
    {"item": 4, "item_rec": 2, "n_item": 1, "n_item_rec": 2, "n_both": 1},
    {"item": 4, "item_rec": 3, "n_item": 1, "n_item_rec": 2, "n_both": 1},
]


def test_pandas_item_item_counts():
    """The pandas backend and the generic dispatcher agree on a simple dataset."""
    for func in (ii_pd, item_item_counts):
        records = func(pd.DataFrame(going_in)).to_dict(orient="records")
        assert records == going_out


def test_dask_item_item_counts():
    """The dask backend and the generic dispatcher agree on a simple dataset."""
    for func in (ii_dd, item_item_counts):
        dask_df = dd.from_pandas(pd.DataFrame(going_in), npartitions=1)
        records = func(dask_df).compute().to_dict(orient="records")
        assert records == going_out


def test_polars_item_item_counts():
    """The polars backend and the generic dispatcher agree on a simple dataset."""
    for func in (ii_pl, item_item_counts):
        frame = func(pl.DataFrame(going_in))
        assert frame.to_pandas().to_dict(orient="records") == going_out
25 changes: 24 additions & 1 deletion valves/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
else:
from .polars import sessionize as sess_pl
from .polars import bayes_average as bayes_average_pl
from .polars import item_item_counts as ii_pl

try:
import pandas as pd
Expand All @@ -17,6 +18,7 @@
else:
from .pandas import sessionize as sess_pd
from .pandas import bayes_average as bayes_average_pd
from .pandas import item_item_counts as ii_pd

try:
import dask.dataframe as dd
Expand All @@ -27,6 +29,7 @@
else:
from .dask import sessionize as sess_dd
from .dask import bayes_average as bayes_average_dd
from .dask import item_item_counts as ii_dd


def _raise_dataf_error(dataf):
Expand Down Expand Up @@ -79,7 +82,7 @@ def bayes_average(
This function is meant to be used in a `.pipe()`-line.

Arguments:
- dataf: dask dataframe
- dataf: pandas, polars or dask dataframe
- group_cols: list of columns to group by
- target_col: name of the column containing the target value, typically a rating
- C: smoothing parameter
Expand All @@ -93,3 +96,23 @@ def bayes_average(
if _POLARS_AVAILABLE and isinstance(dataf, pl.DataFrame):
return bayes_average_pl(dataf, group_cols, target_col, C, prior_mean, out_col)
_raise_dataf_error(dataf)


def item_item_counts(dataf, user_col="user", item_col="item"):
    """
    Computes item-item overlap counts from user-item interactions, useful for recommendations.

    Dispatches to the dask, pandas or polars implementation depending on the
    concrete type of `dataf`. Meant to be used in a `.pipe()`-line.

    Arguments:
    - dataf: pandas, polars or dask dataframe
    - user_col: name of the column containing the user id
    - item_col: name of the column containing the item id
    """
    # Each isinstance-check is short-circuited by its availability flag, so a
    # backend module that failed to import is never referenced.
    if _DASK_AVAILABLE and isinstance(dataf, dd.DataFrame):
        return ii_dd(dataf, user_col=user_col, item_col=item_col)
    if _PANDAS_AVAILABLE and isinstance(dataf, pd.DataFrame):
        return ii_pd(dataf, user_col=user_col, item_col=item_col)
    if _POLARS_AVAILABLE and isinstance(dataf, pl.DataFrame):
        return ii_pl(dataf, user_col=user_col, item_col=item_col)
    _raise_dataf_error(dataf)
43 changes: 43 additions & 0 deletions valves/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,46 @@ def bayes_average(
.drop(columns=["sum", "size"])
)
return dataf.join(to_join_back, on=group_cols).reset_index()


def item_item_counts(dataf, user_col="user", item_col="item"):
    """
    Computes item-item overlap counts from user-item interactions, useful for recommendations.

    For every ordered pair of distinct items that share at least one user, the
    output contains one row with `n_{item_col}` (unique users of the first
    item), `n_{item_col}_rec` (unique users of the second item) and `n_both`
    (unique users who interacted with both). Note that the counts are computed
    after self-pairs are removed, so a user who interacted with only a single
    item does not contribute to any count.

    This function is meant to be used in a `.pipe()`-line.

    Arguments:
    - dataf: dask dataframe
    - user_col: name of the column containing the user id
    - item_col: name of the column containing the item id
    """
    # Bug fix: select/join on the configured column names instead of the
    # hard-coded literals "user"/"item", so non-default user_col/item_col work.
    user_items = dataf[[user_col, item_col]].drop_duplicates()
    return (
        # Self-merge on the user gives every (item, item) pair per user; the
        # merge suffixes _x/_y land on the item column only.
        user_items.merge(user_items, how="left", on=user_col)
        .rename(columns={f"{item_col}_x": item_col, f"{item_col}_y": f"{item_col}_rec"})
        # Drop the trivial (item, item) self-pairs.
        .loc[lambda d: d[item_col] != d[f"{item_col}_rec"]]
        .assign(
            **{
                "n_both": lambda s: s.groupby([item_col, f"{item_col}_rec"])[
                    user_col
                ].transform(lambda d: d.nunique()),
                f"n_{item_col}_rec": lambda s: s.groupby([f"{item_col}_rec"])[
                    user_col
                ].transform(lambda d: d.nunique()),
                f"n_{item_col}": lambda s: s.groupby([item_col])[user_col].transform(
                    lambda d: d.nunique()
                ),
            }
        )
        # The user column is no longer needed; deduplicate to one row per pair.
        .drop(columns=[user_col])
        .drop_duplicates()
        .reset_index(drop=True)[
            [
                f"{item_col}",
                f"{item_col}_rec",
                f"n_{item_col}",
                f"n_{item_col}_rec",
                "n_both",
            ]
        ]
    )
43 changes: 43 additions & 0 deletions valves/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,46 @@ def bayes_average(
.drop(columns=["_s", "_c"])
)
return dataf.join(to_join_back, on=group_cols).reset_index()


def item_item_counts(dataf, user_col="user", item_col="item"):
    """
    Computes item-item overlap counts from user-item interactions, useful for recommendations.

    For every ordered pair of distinct items that share at least one user, the
    output contains one row with `n_{item_col}` (unique users of the first
    item), `n_{item_col}_rec` (unique users of the second item) and `n_both`
    (unique users who interacted with both). Note that the counts are computed
    after self-pairs are removed, so a user who interacted with only a single
    item does not contribute to any count.

    This function is meant to be used in a `.pipe()`-line.

    Arguments:
    - dataf: pandas dataframe
    - user_col: name of the column containing the user id
    - item_col: name of the column containing the item id
    """
    # Bug fix: select/join on the configured column names instead of the
    # hard-coded literals "user"/"item", so non-default user_col/item_col work.
    user_items = dataf[[user_col, item_col]].drop_duplicates()
    return (
        # Self-merge on the user gives every (item, item) pair per user; the
        # merge suffixes _x/_y land on the item column only.
        user_items.merge(user_items, how="left", on=user_col)
        .rename(columns={f"{item_col}_x": item_col, f"{item_col}_y": f"{item_col}_rec"})
        # Drop the trivial (item, item) self-pairs.
        .loc[lambda d: d[item_col] != d[f"{item_col}_rec"]]
        .assign(
            **{
                "n_both": lambda s: s.groupby([item_col, f"{item_col}_rec"])[
                    user_col
                ].transform(lambda d: d.nunique()),
                f"n_{item_col}_rec": lambda s: s.groupby([f"{item_col}_rec"])[
                    user_col
                ].transform(lambda d: d.nunique()),
                f"n_{item_col}": lambda s: s.groupby([item_col])[user_col].transform(
                    lambda d: d.nunique()
                ),
            }
        )
        # The user column is no longer needed; deduplicate to one row per pair.
        .drop(columns=[user_col])
        .drop_duplicates()
        .reset_index(drop=True)[
            [
                f"{item_col}",
                f"{item_col}_rec",
                f"n_{item_col}",
                f"n_{item_col}_rec",
                "n_both",
            ]
        ]
    )
48 changes: 48 additions & 0 deletions valves/polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,51 @@ def bayes_average(
/ (C + pl.col(target_col).count().over(group_cols))
).alias(out_col)
)


def item_item_counts(dataf, user_col="user", item_col="item"):
    """
    Computes item-item overlap counts from user-item interactions, useful for recommendations.

    For every ordered pair of distinct items that share at least one user, the
    output contains one row with `n_{item_col}` (unique users of the first
    item), `n_{item_col}_rec` (unique users of the second item) and `n_both`
    (unique users who interacted with both).

    This function is meant to be used in a `.pipe()`-line.

    Arguments:
    - dataf: polars dataframe
    - user_col: name of the column containing the user id
    - item_col: name of the column containing the item id
    """
    # NOTE(review): `.list()` and `.drop_duplicates()` are legacy polars APIs
    # (modern polars uses `.implode()` / `.unique()`) — kept for consistency
    # with the polars version this file targets.
    return (
        dataf.with_columns(
            [
                # Bug fix: window over the configured user column instead of
                # the hard-coded literal "user", so non-default user_col works.
                pl.col(item_col).list().over(user_col).alias(f"{item_col}_rec"),
            ]
        )
        # One row per (item, candidate item) pair per user.
        .explode(f"{item_col}_rec")
        # Drop the trivial (item, item) self-pairs.
        .filter(pl.col(item_col) != pl.col(f"{item_col}_rec"))
        .with_columns(
            [
                pl.col(user_col)
                .n_unique()
                .over(pl.col(item_col))
                .alias(f"n_{item_col}"),
                pl.col(user_col)
                .n_unique()
                .over(f"{item_col}_rec")
                .alias(f"n_{item_col}_rec"),
                pl.col(user_col)
                .n_unique()
                .over([pl.col(item_col), f"{item_col}_rec"])
                .alias("n_both"),
            ]
        )
        .select(
            [
                f"{item_col}",
                f"{item_col}_rec",
                f"n_{item_col}",
                f"n_{item_col}_rec",
                "n_both",
            ]
        )
        # Deduplicate to one row per item pair.
        .drop_duplicates()
    )