4.1.3. BLE 연결
블루투스 기기 Scan
Python에서 바나나 체온계를 스캔하고 연결하기 위해 필요한 모듈들을 import해야 한다. main.py 파일에 다음과 같이 모듈들을 import 한다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
이 모듈들은 각각 특정 기능을 제공한다.
asyncio
비동기 프로그래밍을 지원하는 모듈이다.
BLE 장치와의 통신은 시간이 걸릴 수 있으므로, 비동기 처리를 통해 프로그램의 효율성을 높인다.
typing (Optional)
Python에 타입 힌트를 추가하기 위한 모듈이다.
코드의 가독성을 높이고 잠재적인 오류를 미리 발견할 수 있게 도와준다.
bleak (BleakScanner, BleakClient)
Bluetooth Low Energy (BLE) 장치와 통신하기 위한 라이브러리이다.
BleakScanner: BLE 장치를 검색(스캔)하는 데 사용된다.
BleakClient: 검색된 BLE 장치에 연결하고 데이터를 주고받는 데 사용된다.
모듈을 import한 후, 프로그램의 메인 부분을 작성한다. Python은 스크립트가 직접 실행될 때 특별한 동작을 수행하도록 하기 위해 다음과 같은 조건문을 사용한다.
이 조건문은 '이 파일이 직접 실행되는 경우에만 아래 코드를 실행하라'는 의미이다. 이렇게 함으로써 이 파일이 다른 프로그램에 의해 모듈로 import될 때는 실행되지 않고, 직접 실행될 때만 원하는 동작을 수행하게 할 수 있다.
if __name__ == '__main__':
# 메인 프로그램 실행
비동기 처리를 위해 현재 실행중인 이벤트 루프를 가져온다.
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
주변의 BLE 기기를 Scan하기 위해 scan 함수를 정의한다.
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
위 코드는 Bluetooth Low Energy (BLE) 장치를 스캔하는 비동기 함수이다. 주요 기능은 다음과 같다:
ts100_device
리스트를 초기화하여 발견된 TS100 장치들을 저장할 준비를 한다.스캔 시작을 알리는 메시지를 출력한다.
BleakScanner.discover()
를 사용하여 주변의 BLE 장치들을 비동기적으로 스캔한다.스캔 종료를 알리는 메시지를 출력한다.
스캔된 각 장치에 대해:
장치 이름이 없으면 "Unknown Device"로 설정한다.
장치 이름에 "TS100"이 포함되어 있으면:
해당 장치를
ts100_device
리스트에 추가한다.장치 이름을 출력한다.
발견된 TS100 장치들의 리스트를 반환한다. 이 함수는 비동기(
async
)로 정의되어 있어, 스캔 과정이 완료될 때까지 다른 작업을 수행할 수 있게 해준다. 이는 BLE 스캔이 시간이 걸릴 수 있는 작업이기 때문에 효율적인 방법이다.
비동기 이벤트 루프를 사용해서 scan 함수를 호출하여 scan 함수가 종료될 때 까지 기다린다.
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan 함수 실행
scan_devices = loop.run_until_complete(scan())
지금까지의 코드를 실행하면 콘솔에 scan된 바나나 체온계 기기를 확인할 수 있다.
다음은 지금까지의 전체 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan 함수 실행
scan_devices = loop.run_until_complete(scan())
블루투스 기기 선택
스캔된 BLE(Bluetooth Low Energy) 기기의 정보를 사용자에게 시각적으로 보여주기 위해, GUI(그래픽 사용자 인터페이스)를 만들 필요가 있다. 이를 위해 Python의 표준 GUI 라이브러리인 tkinter를 사용한다. tkinter는 Python에 기본으로 포함된 GUI 툴킷이다. 별도의 설치 없이 바로 사용할 수 있다. 'tk'라는 별칭으로 import하는 것이 일반적이다.
tkinter의 주요 기능은 다음과 같다.
애플리케이션의 메인 창 생성
버튼, 레이블, 텍스트 박스 등 다양한 UI 요소 제공
마우스 클릭, 키보드 입력 등의 이벤트 처리
다양한 레이아웃 관리 기능으로 위젯 배치
이렇게 tkinter를 사용함으로써, 사용자는 터미널에서 텍스트로만 정보를 보는 것이 아니라, 시각적으로 더 이해하기 쉽고 상호작용이 가능한 형태로 BLE 기기 정보를 확인할 수 있다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # 추가된 부분(GUI 관련 라이브러리)
Scan된 device를 확인할 수 있는 show_scan_devices() 함수를 추가한다. Scan된 device가 화면에 버튼 형태로 나타나고, 버튼을 클릭했을 때 button_click() 함수가 호출되어 화면이 사라지는 형태로 구현되었다.
# GUI로 스캔된 장치들을 화면에 표시하는 함수
def show_scan_devices(devices):
# 스캔된 장치가 없으면 False 반환하고 함수 종료
if not devices:
return False
# tkinter 윈도우 생성
window = tk.Tk()
# 윈도우 제목 설정
window.title("TS100 Scan List")
buttons = []
# 스캔된 각 장치에 대해 버튼 생성
for device in devices:
# 버튼에 표시될 텍스트로 장치 이름 사용
text = device.name
# 버튼 생성. 클릭 시 button_click 함수 호출.
# lambda 함수를 사용해 현재 장치와 윈도우 객체를 전달
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
# 버튼을 윈도우에 추가
button.pack()
# 생성된 버튼을 리스트에 저장
buttons.append(button)
# 윈도우 이벤트 루프 시작. 사용자 입력 대기
window.mainloop()
# 장치 버튼 클릭 시 호출되는 함수
def button_click(device, window):
# 선택된 장치 이름 출력
print("Select Device is " + device.name)
# GUI 윈도우 종료
window.quit()
# 메인 실행 부분
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기로 scan 함수 실행하여 장치 스캔
scan_devices = loop.run_until_complete(scan())
# 스캔된 장치들을 GUI로 표시
show_scan_devices(scan_devices)
위 코드를 실행하면 그림과 같이 화면에 Scan된 기기를 확인할 수 있고, 연결하고자 하는 Device를 선택하면 화면이 종료된다.
지금까지의 내용이 반영된 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
# GUI로 스캔된 장치들을 화면에 표시하는 함수
def show_scan_devices(devices):
# 스캔된 장치가 없으면 False 반환하고 함수 종료
if not devices:
return False
# tkinter 윈도우 생성
window = tk.Tk()
# 윈도우 제목 설정
window.title("TS100 Scan List")
buttons = []
# 스캔된 각 장치에 대해 버튼 생성
for device in devices:
# 버튼에 표시될 텍스트로 장치 이름 사용
text = device.name
# 버튼 생성. 클릭 시 button_click 함수 호출.
# lambda 함수를 사용해 현재 장치와 윈도우 객체를 전달
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
# 버튼을 윈도우에 추가
button.pack()
# 생성된 버튼을 리스트에 저장
buttons.append(button)
# 윈도우 이벤트 루프 시작. 사용자 입력 대기
window.mainloop()
# 장치 버튼 클릭 시 호출되는 함수
def button_click(device, window):
# 선택된 장치 이름 출력
print("Select Device is " + device.name)
# GUI 윈도우 종료
window.quit()
# 메인 실행 부분
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기로 scan 함수 실행하여 장치 스캔
scan_devices = loop.run_until_complete(scan())
# 스캔된 장치들을 GUI로 표시
show_scan_devices(scan_devices)
블루투스 기기 연결
Scan된 device를 담는 전역변수 selected_device를 추가한다.
selected_device = None
button_click() 함수에 연결하고자 하는 바나나 체온계를 selected_device 변수에 담는 코드를 추가한다.
def button_click(device, window):
global selected_device # 전역변수 함수내 사용하도록 설정
selected_device = device # 선택한 Device selected_device 변수에 담기
# remove gui
window.quit()
블루투스 기기를 연결하기 위해 connect 함수를 추가한다. 비동기 처리를 위해 async 함수로 구현한다.
# 선택한 바나나 체온계에 연결하는 비동기 함수
async def connect(candidate_device):
print('connect start')
# BleakClient 객체 생성. 이 객체를 통해 BLE 장치와 통신
selected_client = BleakClient(candidate_device)
try:
# 장치 연결 시도. await 키워드로 비동기 연결 완료 대기
await selected_client.connect()
print('connected')
except Exception as e:
# 연결 실패 시 예외 처리
print('error: ', e, end='')
# 연결 실패 시 None 반환하여 실패 상태 표시
return None
# 연결 성공 시 BleakClient 객체 반환
return selected_client
main에 연결요청을 수행할 device를 선택했을 때 connect 과정을 수행하는 코드를 추가한다.
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
# 선택한 기기가 None이 아니라면
if selected_device is not None:
# connect 요청 후 함수가 끝날때까지 기다림
connected_device = loop.run_until_complete(connect(selected_device))
위 코드를 실행하면 콘솔(Terminal)에 다음과 같이 출력된다.이 출력되는것을 확인할 수 있다.
처음 페어링을 진행하는 경우 PIN Code를 입력해야한다.
바나나 체온계 뒷면의 PIN Code 6자리를 입력하면 된다.
지금까지의 내용이 반영된 코드이다. 다음은 위 과정을 모두 수행한 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
if not devices:
return False
window = tk.Tk()
# 화면 제목
window.title("TS100 Scan List")
buttons = []
for device in devices:
text = device.name
# 버튼 이벤트 추가(button_click 함수 호출)
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
button.pack()
buttons.append(button)
# 윈도우 종료까지 실행
window.mainloop()
# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
global selected_device # 전역변수 함수내 사용하도록 설정
selected_device = device # 선택한 Device selected_device 변수에 담기
# remove gui
window.quit()
# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
print('connect start')
selected_client = BleakClient(candidate_device)
try:
# 장치 연결 시작
await selected_client.connect()
print('connected')
except Exception as e:
# 연결 실패시 발생
print('error: ', e, end='')
return None
return selected_client
selected_device = None
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
# 선택한 기기가 None이 아니라면
if selected_device is not None:
# connect 요청 후 함수가 끝날때까지 기다림
connected_device = loop.run_until_complete(connect(selected_device))
Service, Characteristic 요청
블루투스 연결이 완료 후, 바나나 체온계에서 제공하는 전체 Service와 Characteristic을 확인하기 위해 바나나 체온계에 Service와 Characteristic을 요청한다.
Service 요청을 위한 get_service_and_characterisc 함수를 추가한다.
# Service, Characteristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
# Service 확인
for service in connected_device.services:
print(service)
# Characteristic 확인
for characteristic in service.characteristics:
print(characteristic)
print()
get_service_and_characteristic 함수를 main함수에서 호출한다.
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
if selected_device is not None:
connected_device = loop.run_until_complete(connect(selected_device))
print("device is " + selected_device.name)
# 바나나 체온계와 연결되어 있다면
if connected_device is not None:
# Service, Characteristic 출력하는 함수 호출
loop.run_until_complete(get_service_and_characteristic(connected_device))
위 코드를 실행하여 바나나 체온계에서 제공되는 Service와 Characteristic을 살펴볼 수 있다.
지금까지의 내용이 반영된 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
if not devices:
return False
window = tk.Tk()
# 화면 제목
window.title("TS100 Scan List")
buttons = []
for device in devices:
text = device.name
# 버튼 이벤트 추가(button_click 함수 호출)
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
button.pack()
buttons.append(button)
# 윈도우 종료까지 실행
window.mainloop()
# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
global selected_device # 전역변수 함수내 사용하도록 설정
selected_device = device # 선택한 Device selected_device 변수에 담기
# remove gui
window.quit()
# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
print('connect start')
selected_client = BleakClient(candidate_device)
try:
# 장치 연결 시작
await selected_client.connect()
print('connected')
except Exception as e:
# 연결 실패시 발생
print('error: ', e, end='')
return None
return selected_client
# Service, Characteristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
# Service 확인
for service in connected_device.services:
print(service)
# Characteristic 확인
for characteristic in service.characteristics:
print(characteristic)
print()
selected_device = None
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
# 선택한 기기가 None이 아니라면
if selected_device is not None:
# connect 요청 후 함수가 끝날때까지 기다림
connected_device = loop.run_until_complete(connect(selected_device))
# 바나나 체온계와 연결되어 있다면
if connected_device is not None:
# Service, Characteristic 출력하는 함수 호출
loop.run_until_complete(get_service_and_characteristic(connected_device))
온도정보 가져오기
바나나 체온계의 여러 characteristic 중 온도정보를 받아오는 기능을 구현한다. 온도 정보는 characteristic 에서 확인할 수 있다.
Service: Health Thermometer(00001809-0000-1000-8000-00805f9b34fb)
Characteristic: Temperature Measurement(00002a1c-0000-1000-8000-00805f9b34fb)
온도정보 수신
여러 Service 중 Health Thermometer와 관련된 service만 수신받기 위해 get_service_and_characteristic 함수를 수정한다.
# Service UUID
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'
async def get_service_and_characteristic(connected_device):
# Service 확인(제대로 연결되었는지 테스트용)
for service in connected_device.services:
# Health Thermometer 관련 Service일때만 Characteristic 검색
if service.uuid == health_thermometer_service:
for characteristic in service.characteristics:
print(characteristic)
print()
이후 온도정보 characteristic을 통해 notify를 수신하기 위해 함수를 수정한다.
# Characteristic
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'
async def get_service_and_characteristic(connected_device):
# Service 확인(제대로 연결되었는지 테스트용)
for service in connected_device.services:
# Health Thermometer 관련 Service일때만 Characteristic 검색
if service.uuid == health_thermometer_service:
for characteristic in service.characteristics:
# 온도정보 notify 설정
if characteristic.uuid == temperature_characteristic:
await connected_device.start_notify(characteristic, notify_callback)
# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
print(data)
연결된 기기가 종료될 때까지 이벤트 루프를 종료하지 않기 위해 wait_connect 함수를 추가하고 main을 수정한다.
# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
while connected_device.is_connected:
await asyncio.sleep(1)
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
# 선택한 기기가 None이 아니라면
if selected_device is not None:
# connect 요청 후 함수가 끝날때까지 기다림
connected_device = loop.run_until_complete(connect(selected_device))
# 바나나 체온계와 연결되어 있다면
if connected_device is not None:
# Service, Characteristic 출력하는 함수 호출
loop.run_until_complete(get_service_and_characteristic(connected_device))
# 종료 방지함수 호출
loop.run_until_complete(wait_connect())
이후 코드를 실행하면 온도정보가 콘솔창에 출력되는것을 확인할 수 있다. 현재 wait_connect 함수는 별도의 종료를 하지않으면 코드가 계속 실행되므로 우측 상단의 쓰레기통 아이콘을 클릭해 코드실행을 종료한다.
지금까지의 내용이 반영된 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
if not devices:
return False
window = tk.Tk()
# 화면 제목
window.title("TS100 Scan List")
buttons = []
for device in devices:
text = device.name
# 버튼 이벤트 추가(button_click 함수 호출)
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
button.pack()
buttons.append(button)
# 윈도우 종료까지 실행
window.mainloop()
# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
global selected_device # 전역변수 함수내 사용하도록 설정
selected_device = device # 선택한 Device selected_device 변수에 담기
# remove gui
window.quit()
# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
print('connect start')
selected_client = BleakClient(candidate_device)
try:
# 장치 연결 시작
await selected_client.connect()
print('connected')
except Exception as e:
# 연결 실패시 발생
print('error: ', e, end='')
return None
return selected_client
# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
# Service 확인(제대로 연결되었는지 테스트용)
for service in connected_device.services:
# Health Thermometer 관련 Service일때만 Characteristic 검색
if service.uuid == health_thermometer_service:
for characteristic in service.characteristics:
# 온도정보 notify 설정
if characteristic.uuid == temperature_characteristic:
await connected_device.start_notify(characteristic, notify_callback)
# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
print(data)
# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
while connected_device.is_connected:
await asyncio.sleep(1)
selected_device = None
# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
# 선택한 기기가 None이 아니라면
if selected_device is not None:
# connect 요청 후 함수가 끝날때까지 기다림
connected_device = loop.run_until_complete(connect(selected_device))
# 바나나 체온계와 연결되어 있다면
if connected_device is not None:
# Service, Characteristic 출력하는 함수 호출
loop.run_until_complete(get_service_and_characteristic(connected_device))
# 종료 방지함수 호출
loop.run_until_complete(wait_connect())
온도정보 변환
바나나 체온계로부터 수신받은 데이터는 온도와 날짜 정보로 이루어져 있다. 온도 정보를 변환하는 함수를 작성한다.
# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
# Int로 변환, 비트 시프트 후 합산
int_t1 = int(t1)
int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.
signed_value = int_t1 + int_t2 + int_t3
# 자릿수 정하는 변수
digit = int(d)
if digit > 127:
digit -= 256
# 온도
temperature = float(signed_value) * 10 ** digit
# 소수점 1자리만 남도록 수정
temperature = round(temperature * 10) / 10
return temperature
날짜정보를 변환하는 함수를 작성한다.
# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime
# 날짜정보 계산하는 함수
def date_calculate(array):
year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
month = array[2]
day = array[3]
hour = array[4]
minute = array[5]
second = array[6]
date_component = datetime(year, month, day, hour, minute, second)
return date_component
기존에 작성했던 notify_callback 함수를 온도정보와 날짜정보를 받아와 출력하는 함수로 변경한다.
# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
parse_temperature_information(data)
# 온도정보 변환하는 함수
def parse_temperature_information(data: bytearray):
# 온도정보 변환
temperature = temperature_calculate(data[1], data[2], data[3], data[4])
array_slice = data[5:]
date_array = list(map(int, array_slice))
# 날짜정보 변환
date = date_calculate(date_array)
# 온도, 날짜정보 출력
print(date)
print(temperature)
이제 프로그램을 이후 프로젝트를 실행하면 변환된 온도정보가 출력된다.
지금까지의 내용이 반영된 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리
# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
if not devices:
return False
window = tk.Tk()
# 화면 제목
window.title("TS100 Scan List")
buttons = []
for device in devices:
text = device.name
# 버튼 이벤트 추가(button_click 함수 호출)
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
button.pack()
buttons.append(button)
# 윈도우 종료까지 실행
window.mainloop()
# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
global selected_device # 전역변수 함수내 사용하도록 설정
selected_device = device # 선택한 Device selected_device 변수에 담기
# remove gui
window.quit()
# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
print('connect start')
selected_client = BleakClient(candidate_device)
try:
# 장치 연결 시작
await selected_client.connect()
print('connected')
except Exception as e:
# 연결 실패시 발생
print('error: ', e, end='')
return None
return selected_client
# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
# Service 확인(제대로 연결되었는지 테스트용)
for service in connected_device.services:
# Health Thermometer 관련 Service일때만 Characteristic 검색
if service.uuid == health_thermometer_service:
for characteristic in service.characteristics:
# 온도정보 notify 설정
if characteristic.uuid == temperature_characteristic:
await connected_device.start_notify(characteristic, notify_callback)
# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
parse_temperature_information(data)
# 온도정보 변환하는 함수
def parse_temperature_information(data: bytearray):
# 온도정보 변환
temperature = temperature_calculate(data[1], data[2], data[3], data[4])
array_slice = data[5:]
date_array = list(map(int, array_slice))
# 날짜정보 변환
date = date_calculate(date_array)
# 온도, 날짜정보 출력
print(date)
print(temperature)
# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
# Int로 변환, 비트 시프트 후 합산
int_t1 = int(t1)
int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.
signed_value = int_t1 + int_t2 + int_t3
# 자릿수 정하는 변수
digit = int(d)
if digit > 127:
digit -= 256
# 온도
temperature = float(signed_value) * 10 ** digit
# 소수점 1자리만 남도록 수정
temperature = round(temperature * 10) / 10
return temperature
# 날짜정보 계산하는 함수
def date_calculate(array):
year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
month = array[2]
day = array[3]
hour = array[4]
minute = array[5]
second = array[6]
date_component = datetime(year, month, day, hour, minute, second)
return date_component
# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
while connected_device.is_connected:
await asyncio.sleep(1)
selected_device = None
# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
# 선택한 기기가 None이 아니라면
if selected_device is not None:
# connect 요청 후 함수가 끝날때까지 기다림
connected_device = loop.run_until_complete(connect(selected_device))
# 바나나 체온계와 연결되어 있다면
if connected_device is not None:
# Service, Characteristic 출력하는 함수 호출
loop.run_until_complete(get_service_and_characteristic(connected_device))
# 종료 방지함수 호출
loop.run_until_complete(wait_connect())
날짜정보 수정
현재 바나나 체온계에서 수신되는 날짜정보는 현재 시간과 맞지 않는다. 올바른 시간을 확인하려면 바나나 체온계에 현재 시간을 알려주는 과정이 필요하다.
다음은 날짜와 관련된 특성이다.
Service: Health Thermometer(00001809-0000-1000-8000-00805f9b34fb)
Characteristic: Date Time(00002a08-0000-1000-8000-00805f9634fb)
현재 시간을 리턴하는 함수를 구현한다.
# 현재 시간을 리턴하는 메서드(byte 타입)
def current_date():
current_datetime = datetime.now()
year = current_datetime.year
month = current_datetime.month
day = current_datetime.day
hour = current_datetime.hour
minute = current_datetime.minute
second = current_datetime.second
result = [
year & 0xFF,
(year >> 8) & 0xFF,
month & 0xFF,
day & 0xFF,
hour & 0xFF,
minute & 0xFF,
second & 0xFF
]
return bytes(result)
Date Time Characteristic이 수신되었을 때 현재 날씨를 업로드하기 위해 get_service_and_characteristic 함수를 수정한다.
# Service, Characteristic
date_time_characteristic = '00002a08-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'
async def get_service_and_characteristic(connected_device):
# Service 확인(제대로 연결되었는지 테스트용)
for service in connected_device.services:
# Health Thermometer 관련 Service일때만 Characteristic 검색
if service.uuid == health_thermometer_service:
for characteristic in service.characteristics:
# 날짜정보 Update(추가된 부분)
if characteristic.uuid == date_time_characteristic:
await connected_device.write_gatt_char(characteristic, current_date())
# 온도정보 notify 설정
if characteristic.uuid == temperature_characteristic:
await connected_device.start_notify(characteristic, notify_callback)
# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
parse_temperature_information(data)
# 온도정보 변환하는 함수
def parse_temperature_information(data: bytearray):
# 온도정보 변환
temperature = temperature_calculate(data[1], data[2], data[3], data[4])
array_slice = data[5:]
date_array = list(map(int, array_slice))
# 날짜정보 변환
date = date_calculate(date_array)
# 온도, 날짜정보 출력
print(date)
print(temperature)
이후 코드를 실행하면 올바른 날짜로 수정되어 온도정보가 수신되는것을 확인할 수 있다.
지금까지의 내용이 반영된 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리
# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
if not devices:
return False
window = tk.Tk()
# 화면 제목
window.title("TS100 Scan List")
buttons = []
for device in devices:
text = device.name
# 버튼 이벤트 추가(button_click 함수 호출)
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
button.pack()
buttons.append(button)
# 윈도우 종료까지 실행
window.mainloop()
# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
global selected_device # 전역변수 함수내 사용하도록 설정
selected_device = device # 선택한 Device selected_device 변수에 담기
# remove gui
window.quit()
# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
print('connect start')
selected_client = BleakClient(candidate_device)
try:
# 장치 연결 시작
await selected_client.connect()
print('connected')
except Exception as e:
# 연결 실패시 발생
print('error: ', e, end='')
return None
return selected_client
# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
# Service 확인(제대로 연결되었는지 테스트용)
for service in connected_device.services:
# Health Thermometer 관련 Service일때만 Characteristic 검색
if service.uuid == health_thermometer_service:
for characteristic in service.characteristics:
# 날짜정보 Update(추가된 부분)
if characteristic.uuid == date_time_characteristic:
await connected_device.write_gatt_char(characteristic, current_date())
# 온도정보 notify 설정
if characteristic.uuid == temperature_characteristic:
await connected_device.start_notify(characteristic, notify_callback)
# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
parse_temperature_information(data)
# 온도정보 변환하는 함수
def parse_temperature_information(data: bytearray):
# 온도정보 변환
temperature = temperature_calculate(data[1], data[2], data[3], data[4])
array_slice = data[5:]
date_array = list(map(int, array_slice))
# 날짜정보 변환
date = date_calculate(date_array)
# 온도, 날짜정보 출력
print(date)
print(temperature)
# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
# Int로 변환, 비트 시프트 후 합산
int_t1 = int(t1)
int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.
signed_value = int_t1 + int_t2 + int_t3
# 자릿수 정하는 변수
digit = int(d)
if digit > 127:
digit -= 256
# 온도
temperature = float(signed_value) * 10 ** digit
# 소수점 1자리만 남도록 수정
temperature = round(temperature * 10) / 10
return temperature
# 날짜정보 계산하는 함수
def date_calculate(array):
year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
month = array[2]
day = array[3]
hour = array[4]
minute = array[5]
second = array[6]
date_component = datetime(year, month, day, hour, minute, second)
return date_component
# 현재 시간을 리턴하는 메서드(byte 타입)
def current_date():
current_datetime = datetime.now()
year = current_datetime.year
month = current_datetime.month
day = current_datetime.day
hour = current_datetime.hour
minute = current_datetime.minute
second = current_datetime.second
result = [
year & 0xFF,
(year >> 8) & 0xFF,
month & 0xFF,
day & 0xFF,
hour & 0xFF,
minute & 0xFF,
second & 0xFF
]
return bytes(result)
# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
while connected_device.is_connected:
await asyncio.sleep(1)
selected_device = None
# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'
date_time_characteristic = '00002a08-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'
if __name__ == '__main__':
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
# 선택한 기기가 None이 아니라면
if selected_device is not None:
# connect 요청 후 함수가 끝날때까지 기다림
connected_device = loop.run_until_complete(connect(selected_device))
# 바나나 체온계와 연결되어 있다면
if connected_device is not None:
# Service, Characteristic 출력하는 함수 호출
loop.run_until_complete(get_service_and_characteristic(connected_device))
# 종료 방지함수 호출
loop.run_until_complete(wait_connect())
데이터 저장
바나나 체온계로부터 수신한 온도정보를 데이터베이스를 사용하여 저장한다.
데이터베이스란
데이터베이스란 여러 사람이 공유하고 사용할 목적으로 통합 관리되는 정보의 집합이다. 몇 개의 자료 파일을 조직적으로 통합하여 자료 항목의 중복을 없애고 자료를 구조화하여 기억시켜 놓은 자료의 집합체라고 할 수 있다.
일반적으로 데이터베이스는 다음과 같은 기능을 제공한다
데이터 저장
데이터를 체계적으로 저장하여 나중에 쉽게 검색하고 사용할 수 있다. 예를 들어, 바나나 체온계에서 측정한 온도 데이터를 날짜와 시간별로 저장할 수 있다.
데이터 검색
저장된 데이터 중 원하는 정보를 쉽게 찾을 수 있다. 예를 들어, 특정 날짜의 체온 데이터를 검색할 수 있다.
데이터 수정
저장된 데이터를 필요에 따라 수정할 수 있다. 예를 들어, 잘못된 온도 기록을 수정할 수 있다.
데이터 삭제
더 이상 필요하지 않은 데이터를 삭제할 수 있다. 예를 들어, 오래된 데이터를 삭제하여 저장 공간을 확보할 수 있다.
데이터베이스의 구성 요소는 다음과 같다.
테이블
데이터를 행(row)과 열(column)로 구성된 형태로 저장하는 가장 기본적인 단위이다. 예를 들어, 체온 데이터를 저장하는 테이블에는 날짜, 시간, 체온 등의 열이 있을 수 있다.
레코드
테이블의 각 행을 레코드라고 한다. 각 레코드는 하나의 데이터 항목에 대한 정보를 포함한다. 예를 들어, 특정 시간에 측정된 체온 정보가 하나의 레코드가 된다.
필드
테이블의 각 열을 필드라고 한다. 필드는 데이터의 특정 속성을 나타낸다. 예를 들어, 체온 테이블에서 날짜, 시간, 체온은 각각 하나의 필드가 된다.
기본 키
각 레코드를 고유하게 식별할 수 있는 필드이다. 예를 들어, 체온 데이터를 고유하게 식별하기 위해 날짜와 시간을 조합하여 기본 키로 사용할 수 있다.
데이터베이스의 장점은 다음과 같다.
데이터 일관성
데이터베이스는 데이터를 중복 없이 저장하고 관리하기 때문에 데이터의 일관성을 유지할 수 있다.
데이터 무결성
데이터베이스는 저장된 데이터가 정확하고 신뢰할 수 있도록 다양한 제약 조건을 설정할 수 있다.
데이터 보안
데이터베이스는 접근 권한을 설정하여 데이터를 보호할 수 있다. 예를 들어, 특정 사용자만 데이터를 조회하거나 수정할 수 있도록 제한할 수 있다.
이러한 데이터베이스의 기능과 구성 요소를 이해하면, 데이터를 효율적으로 저장하고 관리할 수 있게 된다. 초보자들도 데이터베이스를 활용하여 체계적으로 데이터를 다룰 수 있는 능력을 기를 수 있다.
이러한 장점때문에 여기서도 데이터베이스를 사용해서 데이터를 저장한다.
데이터베이스 저장
여기서는 별도의 라이브러리를 설치하지 않고, 단일 파일로 저장되어 간편하게 사용할 수 있는 데이터베이스를 사용하기 위한 툴로 SQLite를 사용한다. sqlite를 import하는 코드를 추가한다.
import sqlite3
이후 데이터베이스를 생성하기 위해 코드를 추가한다.
# 데이터베이스 객체 생성(temperature.db 파일 생성됨)
conn = sqlite3.connect("temperature.db")
# SQL 동작을 위해 cursor 오브젝트 생성
cursor = conn.cursor()
if __name__ == '__main__':
# 테이블 생성(한번만 실행되도록 try-catch 이용)
try:
cursor.execute("CREATE TABLE TEMPERATURE(name text, temperature double, date datetime)")
except:
# 이미 테이블이 실행되었다면 구문 콘솔 출력
print("database already created")
...
코드를 실행하면 좌측 상단에 temperature.db가 생성된것을 확인할 수 있다.
온도정보 notify를 수신했을 때 데이터 변환 후 데이터정보를 저장할 수 있도록 코드를 변환한다.
# 온도정보 변환하는 함수
def parse_temperature_information(data: bytearray):
# 온도정보 변환
temperature = temperature_calculate(data[1], data[2], data[3], data[4])
array_slice = data[5:]
date_array = list(map(int, array_slice))
# 날짜정보 변환
date = date_calculate(date_array)
# 온도, 날짜정보 출력
print(date)
print(temperature)
# 데이터베이스에 저장(추가된 부분)
cursor.execute("INSERT INTO TEMPERATURE (name, temperature, date) VALUES (?, ?, ?)", (selected_device.name, temperature, date))
conn.commit()
코드작성 후 실행하면 올바르게 데이터베이스에 온도정보가 저장된다. 저장된 결과는 다음 장인 "온도정보 확인"에서 볼 수 있다.
지금까지의 내용이 반영된 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈
from typing import Optional # 타입 힌트를 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈
import tkinter as tk # GUI 라이브러리
# 날짜정보 변환하기 위해 사용하는 라이브러리
from datetime import datetime
import sqlite3
# 비동기 형태로 BLE 장치 검색
async def scan():
ts100_device = []
print('Scan Start')
# scan start
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device "Unknown Device로 변경"
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하면 콘솔에 출력
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
# GUI로 화면에 나타내는 함수
def show_scan_devices(devices):
if not devices:
return False
window = tk.Tk()
# 화면 제목
window.title("TS100 Scan List")
buttons = []
for device in devices:
text = device.name
# 버튼 이벤트 추가(button_click 함수 호출)
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
button.pack()
buttons.append(button)
# 윈도우 종료까지 실행
window.mainloop()
# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
global selected_device # 전역변수 함수내 사용하도록 설정
selected_device = device # 선택한 Device selected_device 변수에 담기
# remove gui
window.quit()
# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
print('connect start')
selected_client = BleakClient(candidate_device)
try:
# 장치 연결 시작
await selected_client.connect()
print('connected')
except Exception as e:
# 연결 실패시 발생
print('error: ', e, end='')
return None
return selected_client
# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
# Service 확인(제대로 연결되었는지 테스트용)
for service in connected_device.services:
# Health Thermometer 관련 Service일때만 Characteristic 검색
if service.uuid == health_thermometer_service:
for characteristic in service.characteristics:
# 날짜정보 Update(추가된 부분)
if characteristic.uuid == date_time_characteristic:
await connected_device.write_gatt_char(characteristic, current_date())
# 온도정보 notify 설정
if characteristic.uuid == temperature_characteristic:
await connected_device.start_notify(characteristic, notify_callback)
# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
parse_temperature_information(data)
# 온도정보 변환하는 함수
def parse_temperature_information(data: bytearray):
# 온도정보 변환
temperature = temperature_calculate(data[1], data[2], data[3], data[4])
array_slice = data[5:]
date_array = list(map(int, array_slice))
# 날짜정보 변환
date = date_calculate(date_array)
# 온도, 날짜정보 출력
print(date)
print(temperature)
# 데이터베이스에 저장(추가된 부분)
cursor.execute("INSERT INTO TEMPERATURE (name, temperature, date) VALUES (?, ?, ?)", (selected_device.name, temperature, date))
conn.commit()
# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
# Int로 변환, 비트 시프트 후 합산
int_t1 = int(t1)
int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 shift 한다.
int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 shift 한다.
signed_value = int_t1 + int_t2 + int_t3
# 자릿수 정하는 변수
digit = int(d)
if digit > 127:
digit -= 256
# 온도
temperature = float(signed_value) * 10 ** digit
# 소수점 1자리만 남도록 수정
temperature = round(temperature * 10) / 10
return temperature
# 날짜정보 계산하는 함수
def date_calculate(array):
year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
month = array[2]
day = array[3]
hour = array[4]
minute = array[5]
second = array[6]
date_component = datetime(year, month, day, hour, minute, second)
return date_component
# 현재 시간을 리턴하는 메서드(byte 타입)
def current_date():
current_datetime = datetime.now()
year = current_datetime.year
month = current_datetime.month
day = current_datetime.day
hour = current_datetime.hour
minute = current_datetime.minute
second = current_datetime.second
result = [
year & 0xFF,
(year >> 8) & 0xFF,
month & 0xFF,
day & 0xFF,
hour & 0xFF,
minute & 0xFF,
second & 0xFF
]
return bytes(result)
# 이벤트루프 종료를 방지하기위한 함수
async def wait_connect():
while connected_device.is_connected:
await asyncio.sleep(1)
selected_device = None
# Service, Characteristic
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'
date_time_characteristic = '00002a08-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'
# 데이터베이스 객체 생성(temperature.db 파일 생성됨)
conn = sqlite3.connect("temperature.db")
# SQL 동작을 위해 cursor 오브젝트 생성
cursor = conn.cursor()
if __name__ == '__main__':
# 테이블 생성(한번만 실행되도록 try-catch 이용)
try:
cursor.execute("CREATE TABLE TEMPERATURE(name text, temperature double, date datetime)")
except:
# 이미 테이블이 실행되었다면 구문 콘솔 출력
print("database already created")
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# 비동기 형태로 Scan함수 실행
scan_devices = loop.run_until_complete(scan())
# Scan된 기기 화면으로 출력
show_scan_devices(scan_devices)
connected_device = None
# 선택한 기기가 None이 아니라면
if selected_device is not None:
# connect 요청 후 함수가 끝날때까지 기다림
connected_device = loop.run_until_complete(connect(selected_device))
# 바나나 체온계와 연결되어 있다면
if connected_device is not None:
# Service, Characteristic 출력하는 함수 호출
loop.run_until_complete(get_service_and_characteristic(connected_device))
# 종료 방지함수 호출
loop.run_until_complete(wait_connect())
온도정보 확인
온도정보 확인을 위해 VSC(Visual Studio Code)의 Extension → sqlite 검색 → SQLite Viewer를 설치한다.
설치완료 후 VSC(Visual Studio Code)의 Explorer → temperature.db 클릭 → Open Anyway 클릭 → SQLite Viewer를 순서대로 클릭한다.
temperature.db에 온도정보가 들어가 있는 것을 확인할 수 있다.
SQLite Viewer는 데이터가 실시간으로 업데이트되지 않으므로 데이터베이스를 새롭게 갱신하고 싶다면 좌측 상단의 Refresh 버튼을 클릭해 데이터를 갱신한다.
데이터 그래프로 그리기
온도정보를 그래프로 그리기 위해 코드를 추가해 라이브러리를 import한다.
import matplotlib.pyplot as plt
그래프에 그려질 온도정보를 담고있는 배열을 전역변수로 선언한다.
temperature_list = []
그래프 설정 함수를 추가한다.
# LineChart 변수
line_chart = None
# 그래프 설정하는 함수
def set_graph():
global line_chart
# Set Graph
plt.ion() # Activate interactive mode
fig, ax = plt.subplots()
line = ax.plot([], [], label='Temperature', marker='o', lw=1)
ax.set_ylim(20, 40)
ax.set_xlim(0, 20)
ax.set_ylabel('Temperature (°C)')
ax.legend()
# 전역변수에 차트정보 설정
line_chart = line
set_graph 함수를 호출하는 코드를 main함수에 추가한다.
if __name__ == '__main__':
# Create Table(Call Once)
try:
cursor.execute("CREATE TABLE TEMPERATURE(name text, temperature double, date datetime)")
except:
print("database already created")
loop = asyncio.get_event_loop()
scan_devices = loop.run_until_complete(scan())
show_scan_devices(scan_devices)
connected_device = None
if selected_device is not None:
connected_device = loop.run_until_complete(connect(selected_device))
print("device is " + selected_device.name)
# 그래프 설정 함수(추가된 부분)
set_graph()
# 바나나 체온계와 연결되어 있다면
if connected_device is not None:
# Get Service
loop.run_until_complete(get_service_and_characteristic(connected_device))
loop.run_until_complete(wait_connect())
parse_temperature_information 함수에 그래프 그리는 부분을 추가로 작성한다.
# 온도정보 변환하는 함수
def parse_temperature_information(data: bytearray):
# 온도정보 변환
temperature = temperature_calculate(data[1], data[2], data[3], data[4])
array_slice = data[5:]
date_array = list(map(int, array_slice))
# 날짜정보 변환
date = date_calculate(date_array)
# 온도, 날짜정보 출력
print(date)
print(temperature)
# save database
cursor.execute("INSERT INTO TEMPERATURE (name, temperature, date) VALUES (?, ?, ?)", (selected_device.name, temperature, date))
conn.commit()
# 배열에 값 추가
temperature_list.append(temperature)
# 20개의 데이터만 graph_temperature_data 배열에 담을 수 있도록 설정
graph_temperature_data = temperature_list
if len(temperature_list) > 20:
graph_temperature_data = temperature_list[-20:]
# 그래프 그리기
line_chart[0].set_data(range(len(graph_temperature_data)), graph_temperature_data)
plt.pause(1)
이후 코드를 실행하면 온도정보가 그래프로 나타나는것을 확인할 수 있다.
지금까지의 내용이 반영된 코드이다.
import asyncio # 비동기 프로그래밍을 위한 모듈 (동시에 여러 작업을 처리할 수 있게 해줌)
from typing import Optional # 타입 힌트를 위한 모듈 (코드의 가독성과 오류 방지를 위해 사용)
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈 (블루투스 기기와 통신하기 위해 사용)
import tkinter as tk # GUI 라이브러리 (그래픽 사용자 인터페이스를 만들기 위해 사용)
# 날짜와 시간 정보를 다루기 위한 라이브러리
from datetime import datetime
# SQLite 데이터베이스를 사용하기 위한 라이브러리
import sqlite3
# 그래프를 그리기 위한 라이브러리
import matplotlib.pyplot as plt
# 비동기 형태로 BLE 장치를 검색하는 함수
async def scan():
ts100_device = []
print('Scan Start')
# BLE 기기 스캔 시작
scan_devices = await BleakScanner.discover()
print('Scan End')
# 스캔된 device 탐색
for device in scan_devices:
# 이름이 없는 Device는 "Unknown Device"로 표시
device_name = device.name if device.name else "Unknown Device"
# 이름에 TS100을 포함하는 기기만 선택
if "TS100" in device_name:
ts100_device.append(device)
print(device_name)
return ts100_device
# GUI로 스캔된 기기 목록을 화면에 나타내는 함수
def show_scan_devices(devices):
if not devices:
return False
window = tk.Tk()
# 화면 제목 설정
window.title("TS100 Scan List")
buttons = []
for device in devices:
text = device.name
# 각 기기에 대한 버튼 생성 (클릭 시 button_click 함수 호출)
button = tk.Button(window, text=text, command=lambda device=device: button_click(device, window))
button.pack()
buttons.append(button)
# GUI 창이 닫힐 때까지 실행
window.mainloop()
# 버튼 클릭 시 호출되는 함수
def button_click(device, window):
global selected_device # 전역변수로 선택된 기기 정보 저장
selected_device = device # 선택한 Device를 selected_device 변수에 저장
# GUI 창 닫기
window.quit()
# 선택한 바나나 체온계에 연결하는 함수
async def connect(candidate_device):
print('connect start')
selected_client = BleakClient(candidate_device)
try:
# 선택한 기기에 연결 시도
await selected_client.connect()
print('connected')
except Exception as e:
# 연결 실패 시 에러 메시지 출력
print('error: ', e, end='')
return None
return selected_client
# 연결된 기기의 Service와 Characteristic을 확인하는 함수
async def get_service_and_characteristic(connected_device):
# 모든 Service 확인 (제대로 연결되었는지 테스트용)
for service in connected_device.services:
# Health Thermometer 관련 Service일 때만 Characteristic 검색
if service.uuid == health_thermometer_service:
for characteristic in service.characteristics:
# 날짜정보 업데이트
if characteristic.uuid == date_time_characteristic:
await connected_device.write_gatt_char(characteristic, current_date())
# 온도정보 notify 설정 (온도 데이터를 자동으로 받기 위해)
if characteristic.uuid == temperature_characteristic:
await connected_device.start_notify(characteristic, notify_callback)
# notify를 수신했을 때 호출되는 함수 (온도 데이터를 받을 때마다 실행됨)
def notify_callback(handle, data):
parse_temperature_information(data)
# 온도정보를 변환하고 저장하는 함수
def parse_temperature_information(data: bytearray):
# 온도정보 변환
temperature = temperature_calculate(data[1], data[2], data[3], data[4])
array_slice = data[5:]
date_array = list(map(int, array_slice))
# 날짜정보 변환
date = date_calculate(date_array)
# 온도, 날짜정보 출력
print(date)
print(temperature)
# 데이터베이스에 온도 정보 저장
cursor.execute("INSERT INTO TEMPERATURE (name, temperature, date) VALUES (?, ?, ?)", (selected_device.name, temperature, date))
conn.commit()
# 온도 데이터를 리스트에 추가 (그래프 표시용)
temperature_list.append(temperature)
# 20개의 데이터만 graph_temperature_data 배열에 담을 수 있도록 설정
graph_temperature_data = temperature_list
if len(temperature_list) > 20:
graph_temperature_data = temperature_list[-20:]
# 그래프 업데이트
line_chart[0].set_data(range(len(graph_temperature_data)), graph_temperature_data)
plt.pause(1) # 그래프 갱신을 위해 잠시 대기
# 온도정보를 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
# 바이트 데이터를 정수로 변환하고 비트 연산을 통해 온도값 계산
int_t1 = int(t1)
int_t2 = int(t2) << 8 # t2의 값을 왼쪽으로 8비트 이동
int_t3 = int(t3) << 16 # t3의 값을 왼쪽으로 16비트 이동
signed_value = int_t1 + int_t2 + int_t3
# 소수점 자릿수 결정
digit = int(d)
if digit > 127:
digit -= 256
# 최종 온도 계산
temperature = float(signed_value) * 10 ** digit
# 소수점 1자리까지 반올림
temperature = round(temperature * 10) / 10
return temperature
# 날짜정보를 계산하는 함수
def date_calculate(array):
year = (array[1] & 0x0F) << 8 | (array[0] & 0x00FF)
month = array[2]
day = array[3]
hour = array[4]
minute = array[5]
second = array[6]
date_component = datetime(year, month, day, hour, minute, second)
return date_component
# 현재 시간을 바이트 형식으로 반환하는 함수
def current_date():
current_datetime = datetime.now()
year = current_datetime.year
month = current_datetime.month
day = current_datetime.day
hour = current_datetime.hour
minute = current_datetime.minute
second = current_datetime.second
result = [
year & 0xFF,
(year >> 8) & 0xFF,
month & 0xFF,
day & 0xFF,
hour & 0xFF,
minute & 0xFF,
second & 0xFF
]
return bytes(result)
# 그래프를 설정하는 함수
def set_graph():
global line_chart
# 그래프 설정
plt.ion() # 대화형 모드 활성화 (그래프를 실시간으로 업데이트하기 위해)
fig, ax = plt.subplots()
line = ax.plot([], [], label='Temperature', marker='o', lw=1)
ax.set_ylim(20, 40) # y축 범위 설정 (온도 범위)
ax.set_xlim(0, 20) # x축 범위 설정 (데이터 개수)
ax.set_ylabel('Temperature (°C)')
ax.legend()
# 전역변수에 차트정보 저장
line_chart = line
# 연결이 유지되는 동안 프로그램이 종료되지 않도록 하는 함수
async def wait_connect():
while connected_device.is_connected:
await asyncio.sleep(1)
selected_device = None
# Service, Characteristic UUID 정의
health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'
date_time_characteristic = '00002a08-0000-1000-8000-00805f9b34fb'
temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'
temperature_list = [] # 온도 데이터를 저장할 리스트
line_chart = None # 그래프 객체를 저장할 변수
# SQLite 데이터베이스 연결 (temperature.db 파일 생성됨)
conn = sqlite3.connect("temperature.db")
# SQL 동작을 위해 cursor 오브젝트 생성
cursor = conn.cursor()
connected_device = None
if __name__ == '__main__':
# 데이터베이스 테이블 생성 (이미 존재하면 무시)
try:
cursor.execute("CREATE TABLE TEMPERATURE(name text, temperature double, date datetime)")
except:
# 이미 테이블이 실행되었다면 구문 콘솔 출력
print("database already created")
# 비동기 이벤트 루프 생성
loop = asyncio.get_event_loop()
# BLE 기기 스캔 실행
scan_devices = loop.run_until_complete(scan())
# 스캔된 기기 목록 GUI로 표시
show_scan_devices(scan_devices)
# 기기가 선택되었다면
if selected_device is not None:
# 선택된 기기에 연결
connected_device = loop.run_until_complete(connect(selected_device))
# 그래프 설정
set_graph()
# 바나나 체온계와 연결되었다면
if connected_device is not None:
# Service와 Characteristic 설정
loop.run_until_complete(get_service_and_characteristic(connected_device))
# 연결이 유지되는 동안 프로그램 실행
loop.run_until_complete(wait_connect())
Last updated