Skip to content

Commit

Permalink
Also update the example gatt
Browse files Browse the repository at this point in the history
  • Loading branch information
kevincar committed Jun 26, 2023
1 parent fd29197 commit 41c3315
Showing 1 changed file with 52 additions and 47 deletions.
99 changes: 52 additions & 47 deletions examples/gattserver.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,40 @@

"""
Example for a BLE 4.0 Server using a GATT dictionary of services and
characteristics
"""

import sys
import logging
import asyncio
import threading

from typing import Any, Dict
from typing import Any, Dict, Union

from bless import ( # type: ignore
BlessServer,
BlessGATTCharacteristic,
GATTCharacteristicProperties,
GATTAttributePermissions
)
BlessServer,
BlessGATTCharacteristic,
GATTCharacteristicProperties,
GATTAttributePermissions,
)

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(name=__name__)
trigger: threading.Event = threading.Event()

trigger: Union[asyncio.Event, threading.Event]
if sys.platform in ["darwin", "win32"]:
trigger = threading.Event()
else:
trigger = asyncio.Event()


def read_request(
characteristic: BlessGATTCharacteristic,
**kwargs
) -> bytearray:
def read_request(characteristic: BlessGATTCharacteristic, **kwargs) -> bytearray:
logger.debug(f"Reading {characteristic.value}")
return characteristic.value


def write_request(
characteristic: BlessGATTCharacteristic,
value: Any,
**kwargs
):
def write_request(characteristic: BlessGATTCharacteristic, value: Any, **kwargs):
characteristic.value = value
logger.debug(f"Char value set to {characteristic.value}")
if characteristic.value == b'\x0f':
if characteristic.value == b"\x0f":
logger.debug("Nice")
trigger.set()

Expand All @@ -47,48 +44,56 @@ async def run(loop):

# Instantiate the server
gatt: Dict = {
"A07498CA-AD5B-474E-940D-16F1FBE7E8CD": {
"51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B": {
"Properties": (GATTCharacteristicProperties.read |
GATTCharacteristicProperties.write |
GATTCharacteristicProperties.indicate),
"Permissions": (GATTAttributePermissions.readable |
GATTAttributePermissions.writeable),
"Value": None
}
},
"5c339364-c7be-4f23-b666-a8ff73a6a86a": {
"bfc0c92f-317d-4ba9-976b-cc11ce77b4ca": {
"Properties": GATTCharacteristicProperties.read,
"Permissions": GATTAttributePermissions.readable,
"Value": bytearray(b'\x69')
}
"A07498CA-AD5B-474E-940D-16F1FBE7E8CD": {
"51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B": {
"Properties": (
GATTCharacteristicProperties.read
| GATTCharacteristicProperties.write
| GATTCharacteristicProperties.indicate
),
"Permissions": (
GATTAttributePermissions.readable
| GATTAttributePermissions.writeable
),
"Value": None,
}
}
},
"5c339364-c7be-4f23-b666-a8ff73a6a86a": {
"bfc0c92f-317d-4ba9-976b-cc11ce77b4ca": {
"Properties": GATTCharacteristicProperties.read,
"Permissions": GATTAttributePermissions.readable,
"Value": bytearray(b"\x69"),
}
},
}
my_service_name = "Test Service"
server = BlessServer(name=my_service_name, loop=loop)
server.read_request_func = read_request
server.write_request_func = write_request

await server.add_gatt(gatt)
await server.start()
logger.debug(server.get_characteristic(
"51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B"))
logger.debug(server.get_characteristic("51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B"))
logger.debug("Advertising")
logger.info("Write '0xF' to the advertised characteristic: " +
"51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B")
trigger.wait()
logger.info(
"Write '0xF' to the advertised characteristic: "
+ "51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B"
)
if trigger.__module__ == "threading":
trigger.wait()
else:
await trigger.wait()
await asyncio.sleep(2)
logger.debug("Updating")
server.get_characteristic("51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B").value = (
bytearray(b"i")
)
server.get_characteristic("51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B").value = bytearray(
b"i"
)
server.update_value(
"A07498CA-AD5B-474E-940D-16F1FBE7E8CD",
"51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B"
)
"A07498CA-AD5B-474E-940D-16F1FBE7E8CD", "51FF12BB-3ED8-46E5-B4F9-D64E2FEC021B"
)
await asyncio.sleep(5)
await server.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))

0 comments on commit 41c3315

Please sign in to comment.