4.1.3. BLE 연결

블루투스 기기 Scan

Python에서 바나나 체온계를 스캔하고 연결하기 위해 필요한 모듈들을 import해야 한다. main.py 파일에 다음과 같이 모듈들을 import 한다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈

이 모듈들은 각각 특정 기능을 제공한다.

  • asyncio

    • 비동기 프로그래밍을 지원하는 모듈이다.

    • BLE 장치와의 통신은 시간이 걸릴 수 있으므로, 비동기 처리를 통해 프로그램의 효율성을 높인다.

  • typing (Optional)

    • Python에 타입 힌트를 추가하기 위한 모듈이다.

    • 코드의 가독성을 높이고 잠재적인 오류를 미리 발견할 수 있게 도와준다.

  • bleak (BleakScanner, BleakClient)

    • Bluetooth Low Energy (BLE) 장치와 통신하기 위한 라이브러리이다.

    • BleakScanner: BLE 장치를 검색(스캔)하는 데 사용된다.

    • BleakClient: 검색된 BLE 장치에 연결하고 데이터를 주고받는 데 사용된다.

모듈을 import한 후, 프로그램의 메인 부분을 작성한다. Python은 스크립트가 직접 실행될 때 특별한 동작을 수행하도록 하기 위해 다음과 같은 조건문을 사용한다.

이 조건문은 '이 파일이 직접 실행되는 경우에만 아래 코드를 실행하라'는 의미이다. 이렇게 함으로써 이 파일이 다른 프로그램에 의해 모듈로 import될 때는 실행되지 않고, 직접 실행될 때만 원하는 동작을 수행하게 할 수 있습니다.

if __name__ == '__main__':
    # 메인 프로그램 실행

이후 비동기 처리를 위해 현재 실행중인 이벤트 루프를 가져온다. 코드를 추가한다.

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()

이후 전체 기기를 Scan하기 위해 scan 함수를 정의한다. scan하는 함수는 비동기 처리를 위해 async 함수로 구현된다.

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

main 함수에서 비동기 이벤트 루프를 사용해서 scan 함수를 호출하여 scan 함수가 종료될 때 까지 기다린다.

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan 함수 실행
    scan_devices = loop.run_until_complete(scan())

이후 코드를 실행하면 콘솔에 scan된 바나나 체온계 기기를 확인할 수 있다.

지금까지의 내용이 반영된 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan 함수 실행
    scan_devices = loop.run_until_complete(scan())

블루투스 기기 선택

스캔된 BLE(Bluetooth Low Energy) 기기의 정보를 사용자에게 시각적으로 보여주기 위해, GUI(그래픽 사용자 인터페이스)를 만들 필요가 있다. 이를 위해 Python의 표준 GUI 라이브러리인 tkinter를 사용한다.

tkinter는 Python에 기본으로 포함된 GUI 툴킷이다. 별도의 설치 없이 바로 사용할 수 있다. 'tk'라는 별칭으로 import하는 것이 일반적이다.

주요 기능으로는 윈도우를 생성하여 애플리케이션의 메인 창을 만들 수 있고, 버튼, 레이블, 텍스트 박스, 리스트 박스 등 다양한 UI 요소를 제공한다. 이 뿐만 아니라, 사용자의 마우스 클릭, 키보드 입력 등의 이벤트를 처리할 수있고, 위젯들을 원하는 위치에 배치할 수 있는 다양한 레이아웃 관리 기능도 제공한다.

이렇게 tkinter를 사용함으로써, 사용자는 터미널에서 텍스트로만 정보를 보는 것이 아니라, 시각적으로 더 이해하기 쉽고 상호작용이 가능한 형태로 BLE 기기 정보를 확인할 수 있다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # 추가된 부분(GUI 관련 라이브러리)

Scan된 device를 확인할 수 있는 show_scan_devices() 함수를 추가한다. Scan된 device가 화면에 버튼 형태로 나타나고, 버튼을 클릭했을 때 button_click() 함수가 호출되어 화면이 사라지는 형태로 구현되었다.

# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()


# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    print("Select Device is " + device.name)

    # GUI 제거
    window.quit()


if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

위 코드를 실행하면 그림과 같이 화면에 Scan된 기기를 확인할 수 있고, 연결하고자 하는 Device를 선택하면 화면이 종료된다.

