Skip to content

Commit

Permalink
Merge pull request #131 from KRicketts6237/master
Browse files Browse the repository at this point in the history
Encrypted Read/Write Support for Linux
  • Loading branch information
kevincar authored May 18, 2024
2 parents 1baebef + 0e94947 commit 6da1a59
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
25 changes: 24 additions & 1 deletion bless/backends/bluezdbus/characteristic.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def init(self, service: "BlessGATTService"):
service : BlessGATTService
The service to assign the characteristic to
"""
flags: List[Flags] = flags_to_dbus(self._properties)
flags: List[Flags] = [transform_flags_with_permissions(f, self._permissions) for f in flags_to_dbus(self._properties)]

# Add to our BlueZDBus app
bluez_service: "BlessGATTServiceBlueZDBus" = cast(
Expand Down Expand Up @@ -118,6 +118,29 @@ def uuid(self) -> str:
"""The uuid of this characteristic"""
return self.obj.get("UUID").value

def transform_flags_with_permissions(flag: Flags, permissions: GATTAttributePermissions) -> Flags:
"""
Returns the encrypted variant of a flag if the corresponding permission is set
Parameters
----------
flag : GATTCharacteristicProperties
The numerical enumeration of a single flag
permissions: GATTAttributePermissions
The permissions for the characteristic
Returns
-------
List[Flags]
A Flags enum value for use in BlueZDBus that has been updated to reflect if it should be encrypted
"""
if flag == Flags.READ and GATTAttributePermissions.read_encryption_required in permissions:
return Flags.ENCRYPT_READ
elif flag == Flags.WRITE and GATTAttributePermissions.write_encryption_required in permissions:
return Flags.ENCRYPT_WRITE

return flag

def flags_to_dbus(flags: GATTCharacteristicProperties) -> List[Flags]:
"""
Expand Down
9 changes: 9 additions & 0 deletions test/backends/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import uuid
import pytest
import asyncio
import os
import aioconsole # type: ignore

import numpy as np # type: ignore
Expand All @@ -26,6 +27,7 @@
)

hardware_only = pytest.mark.skipif("os.environ.get('TEST_HARDWARE') is None")
use_encrypted = os.environ.get('TEST_ENCRYPTED') is not None


@hardware_only
Expand Down Expand Up @@ -83,6 +85,13 @@ async def test_server(self):
GATTAttributePermissions.readable |
GATTAttributePermissions.writeable
)

if use_encrypted:
print("\nEncryption has been enabled, ensure that you are bonded")
permissions = (
GATTAttributePermissions.read_encryption_required |
GATTAttributePermissions.write_encryption_required
)

await server.add_new_characteristic(
service_uuid,
Expand Down

0 comments on commit 6da1a59

Please sign in to comment.