지금까지의 내용이 반영된 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device


# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()


# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    print("Select Device is " + device.name)

    # GUI 제거
    window.quit()

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

블루투스 기기 연결

Scan된 device를 담는 전역변수 selected_device를 추가한다.

selected_device = None

button_click() 함수에 연결하고자 하는 바나나 체온계를 selected_device 변수에 담는 코드를 추가한다.

def button_click(device, window):
    global selected_device # 전역변수 함수내 사용하도록 설정
    selected_device = device # 선택한 Device selected_device 변수에 담기

    # remove gui
    window.quit()

블루투스 기기를 연결하기 위해 connect 함수를 추가한다. 비동기 처리를 위해 async 함수로 구현한다.

# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')
    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

    return selected_client

main 함수에서 연결요청을 수행할 device를 선택했을 때 connect 과정을 수행하는 코드를 추가한다.

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    connected_device = None
    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))

위 코드를 실행하면 콘솔(Terminal)에 다음과 같이 출력된다.이 출력되는것을 확인할 수 있다.

처음 페어링을 진행하는 경우 PIN Code를 입력해야한다.

바나나 체온계 뒷면의 PIN Code 6자리를 입력하면 된다.

지금까지의 내용이 반영된 코드이다. 다음은 위 과정을 모두 수행한 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device


# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()


# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    global selected_device # 전역변수 함수내 사용하도록 설정
    selected_device = device # 선택한 Device selected_device 변수에 담기

    # remove gui
    window.quit()


# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')
    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

    return selected_client


selected_device = None


if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)


    connected_device = None
    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))

Service, Characteristic 요청

블루투스 연결이 완료 후, 바나나 체온계에서 제공하는 전체 Service와 Characteristic을 확인하기 위해 바나나 체온계에 Service와 Characteristic을 요청한다.

Service 요청을 위한 get_service_and_characterisc 함수를 추가한다.

# Service, Characteristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인
    for service in connected_device.services:
        print(service)
        # Characteristic 확인
        for characteristic in service.characteristics:
            print(characteristic)
       
        print()

get_service_and_characteristic 함수를 main함수에서 호출한다.

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    connected_device = None
    if selected_device is not None:
        connected_device = loop.run_until_complete(connect(selected_device))
        print("device is " + selected_device.name)

    # 바나나 체온계와 연결되어 있다면
    if connected_device is not None:
        # Service, Characteristic 출력하는 함수 호출
        loop.run_until_complete(get_service_and_characteristic(connected_device))

위 코드를 실행하여 바나나 체온계에서 제공되는 Service와 Characteristic을 살펴볼 수 있다.

지금까지의 내용이 반영된 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()


# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    global selected_device # 전역변수 함수내 사용하도록 설정
    selected_device = device # 선택한 Device selected_device 변수에 담기

    # remove gui
    window.quit()

# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')
    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

    return selected_client

# Service, Characteristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인
    for service in connected_device.services:
        print(service)
        # Characteristic 확인
        for characteristic in service.characteristics:
            print(characteristic)
       
        print()

selected_device = None

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    connected_device = None
    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))
    # 바나나 체온계와 연결되어 있다면
    if connected_device is not None:
        # Service, Characteristic 출력하는 함수 호출
        loop.run_until_complete(get_service_and_characteristic(connected_device))

온도정보 가져오기

바나나 체온계의 여러 characteristic 중 온도정보를 받아오는 기능을 구현한다. 온도 정보는 characteristic 에서 확인할 수 있다.

Service: Health Thermometer(00001809-0000-1000-8000-00805f9b34fb)

Characteristic: Temperature Measurement(00002a1c-0000-1000-8000-00805f9b34fb)

온도정보 수신

여러 Service 중 Health Thermometer와 관련된 service만 수신받기 위해 get_service_and_characteristic 함수를 수정한다.

`# Service UUID
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'


async def get_service_and_characteristic(connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                print(characteristic)
       
        print()

이후 온도정보 characteristic을 통해 notify를 수신하기 위해 함수를 수정한다.

# Characteristic
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'


async def get_service_and_characteristic(connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    print(data)

연결된 기기가 종료될 때까지 이벤트 루프를 종료하지 않기 위해 wait_connect 함수를 추가하고 main을 수정한다.

# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
    while connected_device.is_connected:
        await asyncio.sleep(1)

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    connected_device = None
    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))

    # 바나나 체온계바나나 체온계바나나 체온계바나나 체온계 연결되어 있다면
    if connected_device is not None:
        # Service, Characteristic 출력하는 함수 호출
        loop.run_until_complete(get_service_and_characteristic(connected_device))

    # 종료 방지함수 호출
    loop.run_until_complete(wait_connect())

이후 코드를 실행하면 온도정보가 콘솔창에 출력되는것을 확인할 수 있다. 현재 wait_connect 함수는 별도의 종료를 하지않으면 코드가 계속 실행되므로 우측 상단의 쓰레기통 아이콘을 클릭해 코드실행을 종료한다.

지금까지의 내용이 반영된 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device


# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()


# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    global selected_device # 전역변수 함수내 사용하도록 설정
    selected_device = device # 선택한 Device selected_device 변수에 담기

    # remove gui
    window.quit()

# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')
    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

    return selected_client

# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    print(data)

# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
    while connected_device.is_connected:
        await asyncio.sleep(1)

selected_device = None

# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    connected_device = None
    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))

    # 바나나 체온계와 연결되어 있다면
    if connected_device is not None:
        # Service, Characteristic 출력하는 함수 호출
        loop.run_until_complete(get_service_and_characteristic(connected_device))

    # 종료 방지함수 호출
    loop.run_until_complete(wait_connect())

온도정보 변환

바나나 체온계로부터 수신받은 데이터는 온도와 날짜 정보로 이루어져 있다. 온도 정보를 변환하는 함수를 작성한다.

# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
    # Int로 변환, 비트 시프트 후 합산
    int_t1 = int(t1)
    int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
    int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.

    signed_value = int_t1 + int_t2 + int_t3

    # 자릿수 정하는 변수
    digit = int(d)
    if digit > 127:
        digit -= 256

    # 온도
    temperature = float(signed_value) * 10 ** digit
    # 소수점 1자리만 남도록 수정
    temperature = round(temperature * 10) / 10

    return temperature

날짜정보를 변환하는 함수를 작성한다.

# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime

# 날짜정보 계산하는 함수
def date_calculate(array):
    year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
    month = array[2]
    day = array[3]
    hour = array[4]
    minute = array[5]
    second = array[6]

    date_component = datetime(year, month, day, hour, minute, second)

    return date_component

기존에 작성했던 notify_callback 함수를 온도정보와 날짜정보를 받아와 출력하는 함수로 변경한다.

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    parse_temperature_information(data)

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    array_slice = data[5:]
    date_array = list(map(int, array_slice))
    # 날짜정보 변환
    date = date_calculate(date_array)


    # 온도, 날짜정보 출력
    print(date)
    print(temperature)

이제 프로그램을 이후 프로젝트를 실행하면 변환된 온도정보가 출력된다.

지금까지의 내용이 반영된 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리

# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()

# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    global selected_device # 전역변수 함수내 사용하도록 설정
    selected_device = device # 선택한 Device selected_device 변수에 담기

    # remove gui
    window.quit()

# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')
    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

    return selected_client

# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    parse_temperature_information(data)

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    array_slice = data[5:]
    date_array = list(map(int, array_slice))
    # 날짜정보 변환
    date = date_calculate(date_array)
    # 온도, 날짜정보 출력
    print(date)
    print(temperature)

# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
    # Int로 변환, 비트 시프트 후 합산
    int_t1 = int(t1)
    int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
    int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.

    signed_value = int_t1 + int_t2 + int_t3

    # 자릿수 정하는 변수
    digit = int(d)
    if digit > 127:
        digit -= 256

    # 온도
    temperature = float(signed_value) * 10 ** digit
    # 소수점 1자리만 남도록 수정
    temperature = round(temperature * 10) / 10

    return temperature

# 날짜정보 계산하는 함수
def date_calculate(array):
    year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
    month = array[2]
    day = array[3]
    hour = array[4]
    minute = array[5]
    second = array[6]

    date_component = datetime(year, month, day, hour, minute, second)

    return date_component

# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
    while connected_device.is_connected:
        await asyncio.sleep(1)

selected_device = None

# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    connected_device = None
    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))


    # 바나나 체온계와 연결되어 있다면
    if connected_device is not None:
        # Service, Characteristic 출력하는 함수 호출
        loop.run_until_complete(get_service_and_characteristic(connected_device))

    # 종료 방지함수 호출
    loop.run_until_complete(wait_connect())

날짜정보 수정

현재 바나나 체온계에서 수신되는 날짜정보는 현재 시간과 맞지 않는다. 올바른 시간을 확인하려면 바나나 체온계에 현재 시간을 알려주는 과정이 필요하다.

다음은 날짜와 관련된 특성은 이다.

Service: Health Thermometer(00001809-0000-1000-8000-00805f9b34fb)

Characteristic: Date Time(00002a08-0000-1000-8000-00805f9634fb)

현재 시간을 리턴하는 함수를 구현한다.

# 현재 시간을 리턴하는 메서드(byte 타입)
def current_date():
    current_datetime = datetime.now()

    year = current_datetime.year
    month = current_datetime.month
    day = current_datetime.day
    hour = current_datetime.hour
    minute = current_datetime.minute
    second = current_datetime.second

    result = [
        year & 0xFF,
        (year >> 8) & 0xFF,
        month & 0xFF,
        day & 0xFF,
        hour & 0xFF,
        minute & 0xFF,
        second & 0xFF
    ]

    return bytes(result)

Date Time Characteristic이 수신되었을 때 현재 날씨를 업로드하기 위해 get_service_and_characteristic 함수를 수정한다.

# Service, Characteristic
date_time_characteristic = '00002a08-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'


async def get_service_and_characteristic(connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 날짜정보 Update(추가된 부분)
                if characteristic.uuid == date_time_characteristic:
                    await connected_device.write_gatt_char(characteristic, current_date())

                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    parse_temperature_information(data)

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    array_slice = data[5:]
    date_array = list(map(int, array_slice))
    # 날짜정보 변환
    date = date_calculate(date_array)


    # 온도, 날짜정보 출력
    print(date)
    print(temperature)

이후 코드를 실행하면 올바른 날짜로 수정되어 온도정보가 수신되는것을 확인할 수 있다.

지금까지의 내용이 반영된 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리

# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()


# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    global selected_device # 전역변수 함수내 사용하도록 설정
    selected_device = device # 선택한 Device selected_device 변수에 담기

    # remove gui
    window.quit()

# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')
    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

    return selected_client

# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 날짜정보 Update(추가된 부분)
                if characteristic.uuid == date_time_characteristic:
                    await connected_device.write_gatt_char(characteristic, current_date())

                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    parse_temperature_information(data)

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    array_slice = data[5:]
    date_array = list(map(int, array_slice))
    # 날짜정보 변환
    date = date_calculate(date_array)
    # 온도, 날짜정보 출력
    print(date)
    print(temperature)

# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
    # Int로 변환, 비트 시프트 후 합산
    int_t1 = int(t1)
    int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
    int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.

    signed_value = int_t1 + int_t2 + int_t3

    # 자릿수 정하는 변수
    digit = int(d)
    if digit > 127:
        digit -= 256

    # 온도
    temperature = float(signed_value) * 10 ** digit
    # 소수점 1자리만 남도록 수정
    temperature = round(temperature * 10) / 10

    return temperature

# 날짜정보 계산하는 함수
def date_calculate(array):
    year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
    month = array[2]
    day = array[3]
    hour = array[4]
    minute = array[5]
    second = array[6]

    date_component = datetime(year, month, day, hour, minute, second)

    return date_component

# 현재 시간을 리턴하는 메서드(byte 타입)
def current_date():
    current_datetime = datetime.now()

    year = current_datetime.year
    month = current_datetime.month
    day = current_datetime.day
    hour = current_datetime.hour
    minute = current_datetime.minute
    second = current_datetime.second

    result = [
        year & 0xFF,
        (year >> 8) & 0xFF,
        month & 0xFF,
        day & 0xFF,
        hour & 0xFF,
        minute & 0xFF,
        second & 0xFF
    ]

    return bytes(result)

# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
    while connected_device.is_connected:
        await asyncio.sleep(1)

selected_device = None

# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

date_time_characteristic = '00002a08-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

if __name__ == '__main__':
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    connected_device = None
    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))


    # 바나나 체온계와 연결되어 있다면
    if connected_device is not None:
        # Service, Characteristic 출력하는 함수 호출
        loop.run_until_complete(get_service_and_characteristic(connected_device))

    # 종료 방지함수 호출
    loop.run_until_complete(wait_connect())

데이터 저장

바나나 체온계로부터 수신한 온도정보를 데이터베이스를 사용하여 저장한다.

데이터베이스란

데이터베이스란 여러 사람이 공유하고 사용할 목적으로 통합 관리되는 정보의 집합이다. 몇 개의 자료 파일을 조직적으로 통합하여 자료 항목의 중복을 없애고 자료를 구조화하여 기억시켜 놓은 자료의 집합체라고 할 수 있다.

일반적으로 데이터베이스는 다음과 같은 기능을 제공한다

데이터 저장

  • 데이터를 체계적으로 저장하여 나중에 쉽게 검색하고 사용할 수 있다. 예를 들어, 바나나 체온계에서 측정한 온도 데이터를 날짜와 시간별로 저장할 수 있다.

데이터 검색

  • 저장된 데이터 중 원하는 정보를 쉽게 찾을 수 있다. 예를 들어, 특정 날짜의 체온 데이터를 검색할 수 있다.

데이터 수정

  • 저장된 데이터를 필요에 따라 수정할 수 있다. 예를 들어, 잘못된 온도 기록을 수정할 수 있다.

데이터 삭제

  • 더 이상 필요하지 않은 데이터를 삭제할 수 있다. 예를 들어, 오래된 데이터를 삭제하여 저장 공간을 확보할 수 있다.

데이터베이스의 구성 요소는 다음과 같다.

테이블

  • 데이터를 행(row)과 열(column)로 구성된 형태로 저장하는 가장 기본적인 단위이다. 예를 들어, 체온 데이터를 저장하는 테이블에는 날짜, 시간, 체온 등의 열이 있을 수 있다.

레코드

  • 테이블의 각 행을 레코드라고 한다. 각 레코드는 하나의 데이터 항목에 대한 정보를 포함한다. 예를 들어, 특정 시간에 측정된 체온 정보가 하나의 레코드가 된다.

필드

  • 테이블의 각 열을 필드라고 한다. 필드는 데이터의 특정 속성을 나타낸다. 예를 들어, 체온 테이블에서 날짜, 시간, 체온은 각각 하나의 필드가 된다.

기본 키

  • 각 레코드를 고유하게 식별할 수 있는 필드이다. 예를 들어, 체온 데이터를 고유하게 식별하기 위해 날짜와 시간을 조합하여 기본 키로 사용할 수 있다.

데이터베이스의 장점은 다음과 같다.

데이터 일관성

  • 데이터베이스는 데이터를 중복 없이 저장하고 관리하기 때문에 데이터의 일관성을 유지할 수 있다.

데이터 무결성

  • 데이터베이스는 저장된 데이터가 정확하고 신뢰할 수 있도록 다양한 제약 조건을 설정할 수 있다.

데이터 보안

  • 데이터베이스는 접근 권한을 설정하여 데이터를 보호할 수 있다. 예를 들어, 특정 사용자만 데이터를 조회하거나 수정할 수 있도록 제한할 수 있다.

이러한 데이터베이스의 기능과 구성 요소를 이해하면, 데이터를 효율적으로 저장하고 관리할 수 있게 된다. 초보자들도 데이터베이스를 활용하여 체계적으로 데이터를 다룰 수 있는 능력을 기를 수 있다.

이러한 장점때문에 여기서도 데이터베이스를 사용해서 데이터를 저장한다.

데이터베이스 저장

여기서는 별도의 라이브러리를 설치하지 않고, 단일 파일로 저장되어 간편하게 사용할 수 있는 데이터베이스를 사용하기 위한 툴로 SQLite를 사용한다. sqlite를 import하는 코드를 추가한다.

import sqlite3

이후 데이터베이스를 생성하기 위해 코드를 추가한다.

# 데이터베이스 객체 생성(temperature.db 파일 생성됨)
conn = sqlite3.connect("temperature.db")
# SQL 동작을 위해 cursor 오브젝트 생성
cursor = conn.cursor()

if __name__ == '__main__':
    # 테이블 생성(한번만 실행되도록 try-catch 이용)
    try:
        cursor.execute("CREATE TABLE TEMPERATURE(name text, temperature double, date datetime)")
    except:
        # 이미 테이블이 실행되었다면 구문 콘솔 출력
        print("database already created")
    ...

코드를 실행하면 좌측 상단에 temperature.db가 생성된것을 확인할 수 있다.

온도정보 notify를 수신했을 때 데이터 변환 후 데이터정보를 저장할 수 있도록 코드를 변환한다.

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    array_slice = data[5:]
    date_array = list(map(int, array_slice))
    # 날짜정보 변환
    date = date_calculate(date_array)
    # 온도, 날짜정보 출력
    print(date)
    print(temperature)

    # 데이터베이스에 저장(추가된 부분)
    cursor.execute("INSERT INTO TEMPERATURE (name, temperature, date) VALUES (?, ?, ?)", (selected_device.name, temperature, date))
    conn.commit()

코드작성 후 실행하면 올바르게 데이터베이스에 온도정보가 저장된다. 저장된 결과는 다음 장인 "온도정보 확인"에서 볼 수 있다.

지금까지의 내용이 반영된 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리

# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime

import sqlite3

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()


# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    global selected_device # 전역변수 함수내 사용하도록 설정
    selected_device = device # 선택한 Device selected_device 변수에 담기

    # remove gui
    window.quit()

# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')
    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

    return selected_client

# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 날짜정보 Update(추가된 부분)
                if characteristic.uuid == date_time_characteristic:
                    await connected_device.write_gatt_char(characteristic, current_date())

                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    parse_temperature_information(data)

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    array_slice = data[5:]
    date_array = list(map(int, array_slice))
    # 날짜정보 변환
    date = date_calculate(date_array)
    # 온도, 날짜정보 출력
    print(date)
    print(temperature)

    # 데이터베이스에 저장(추가된 부분)
    cursor.execute("INSERT INTO TEMPERATURE (name, temperature, date) VALUES (?, ?, ?)", (selected_device.name, temperature, date))
    conn.commit()

# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
    # Int로 변환, 비트 시프트 후 합산
    int_t1 = int(t1)
    int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
    int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.

    signed_value = int_t1 + int_t2 + int_t3

    # 자릿수 정하는 변수
    digit = int(d)
    if digit > 127:
        digit -= 256

    # 온도
    temperature = float(signed_value) * 10 ** digit
    # 소수점 1자리만 남도록 수정
    temperature = round(temperature * 10) / 10

    return temperature

# 날짜정보 계산하는 함수
def date_calculate(array):
    year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
    month = array[2]
    day = array[3]
    hour = array[4]
    minute = array[5]
    second = array[6]

    date_component = datetime(year, month, day, hour, minute, second)

    return date_component

# 현재 시간을 리턴하는 메서드(byte 타입)
def current_date():
    current_datetime = datetime.now()

    year = current_datetime.year
    month = current_datetime.month
    day = current_datetime.day
    hour = current_datetime.hour
    minute = current_datetime.minute
    second = current_datetime.second

    result = [
        year & 0xFF,
        (year >> 8) & 0xFF,
        month & 0xFF,
        day & 0xFF,
        hour & 0xFF,
        minute & 0xFF,
        second & 0xFF
    ]

    return bytes(result)

# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
    while connected_device.is_connected:
        await asyncio.sleep(1)

selected_device = None

# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

date_time_characteristic = '00002a08-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

# 데이터베이스 객체 생성(temperature.db 파일 생성됨)
conn = sqlite3.connect("temperature.db")
# SQL 동작을 위해 cursor 오브젝트 생성
cursor = conn.cursor()

if __name__ == '__main__':
    # 테이블 생성(한번만 실행되도록 try-catch 이용)
    try:
        cursor.execute("CREATE TABLE TEMPERATURE(name text, temperature double, date datetime)")
    except:
        # 이미 테이블이 실행되었다면 구문 콘솔 출력
        print("database already created")
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    connected_device = None
    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))

    # 바나나 체온계바나나 체온계바나나 체온계바나나 체온계바나나 체온계바나나 체온계바나나 체온계바나나 체온계 연결되어 있다면
    if connected_device is not None:
        # Service, Characteristic 출력하는 함수 호출
        loop.run_until_complete(get_service_and_characteristic(connected_device))

    # 종료 방지함수 호출
    loop.run_until_complete(wait_connect())

온도정보 확인

온도정보 확인을 위해 VSC(Visual Studio Code)의 Extension → sqlite 검색 → SQLite Viewer를 설치한다.

설치완료 후 VSC(Visual Studio Code)의 Explorer → temperature.db 클릭 → Open Anyway 클릭 → SQLite Viewer를 순서대로 클릭한다.

temperature.db에 온도정보가 들어가 있는 것을 확인할 수 있다.

SQLite Viewer는 데이터가 실시간으로 업데이트되지 않으므로 데이터베이스를 새롭게 갱신하고 싶다면 좌측 상단의 Refresh 버튼을 클릭해 데이터를 갱신한다.

데이터 그래프로 그리기

온도정보를 그래프로 그리기 위해 코드를 추가해 라이브러리를 import한다.

import matplotlib.pyplot as plt

그래프에 그려질 온도정보를 담고있는 배열을 전역변수로 선언한다.

temperature_list = []

그래프 설정 함수를 추가한다.

# LineChart 변수
line_chart = None

# 그래프 설정하는 함수
def set_graph():
    global line_chart
    # Set Graph
    plt.ion()  # Activate interactive mode
    fig, ax = plt.subplots()
    line = ax.plot([], [], label='Temperature', marker='o', lw=1)
    ax.set_ylim(20, 40)
    ax.set_xlim(0, 20)
    ax.set_ylabel('Temperature (°C)')
    ax.legend()

    # 전역변수에 차트정보 설정
    line_chart = line

set_graph 함수를 호출하는 코드를 main함수에 추가한다.

if __name__ == '__main__':
    # Create Table(Call Once)
    try:
        cursor.execute("CREATE TABLE TEMPERATURE(name text, temperature double, date datetime)")
    except:
        print("database already created")

    loop = asyncio.get_event_loop()

    scan_devices = loop.run_until_complete(scan())

    show_scan_devices(scan_devices)

    connected_device = None
    if selected_device is not None:
        connected_device = loop.run_until_complete(connect(selected_device))
        print("device is " + selected_device.name)

    # 그래프 설정 함수(추가된 부분)
    set_graph()
   
    if connected_device is not None:
        # Get Service
        loop.run_until_complete(get_service_and_characteristic(connected_device))

    loop.run_until_complete(wait_connect())

parse_temperature_information 함수에 그래프 그리는 부분을 추가로 작성한다.

# parse temperature data
def parse_temperature_information(data: bytearray):
    # get temperature information
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    array_slice = data[5:]
    date_array = list(map(int, array_slice))
    # get date information
    date = date_calculate(date_array)

    print(date)
    print(temperature)

    # save database
    cursor.execute("INSERT INTO TEMPERATURE (name, temperature, date) VALUES (?, ?, ?)", (selected_device.name, temperature, date))
    conn.commit()

    # 배열에 값 추가
    temperature_list.append(temperature)


    # 20개의 데이터만 graph_temperature_data 배열에 담을 수 있도록 설정
    graph_temperature_data = temperature_list
    if len(temperature_list) > 20:
        graph_temperature_data = temperature_list[-20:]

    # 그래프 그리기
    line_chart[0].set_data(range(len(graph_temperature_data)), graph_temperature_data)
    plt.pause(1)

이후 코드를 실행하면 온도정보가 그래프로 나타나는것을 확인할 수 있다.

지금까지의 내용이 반영된 코드이다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리

# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime

import sqlite3

# 그래프 관련 라이브러리
import matplotlib.pyplot as plt

# 비동기 형태로 BLE 장치 검색
async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    # 스캔된 device 탐색
    for device in scan_devices:
        # 이름이 없는 Device "Unknown Device로 변경"
        device_name = device.name if device.name else "Unknown Device"
        # 이름에 TS100을 포함하면 콘솔에 출력
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device
# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
    if not devices:
        return False

    window = tk.Tk()
    # 화면 제목
    window.title("TS100 Scan List")

    buttons = []
    for device in devices:
        text = device.name
        # 버튼 이벤트 추가(button_click 함수 호출)
        button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
        button.pack()
        buttons.append(button)

    # 윈도우 종료까지 실행
    window.mainloop()


# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
    global selected_device # 전역변수 함수내 사용하도록 설정
    selected_device = device # 선택한 Device selected_device 변수에 담기

    # remove gui
    window.quit()

# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')
    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

    return selected_client

# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 날짜정보 Update(추가된 부분)
                if characteristic.uuid == date_time_characteristic:
                    await connected_device.write_gatt_char(characteristic, current_date())

                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    parse_temperature_information(data)

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    array_slice = data[5:]
    date_array = list(map(int, array_slice))
    # 날짜정보 변환
    date = date_calculate(date_array)
    # 온도, 날짜정보 출력
    print(date)
    print(temperature)

    # 데이터베이스에 저장(추가된 부분)
    cursor.execute("INSERT INTO TEMPERATURE (name, temperature, date) VALUES (?, ?, ?)", (selected_device.name, temperature, date))
    conn.commit()

    # 배열에 값 추가
    temperature_list.append(temperature)


    # 20개의 데이터만 graph_temperature_data 배열에 담을 수 있도록 설정
    graph_temperature_data = temperature_list
    if len(temperature_list) > 20:
        graph_temperature_data = temperature_list[-20:]

    # 그래프 그리기
    line_chart[0].set_data(range(len(graph_temperature_data)), graph_temperature_data)
    plt.pause(1)

# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
    # Int로 변환, 비트 시프트 후 합산
    int_t1 = int(t1)
    int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
    int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.

    signed_value = int_t1 + int_t2 + int_t3

    # 자릿수 정하는 변수
    digit = int(d)
    if digit > 127:
        digit -= 256

    # 온도
    temperature = float(signed_value) * 10 ** digit
    # 소수점 1자리만 남도록 수정
    temperature = round(temperature * 10) / 10

    return temperature

# 날짜정보 계산하는 함수
def date_calculate(array):
    year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
    month = array[2]
    day = array[3]
    hour = array[4]
    minute = array[5]
    second = array[6]

    date_component = datetime(year, month, day, hour, minute, second)

    return date_component

# 현재 시간을 리턴하는 메서드(byte 타입)
def current_date():
    current_datetime = datetime.now()

    year = current_datetime.year
    month = current_datetime.month
    day = current_datetime.day
    hour = current_datetime.hour
    minute = current_datetime.minute
    second = current_datetime.second

    result = [
        year & 0xFF,
        (year >> 8) & 0xFF,
        month & 0xFF,
        day & 0xFF,
        hour & 0xFF,
        minute & 0xFF,
        second & 0xFF
    ]

    return bytes(result)

# 그래프 설정하는 함수
def set_graph():
    global line_chart
    # Set Graph
    plt.ion()  # Activate interactive mode
    fig, ax = plt.subplots()
    line = ax.plot([], [], label='Temperature', marker='o', lw=1)
    ax.set_ylim(20, 40)
    ax.set_xlim(0, 20)
    ax.set_ylabel('Temperature (°C)')
    ax.legend()

    # 전역변수에 차트정보 설정
    line_chart = line

# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
    while connected_device.is_connected:
        await asyncio.sleep(1)

selected_device = None

# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

date_time_characteristic = '00002a08-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

temperature_list = []

line_chart = None

# 데이터베이스 객체 생성(temperature.db 파일 생성됨)
conn = sqlite3.connect("temperature.db")
# SQL 동작을 위해 cursor 오브젝트 생성
cursor = conn.cursor()

connected_device = None

if __name__ == '__main__':
    # 테이블 생성(한번만 실행되도록 try-catch 이용)
    try:
        cursor.execute("CREATE TABLE TEMPERATURE(name text, temperature double, date datetime)")
    except:
        # 이미 테이블이 실행되었다면 구문 콘솔 출력
        print("database already created")
    # 비동기 이벤트 루프 생성
    loop = asyncio.get_event_loop()
    # 비동기 형태로 Scan함수 실행
    scan_devices = loop.run_until_complete(scan())
    # Scan된 기기 화면으로 출력
    show_scan_devices(scan_devices)

    # 선택한 기기가 None이 아니라면
    if selected_device is not None:
        # connect 요청 후 함수가 끝날때까지 기다림
        connected_device = loop.run_until_complete(connect(selected_device))

    # 그래프 설정 함수
    set_graph()

    # 바나나 체온계 연결되어 있다면
    if connected_device is not None:
        # Service, Characteristic 출력하는 함수 호출
        loop.run_until_complete(get_service_and_characteristic(connected_device))

    # 종료 방지함수 호출
    loop.run_until_complete(wait_connect())

Last updated