4.1.4. 2대이상 기기 연결

지금까지 1대의 장치로 부터 정보를 얻는 것을 살펴보았다면, 이제 보다 일반적인 목적을 위해서 여러대의 장치에서 동시에 정보를 받는 방법을 알아보고자 한다. 다만 여기서는 새로운 디바이스를 동일한 바나나 체온계를 사용하여 보여준다.

본 장에서는 2대 이상의 바나나 체온계에 연결하여 온도정보를 수신하는 방법에 대해 서술한다. 기존에 사용했던 코드를 활용해 진행한다.

프로젝트 생성

새로운 프로젝트에서 사용될 파일을 생성하기 위해 홈화면(라즈베리파이의 바탕 화면)에서 우클릭 → New Folder 버튼을 클릭해 폴더를 생성한다. 여기서는 Multi-TS100s-Project 이름을 가진 폴더를 생성한다.

이후 Visual Studio Code를 열고 Open Folder 버튼을 클릭해 기존에 만들었던 TS100-Project 폴더 선택 후 Open 버튼을 클릭해 폴더를 연다.

이후 다음과 같이 main.py 파일을 생성한다.

화면 레이아웃 설정

2대 이상 스캔된 화면을 그리기 위해 코드를 작성한다.

import tkinter as tk # GUI 라이브러리
from tkinter import *

scan_frame = None
scroll_frame = None

def create_ui(root):
    global scan_frame
    global scroll_frame

    # UI 제목
    root.title("TS100-Project")

    # Bottom 프레임, Scan TS100, Temperature Information을 포함함
    bottom_frame = Frame(root)

    # Scan리스트 Frame
    scan_ts100_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Scan TS100", width=115)
    scan_ts100_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")

    # 전역변수 할당
    scan_frame = scan_ts100_frame

    # 온도정보 Frame
    temperature_information_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Temperature Information")
    canvas = Canvas(temperature_information_frame, borderwidth=0)
    scrollbar = Scrollbar(temperature_information_frame, orient="vertical", command=canvas.yview)
    scrollable_frame = Frame(canvas)

    # 전역변수 할당
    scroll_frame = scrollable_frame

    scrollable_frame.bind(
        "<Configure>",
        lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
    )

    canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
    canvas.configure(yscrollcommand=scrollbar.set)

    temperature_information_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
    canvas.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y", expand=True)

    # Scan 버튼
    scan_button = Button(root, text="Scan", command=scan_button_click)
    scan_button.pack()

    # 하위 프레임
    bottom_frame.pack(side="bottom", fill="both", expand=True)


# Scan 버튼클릭 시 호출되는 함수
def scan_button_click(scan_frame):
    print("Scan Button Click")

if __name__ == "__main__":
    root = tk.Tk()

    create_ui(root)

    # 이벤트루프 생성
    loop = asyncio.get_event_loop()

    # 비동기로 UI 계속 업데이트
    def asyncio_loop():
        loop.call_soon(loop.stop)
        loop.run_forever()
        root.after(100, asyncio_loop)

    root.after(100, asyncio_loop)
    root.mainloop()

이후 코드를 실행하면 프로젝트 화면이 나타나고, 상단의 Scan 버튼을 클릭하면 Scan Button Click이 콘솔에 출력되는것을 확인할 수 있다.

블루투스 기기 Scan

2대의 기기를 연결하기 위해서는 scan_button_click(), Scan 관련 함수를 수정한다. 비동기 처리를 위해 asyncio을 import한다.

import asyncio # 비동기 프로그래밍을 위한 모듈
from bleak import BleakScanner

...

# 스캔된 버튼을 담고있는 배열
scan_devices_list = []

def remove_button():
    # 기존에 scan했던 기기 화면에서 제거
    for button in scan_devices_list:
        root.after(0, button.destroy)

def create_scan_devices(ts100_scan_list):
    global scan_devices_list

    # Scan했던 Device 버튼 생성
    for device in ts100_scan_list:
        button = Button(scan_frame, text=device.name, command=lambda device=device: ts100_device_button_click(device))
        button.pack()
        scan_devices_list.append(button)

def scan_button_click():
    remove_button()

    # 이벤트루프 생성해 BLE 기기 scan
    loop = asyncio.get_event_loop()
    ts100_scan_list = loop.run_until_complete(scan())

    create_scan_devices(ts100_scan_list)

async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    for device in scan_devices:
        device_name = device.name if device.name else "Unknown Device"
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)

def scan_device_button_click(device):
    print("Selected Device: " + device.name)

이후 코드를 실행하면 Scan된 기기가 좌측에 출력된다. Scan 버튼을 계속 클릭하면 Scan된 기기가 업데이트 되는것을 확인할 수 있다.

지금까지의 내용이 반영된 코드이다.

import tkinter as tk # GUI 라이브러리
from tkinter import *
import asyncio # 비동기 프로그래밍을 위한 모듈
from bleak import BleakScanner

scan_frame = None
scroll_frame = None

def create_ui(root):
  global scan_frame
  global scroll_frame

  # UI 제목
  root.title("TS100-Project")

  # Bottom 프레임, Scan TS100, Temperature Information을 포함함
  bottom_frame = Frame(root)

  # Scan리스트 Frame
  scan_ts100_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Scan TS100", width=115)
  scan_ts100_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")

  # 전역변수 할당
  scan_frame = scan_ts100_frame

  # 온도정보 Frame
  temperature_information_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Temperature Information")
  canvas = Canvas(temperature_information_frame, borderwidth=0)
  scrollbar = Scrollbar(temperature_information_frame, orient="vertical", command=canvas.yview)
  scrollable_frame = Frame(canvas)

  # 전역변수 할당
  scroll_frame = scrollable_frame

  scrollable_frame.bind(
      "<Configure>",
      lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
  )

  canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
  canvas.configure(yscrollcommand=scrollbar.set)

  temperature_information_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
  canvas.pack(side="left", fill="both", expand=True)
  scrollbar.pack(side="right", fill="y", expand=True)

  # Scan 버튼
  scan_button = Button(root, text="Scan", command=scan_button_click)
  scan_button.pack()

  # 하위 프레임
  bottom_frame.pack(side="bottom", fill="both", expand=True)

'''
  Scan
'''

# 스캔된 버튼을 담고있는 배열
scan_devices_list = []

def remove_button():
  # 기존에 scan했던 기기 화면에서 제거
  for button in scan_devices_list:
      root.after(0, button.destroy)
      scan_devices_list.remove(button)

def create_scan_devices(ts100_scan_list):
  global scan_devices_list

  # Scan했던 Device 버튼 생성
  for device in ts100_scan_list:
      button = Button(scan_frame, text=device.name, command=lambda device=device: scan_device_button_click(device))
      button.pack()
      scan_devices_list.append(button)

def scan_button_click():
  remove_button()

  # 이벤트루프 생성해 BLE 기기 scan
  loop = asyncio.get_event_loop()
  ts100_scan_list = loop.run_until_complete(scan())

  create_scan_devices(ts100_scan_list)

async def scan():
  ts100_device = []

  print('Scan Start')
  # scan start
  scan_devices = await BleakScanner.discover()
  print('Scan End')

  for device in scan_devices:
      device_name = device.name if device.name else "Unknown Device"
      if "TS100" in device_name:
          ts100_device.append(device)
          print(device_name)
 
  return ts100_device

def scan_device_button_click(device):
  print("Selected Device: " + device.name)

if __name__ == "__main__":
    root = tk.Tk()

    create_ui(root)

    # 이벤트루프 생성
    loop = asyncio.get_event_loop()

    # 비동기로 UI 계속 업데이트
    def asyncio_loop():
        loop.call_soon(loop.stop)
        loop.run_forever()
        root.after(100, asyncio_loop)

    root.after(100, asyncio_loop)
    root.mainloop()

블루투스 기기 연결

Scan된 Device를 연결하는 connect 함수를 작성한다. 버튼을 클릭했을 때 connect 함수를 호출할 수 있도록 scan_device_button_click 함수도 함께 수정한다.

def scan_device_button_click(device):
    print("Selected Device: " + device.name)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(connect(device))

async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

코드수정 후 실행하면 콘솔에 출력되는것을 확인할 수 있다.

연결된 기기를 Temperature Information 영역에 나타내는 코드를 작성한다.

연결된 device를 구분하기 위해 device_info_frame_dict 딕셔너리 변수를 추가한다.

dictionary란 키와 값의 쌍을 저장하는 자료구조로, 키를 통해 값을 빠르게 검색할 수 있다. device_info_frame_dict 딕셔너리 변수는 바나나 체온계 이름이 key, 화면에 나타나는 Text Frame이 value로 이루어진 데이터를 저장한다.

async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')

        # 연결된 기기 화면에 생성
        root.after(0, create_device_info_frame, candidate_device, selected_client)

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

# dictionary 선언(key: device_name, value: Text Frame)
device_info_frame_dict = {}

# 연결된 기기 Frame을 생성하는 함수
def create_device_info_frame(device, ble_client):
    global scroll_frame
    global device_info_frame_dict

    # Device 영역 제거
    def remove_frame():
      print("click")

    frame = Frame(scroll_frame, relief="solid", bd=1, padx=5, pady=5)
    frame.pack()

    # TitleFrame: 좌측 상단에 제목이 표시되는 프레임
    title_frame = Frame(frame, padx=5, pady=5)
    title_label = Label(title_frame, text=device.name, font=("Helvetica", 14))
    title_label.pack(side="left")

    # NotifyStartFrame: 우측 상단에 X 버튼이 표시되는 프레임
    # NotifyStartFrame: 우측 상단에 notify를 시작하는 버튼이 표시되는 프레임
    remove_button_frame = Frame(frame, padx=5, pady=5)
    remove_button = Button(remove_button_frame, text="X", command=remove_frame, bg="red", fg="white")
    remove_button.pack()

    # ContentFrame: 하단에 내용을 보여주는 프레임
    content_frame = Frame(frame, padx=10, pady=10)
    content_label_text = StringVar()
    content_label = Label(content_frame, textvariable=content_label_text, font=("Helvetica", 14))
    content_label_text.set("Connect")
    content_label.pack(side="bottom")

    # 배치
    title_frame.grid(row=0, column=0, sticky="ew")
    notify_button_frame.grid(row=0, column=1, sticky="e")
    content_frame.grid(row=1, column=0, columnspan=2, sticky="nsew")

    # 그리드 가중치 설정 (윈도우 크기 조절에 대응하기 위해)
    scroll_frame.grid_rowconfigure(1, weight=1)
    scroll_frame.grid_columnconfigure(0, weight=1)

    # dictionary에 추가
    device_info_frame_dict[device.name] = content_label_text

    # TitleFrame: 좌측 상단에 제목이 표시되는 프레임
    title_frame = Frame(frame, padx=5, pady=5)
    title_label = Label(title_frame, text=device.name, font=("Helvetica", 14))
    title_label.pack(side="left")

    # NotifyStartFrame: 우측 상단에 X 버튼이 표시되는 프레임
    # NotifyStartFrame: 우측 상단에 notify를 시작하는 버튼이 표시되는 프레임
    remove_button_frame = Frame(frame, padx=5, pady=5)
    remove_button = Button(remove_button_frame, text="X", command=remove_frame, bg="red", fg="white")
    remove_button.pack()

    # ContentFrame: 하단에 내용을 보여주는 프레임
    content_frame = Frame(frame, padx=10, pady=10)
    content_label_text = StringVar()
    content_label = Label(content_frame, textvariable=content_label_text, font=("Helvetica", 14))
    content_label_text.set("Connected!")
    content_label.pack(side="bottom")

    # 배치
    title_frame.grid(row=0, column=0, sticky="ew")
    notify_button_frame.grid(row=0, column=1, sticky="e")
    content_frame.grid(row=1, column=0, columnspan=2, sticky="nsew")

    # 그리드 가중치 설정 (윈도우 크기 조절에 대응하기 위해)
    scroll_frame.grid_rowconfigure(1, weight=1)
    scroll_frame.grid_columnconfigure(0, weight=1)

    # dictionary에 추가
    device_info_frame_dict[device.name] = content_label_text

코드수정 후 실행하면 연결된 기기가 Temperature Information 화면에 출력되는것을 확인할 수 있다.

Device 연결에 성공하면 기존에 좌측화면에서 클릭했던 기기를 삭제할 수 있도록 remove_button_click 함수를 수정한다.

# 연결된 기기는 버튼 리스트에서 삭제할 수 있도록 수정
def remove_button(button_name=None):
    if button_name is not None:
        if not scan_devices_list:
            return

        remove_button = None
        for device in scan_devices_list:
            if device.cget("text") == button_name:
                remove_button = device
                break
       
        if remove_button:
            root.after(0, remove_button.destroy)
            scan_devices_list.remove(remove_button)
        return

    # 기존에 scan했던 기기 화면에서 제거
    for button in scan_devices_list:
        root.after(0, button.destroy)
        scan_devices_list.remove(button)
...

async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')

        # scan_ts100에서 연결된 기기 삭제
        root.after(0, remove_button_click, candidate_device.name)
        # 연결된 기기 화면에 생성
        root.after(0, create_device_info_frame, candidate_device, selected_client)

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

코드수정 후 실행하면 연결한 Device는 Scan TS100 영역에서 사라진것을 확인할 수 있다.

지금까지의 내용이 반영된 코드이다.

import tkinter as tk # GUI 라이브러리
from tkinter import *
import asyncio # 비동기 프로그래밍을 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈

scan_frame = None
scroll_frame = None

def create_ui(root):
    global scan_frame
    global scroll_frame

    # UI 제목
    root.title("TS100-Project")

    # Bottom 프레임, Scan TS100, Temperature Information을 포함함
    bottom_frame = Frame(root)

    # Scan리스트 Frame
    scan_ts100_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Scan TS100", width=115)
    scan_ts100_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")

    # 전역변수 할당
    scan_frame = scan_ts100_frame

    # 온도정보 Frame
    temperature_information_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Temperature Information")
    canvas = Canvas(temperature_information_frame, borderwidth=0)
    scrollbar = Scrollbar(temperature_information_frame, orient="vertical", command=canvas.yview)
    scrollable_frame = Frame(canvas)

    # 전역변수 할당
    scroll_frame = scrollable_frame

    scrollable_frame.bind(
        "<Configure>",
        lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
    )

    canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
    canvas.configure(yscrollcommand=scrollbar.set)

    temperature_information_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
    canvas.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y", expand=True)

    # Scan 버튼
    scan_button = Button(root, text="Scan", command=scan_button_click)
    scan_button.pack()

    # 하위 프레임
    bottom_frame.pack(side="bottom", fill="both", expand=True)

'''
    Scan
'''

# 스캔된 버튼을 담고있는 배열
scan_devices_list = []

def remove_button(button_name=None):
    if button_name is not None:
        if not scan_devices_list:
            return

        remove_button = None
        for device in scan_devices_list:
            if device.cget("text") == button_name:
                remove_button = device
                break
       
        if remove_button:
            root.after(0, remove_button.destroy)
            scan_devices_list.remove(remove_button)
        return

    # 기존에 scan했던 기기 화면에서 제거
    for button in scan_devices_list:
        root.after(0, button.destroy)
        scan_devices_list.remove(button)

def create_scan_devices(ts100_scan_list):
    global scan_devices_list

    # Scan했던 Device 버튼 생성
    for device in ts100_scan_list:
        button = Button(scan_frame, text=device.name, command=lambda device=device: scan_device_button_click(device))
        button.pack()
        scan_devices_list.append(button)

def scan_button_click():
    remove_button()

    # 이벤트루프 생성해 BLE 기기 scan
    loop = asyncio.get_event_loop()
    ts100_scan_list = loop.run_until_complete(scan())

    create_scan_devices(ts100_scan_list)

async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    for device in scan_devices:
        device_name = device.name if device.name else "Unknown Device"
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

def scan_device_button_click(device):
    print("Selected Device: " + device.name)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(connect(device))

'''
    Connect
'''

# dictionary 선언(key: device_name, value: device_text_frame)
device_info_frame_dict = {}


async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')

        # scan_ts100에서 연결된 기기 삭제
        root.after(0, remove_button, candidate_device.name)
        # info 화면에 추가
        root.after(0, create_device_info_frame, candidate_device, selected_client)

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

def create_device_info_frame(device, ble_client):
    global scroll_frame
    global device_info_frame_dict
    global sender_dict

    # Device 영역 제거
    def remove_frame():
        # 프레임에서 삭제
        frame.destroy()

        # BLE 연결 해제
        loop = asyncio.get_event_loop()
        loop.run_until_complete(ble_client.disconnect())

        # dictionary 삭제
        del device_info_frame_dict[device.name]
        remove_characteristic_by_value(device.name)

    # value값으로 dictionary 값 삭제하는 함수
    def remove_characteristic_by_value(name):
        keys_to_remove = [key for key, value in sender_dict.items() if value == name]
        for key in keys_to_remove:
            del sender_dict[key]

    frame = Frame(scroll_frame, relief="solid", bd=1, padx=5, pady=5)
    frame.pack()

    # TitleFrame: 좌측 상단에 제목이 표시되는 프레임
    title_frame = Frame(frame, padx=5, pady=5)
    title_label = Label(title_frame, text=device.name, font=("Helvetica", 14))
    title_label.pack(side="left")

    # NotifyStartFrame: 우측 상단에 notify를 시작하는 버튼이 표시되는 프레임
    remove_button_frame = Frame(frame, padx=5, pady=5)
    remove_button = Button(remove_button_frame, text="X", command=remove_frame, bg="red", fg="white")
    remove_button.pack()

    # ContentFrame: 하단에 내용을 보여주는 프레임
    content_frame = Frame(frame, padx=10, pady=10)
    content_label_text = StringVar()
    content_label = Label(content_frame, textvariable=content_label_text, font=("Helvetica", 14))
    content_label_text.set("Connect")
    content_label.pack(side="bottom")

    # 배치
    title_frame.grid(row=0, column=0, sticky="ew")
    remove_button_frame.grid(row=0, column=1, sticky="e")
    content_frame.grid(row=1, column=0, columnspan=2, sticky="nsew")

    # 그리드 가중치 설정 (윈도우 크기 조절에 대응하기 위해)
    scroll_frame.grid_rowconfigure(1, weight=1)
    scroll_frame.grid_columnconfigure(0, weight=1)

    # dictionary에 추가
    device_info_frame_dict[device.name] = content_label_text


if __name__ == "__main__":
    root = tk.Tk()

    create_ui(root)

    # 이벤트루프 생성
    loop = asyncio.get_event_loop()

    # 비동기로 UI 계속 업데이트
    def asyncio_loop():
        loop.call_soon(loop.stop)
        loop.run_forever()
        root.after(100, asyncio_loop)

    root.after(100, asyncio_loop)
    root.mainloop()

Service, Characteristic 요청

블루투스 연결이 완료된 후 Service, Characteristic 요청을 위해 함수를 추가한다.

# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인
    for service in connected_device.services:
        print(service)
        # Characteristic 확인
        for characteristic in service.characteristics:
            print(characteristic)
       
        print()

connect 함수에 Service, Characteristic을 검색하는 코드를 추가한 후 프로젝트를 실행하면 전체 Service와 Characteristic을 확인할 수 있다.

async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')


        # Characteristic, Service 요청
        await get_service_and_characteristic(candidate_device, selected_client)

        # scan_ts100에서 연결된 기기 삭제
        root.after(0, remove_button_click, candidate_device.name)

        # 연결된 기기 화면에 생성
        root.after(0, create_device_info_frame, candidate_device, selected_client)

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

지금까지의 내용이 반영된 코드이다.

import tkinter as tk # GUI 라이브러리
from tkinter import *
import asyncio # 비동기 프로그래밍을 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈

scan_frame = None
scroll_frame = None

def create_ui(root):
    global scan_frame
    global scroll_frame

    # UI 제목
    root.title("TS100-Project")

    # Bottom 프레임, Scan TS100, Temperature Information을 포함함
    bottom_frame = Frame(root)

    # Scan리스트 Frame
    scan_ts100_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Scan TS100", width=115)
    scan_ts100_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")

    # 전역변수 할당
    scan_frame = scan_ts100_frame

    # 온도정보 Frame
    temperature_information_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Temperature Information")
    canvas = Canvas(temperature_information_frame, borderwidth=0)
    scrollbar = Scrollbar(temperature_information_frame, orient="vertical", command=canvas.yview)
    scrollable_frame = Frame(canvas)

    # 전역변수 할당
    scroll_frame = scrollable_frame

    scrollable_frame.bind(
        "<Configure>",
        lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
    )

    canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
    canvas.configure(yscrollcommand=scrollbar.set)

    temperature_information_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
    canvas.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y", expand=True)

    # Scan 버튼
    scan_button = Button(root, text="Scan", command=scan_button_click)
    scan_button.pack()

    # 하위 프레임
    bottom_frame.pack(side="bottom", fill="both", expand=True)

'''
    Scan
'''

# 스캔된 버튼을 담고있는 배열
scan_devices_list = []

def remove_button(button_name=None):
    if button_name is not None:
        if not scan_devices_list:
            return

        remove_button = None
        for device in scan_devices_list:
            if device.cget("text") == button_name:
                remove_button = device
                break
       
        if remove_button:
            root.after(0, remove_button.destroy)
            scan_devices_list.remove(remove_button)
        return

    # 기존에 scan했던 기기 화면에서 제거
    for button in scan_devices_list:
        root.after(0, button.destroy)
        scan_devices_list.remove(button)

def create_scan_devices(ts100_scan_list):
    global scan_devices_list

    # Scan했던 Device 버튼 생성
    for device in ts100_scan_list:
        button = Button(scan_frame, text=device.name, command=lambda device=device: scan_device_button_click(device))
        button.pack()
        scan_devices_list.append(button)

def scan_button_click():
    remove_button()

    # 이벤트루프 생성해 BLE 기기 scan
    loop = asyncio.get_event_loop()
    ts100_scan_list = loop.run_until_complete(scan())

    create_scan_devices(ts100_scan_list)

async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    for device in scan_devices:
        device_name = device.name if device.name else "Unknown Device"
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

def scan_device_button_click(device):
    print("Selected Device: " + device.name)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(connect(device))

'''
    Connect
'''

# dictionary 선언(key: device_name, value: device_text_frame)
device_info_frame_dict = {}


async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')

        # Characteristic, Service 요청
        await get_service_and_characteristic(candidate_device, selected_client)

        # scan_ts100에서 연결된 기기 삭제
        root.after(0, remove_button, candidate_device.name)
        # info 화면에 추가
        root.after(0, create_device_info_frame, candidate_device, selected_client)

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

def create_device_info_frame(device, ble_client):
    global scroll_frame
    global device_info_frame_dict
    global sender_dict

    # Device 영역 제거
    def remove_frame():
        # 프레임에서 삭제
        frame.destroy()

        # BLE 연결 해제
        loop = asyncio.get_event_loop()
        loop.run_until_complete(ble_client.disconnect())

        # dictionary 삭제
        del device_info_frame_dict[device.name]
        remove_characteristic_by_value(device.name)

    # value값으로 dictionary 값 삭제하는 함수
    def remove_characteristic_by_value(name):
        keys_to_remove = [key for key, value in sender_dict.items() if value == name]
        for key in keys_to_remove:
            del sender_dict[key]

    frame = Frame(scroll_frame, relief="solid", bd=1, padx=5, pady=5)
    frame.pack()

    # TitleFrame: 좌측 상단에 제목이 표시되는 프레임
    title_frame = Frame(frame, padx=5, pady=5)
    title_label = Label(title_frame, text=device.name, font=("Helvetica", 14))
    title_label.pack(side="left")

    # NotifyStartFrame: 우측 상단에 notify를 시작하는 버튼이 표시되는 프레임
    remove_button_frame = Frame(frame, padx=5, pady=5)
    remove_button = Button(remove_button_frame, text="X", command=remove_frame, bg="red", fg="white")
    remove_button.pack()

    # ContentFrame: 하단에 내용을 보여주는 프레임
    content_frame = Frame(frame, padx=10, pady=10)
    content_label_text = StringVar()
    content_label = Label(content_frame, textvariable=content_label_text, font=("Helvetica", 14))
    content_label_text.set("Connect")
    content_label.pack(side="bottom")

    # 배치
    title_frame.grid(row=0, column=0, sticky="ew")
    remove_button_frame.grid(row=0, column=1, sticky="e")
    content_frame.grid(row=1, column=0, columnspan=2, sticky="nsew")

    # 그리드 가중치 설정 (윈도우 크기 조절에 대응하기 위해)
    scroll_frame.grid_rowconfigure(1, weight=1)
    scroll_frame.grid_columnconfigure(0, weight=1)

    # dictionary에 추가
    device_info_frame_dict[device.name] = content_label_text

'''
    notify
'''

# Service, Charactristic 확인하는 함수
async def get_service_and_characteristic(connected_device):
    # Service 확인
    for service in connected_device.services:
        print(service)
        # Characteristic 확인
        for characteristic in service.characteristics:
            print(characteristic)
       
        print()

if __name__ == "__main__":
    root = tk.Tk()

    create_ui(root)

    # 이벤트루프 생성
    loop = asyncio.get_event_loop()

    # 비동기로 UI 계속 업데이트
    def asyncio_loop():
        loop.call_soon(loop.stop)
        loop.run_forever()
        root.after(100, asyncio_loop)

    root.after(100, asyncio_loop)
    root.mainloop()

온도정보 가져오기

바나나 체온계의 여러 characteristic 중 온도정보를 받아오는 기능을 구현한다. 다음은 온도 정보에 대한 특성이다.

Service: Health Thermometer(00001809-0000-1000-8000-00805f9b34fb)

Characteristic: Temperature Measurement(00002a1c-0000-1000-8000-00805f9b34fb)

온도정보 수신

여러 Service 중 Health Thermometer와 관련된 service만 수신받기 위해 get_service_and_characteristic 함수를 수정한다.

health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

async def get_service_and_characteristic(candidate_device, connected_device):
    # Service 확인(제대로 연결되었는지 테스트용)
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                print(characteristic)
       
        print()

이후 온도정보 characteristic을 통해 notify를 수신하기 위해 함수를 수정한다.

health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'


async def get_service_and_characteristic(candidate_device, connected_device):
    # Service 확인
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    print(data)

2대이상의 기기에서 수신된 온도정보를 구분하기 위해 sender_dict라는 dictionary 자료형을 추가하고, get_service_and_characteristic 함수를 수정한다. sender_dict에는 characteristic이 key, 바나나 체온계 이름이 value로 이루어진 데이터를 저장한다.

health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

# dictionary 선언(key: characteristic, value: device_name)
sender_dict = {}


async def get_service_and_characteristic(candidate_device, connected_device):
    # Service 확인
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    # Dictionary에 추가
                    if characteristic not in sender_dict:
                        sender_dict[characteristic] = candidate_device.name
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    print(data)

이후 코드실행 후 바나나 체온계와 연결하면 수신된 온도정보가 콘솔창에 출력되는것을 확인할 수 있다.

지금까지의 내용이 반영된 코드이다.

import tkinter as tk # GUI 라이브러리
from tkinter import *
import asyncio # 비동기 프로그래밍을 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈

scan_frame = None
scroll_frame = None

def create_ui(root):
    global scan_frame
    global scroll_frame

    # UI 제목
    root.title("TS100-Project")

    # Bottom 프레임, Scan TS100, Temperature Information을 포함함
    bottom_frame = Frame(root)

    # Scan리스트 Frame
    scan_ts100_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Scan TS100", width=115)
    scan_ts100_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")

    # 전역변수 할당
    scan_frame = scan_ts100_frame

    # 온도정보 Frame
    temperature_information_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Temperature Information")
    canvas = Canvas(temperature_information_frame, borderwidth=0)
    scrollbar = Scrollbar(temperature_information_frame, orient="vertical", command=canvas.yview)
    scrollable_frame = Frame(canvas)

    # 전역변수 할당
    scroll_frame = scrollable_frame

    scrollable_frame.bind(
        "<Configure>",
        lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
    )

    canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
    canvas.configure(yscrollcommand=scrollbar.set)

    temperature_information_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
    canvas.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y", expand=True)

    # Scan 버튼
    scan_button = Button(root, text="Scan", command=scan_button_click)
    scan_button.pack()

    # 하위 프레임
    bottom_frame.pack(side="bottom", fill="both", expand=True)

'''
    Scan
'''

# 스캔된 버튼을 담고있는 배열
scan_devices_list = []

def remove_button(button_name=None):
    if button_name is not None:
        if not scan_devices_list:
            return

        remove_button = None
        for device in scan_devices_list:
            if device.cget("text") == button_name:
                remove_button = device
                break
       
        if remove_button:
            root.after(0, remove_button.destroy)
            scan_devices_list.remove(remove_button)
        return

    # 기존에 scan했던 기기 화면에서 제거
    for button in scan_devices_list:
        root.after(0, button.destroy)
        scan_devices_list.remove(button)

def create_scan_devices(ts100_scan_list):
    global scan_devices_list

    # Scan했던 Device 버튼 생성
    for device in ts100_scan_list:
        button = Button(scan_frame, text=device.name, command=lambda device=device: scan_device_button_click(device))
        button.pack()
        scan_devices_list.append(button)

def scan_button_click():
    remove_button()

    # 이벤트루프 생성해 BLE 기기 scan
    loop = asyncio.get_event_loop()
    ts100_scan_list = loop.run_until_complete(scan())

    create_scan_devices(ts100_scan_list)

async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    for device in scan_devices:
        device_name = device.name if device.name else "Unknown Device"
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

def scan_device_button_click(device):
    print("Selected Device: " + device.name)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(connect(device))

'''
    Connect
'''

# dictionary 선언(key: device_name, value: device_text_frame)
device_info_frame_dict = {}


async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')

        # Characteristic, Service 요청
        await get_service_and_characteristic(candidate_device, selected_client)

        # scan_ts100에서 연결된 기기 삭제
        root.after(0, remove_button, candidate_device.name)
        # info 화면에 추가
        root.after(0, create_device_info_frame, candidate_device, selected_client)

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

def create_device_info_frame(device, ble_client):
    global scroll_frame
    global device_info_frame_dict
    global sender_dict

    # Device 영역 제거
    def remove_frame():
        # 프레임에서 삭제
        frame.destroy()

        # BLE 연결 해제
        loop = asyncio.get_event_loop()
        loop.run_until_complete(ble_client.disconnect())

        # dictionary 삭제
        del device_info_frame_dict[device.name]
        remove_characteristic_by_value(device.name)

    # value값으로 dictionary 값 삭제하는 함수
    def remove_characteristic_by_value(name):
        keys_to_remove = [key for key, value in sender_dict.items() if value == name]
        for key in keys_to_remove:
            del sender_dict[key]

    frame = Frame(scroll_frame, relief="solid", bd=1, padx=5, pady=5)
    frame.pack()

    # TitleFrame: 좌측 상단에 제목이 표시되는 프레임
    title_frame = Frame(frame, padx=5, pady=5)
    title_label = Label(title_frame, text=device.name, font=("Helvetica", 14))
    title_label.pack(side="left")

    # NotifyStartFrame: 우측 상단에 notify를 시작하는 버튼이 표시되는 프레임
    remove_button_frame = Frame(frame, padx=5, pady=5)
    remove_button = Button(remove_button_frame, text="X", command=remove_frame, bg="red", fg="white")
    remove_button.pack()

    # ContentFrame: 하단에 내용을 보여주는 프레임
    content_frame = Frame(frame, padx=10, pady=10)
    content_label_text = StringVar()
    content_label = Label(content_frame, textvariable=content_label_text, font=("Helvetica", 14))
    content_label_text.set("Connect")
    content_label.pack(side="bottom")

    # 배치
    title_frame.grid(row=0, column=0, sticky="ew")
    remove_button_frame.grid(row=0, column=1, sticky="e")
    content_frame.grid(row=1, column=0, columnspan=2, sticky="nsew")

    # 그리드 가중치 설정 (윈도우 크기 조절에 대응하기 위해)
    scroll_frame.grid_rowconfigure(1, weight=1)
    scroll_frame.grid_columnconfigure(0, weight=1)

    # dictionary에 추가
    device_info_frame_dict[device.name] = content_label_text

'''
    notify
'''

health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

# dictionary 선언(key: characteristic, value: device_name)
sender_dict = {}

# Service, Characteristic 요청하는 함수
async def get_service_and_characteristic(candidate_device, connected_device):
    # Service 확인
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    # Dictionary에 추가
                    if characteristic not in sender_dict:
                        sender_dict[characteristic] = candidate_device.name
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    print(data)

if __name__ == "__main__":
    root = tk.Tk()

    create_ui(root)

    # 이벤트루프 생성
    loop = asyncio.get_event_loop()

    # 비동기로 UI 계속 업데이트
    def asyncio_loop():
        loop.call_soon(loop.stop)
        loop.run_forever()
        root.after(100, asyncio_loop)

    root.after(100, asyncio_loop)
    root.mainloop()

온도정보 변환

바나나 체온계로부터 수신받은 데이터는 온도정보와 날짜정보로 이루어져 있다. 온도정보를 변환하는 temperature_calculate 함수를 작성한다.

# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
    # Int로 변환, 비트 시프트 후 합산
    int_t1 = int(t1)
    int_t2 = int(t2) << 8
    int_t3 = int(t3) << 16

    signed_value = int_t1 + int_t2 + int_t3

    # 자릿수 정하는 변수
    digit = int(d)
    if digit > 127:
        digit -= 256

    # 온도
    temperature = float(signed_value) * 10 ** digit
    # 소수점 1자리만 남도록 수정
    temperature = round(temperature * 10) / 10

    return temperature

기존에 작성했던 notify_callback 함수를 온도정보를 받아와 출력하는 함수로 변경한다.

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    parse_temperature_information(data)

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    # 온도정보 출력
    print(temperature)

    return temperature

이후 프로젝트를 실행하면 변환된 온도정보가 출력되는것을 확인할 수 있다.

온도정보 화면에 출력

바나나 체온계로부터 수신받은 온도정보를 화면에 출력하기 위해 notify_callback 함수를 수정한다.

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    temperature = parse_temperature_information(data)

    # 수신된 기기 이름 확인
    device_name = sender_dict.get(handle)

    if device_name is None:
        return
   
    # 업데이트할 텍스트 영역 확인 후 업데이트
    string_var = device_info_frame_dict.get(device_name)

    if string_var is None:
        return

    string_var.set(f"{temperature}°C")

다음은 온도정보를 업데이트하는 순서이다.

  1. sender_info dictionary에서 수신된 characteristic을 가진 이름을 획득한다.

  2. device_info_frame_dict에서 바나나 체온계 이름을 key로 가진 온도정보 라벨 프레임을 획득한다.

  3. set() 함수를 통해 온도정보를 업데이트한다.

코드실행 후 2대 연결과정을 수행하면 온도정보가 화면에 업데이트 되는것을 확인할 수 있다.

지금까지의 내용이 반영된 코드이다.

import tkinter as tk # GUI 라이브러리
from tkinter import *
import asyncio # 비동기 프로그래밍을 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈

scan_frame = None
scroll_frame = None

def create_ui(root):
    global scan_frame
    global scroll_frame

    # UI 제목
    root.title("TS100-Project")

    # Bottom 프레임, Scan TS100, Temperature Information을 포함함
    bottom_frame = Frame(root)

    # Scan리스트 Frame
    scan_ts100_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Scan TS100", width=115)
    scan_ts100_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")

    # 전역변수 할당
    scan_frame = scan_ts100_frame

    # 온도정보 Frame
    temperature_information_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Temperature Information")
    canvas = Canvas(temperature_information_frame, borderwidth=0)
    scrollbar = Scrollbar(temperature_information_frame, orient="vertical", command=canvas.yview)
    scrollable_frame = Frame(canvas)

    # 전역변수 할당
    scroll_frame = scrollable_frame

    scrollable_frame.bind(
        "<Configure>",
        lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
    )

    canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
    canvas.configure(yscrollcommand=scrollbar.set)

    temperature_information_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
    canvas.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y", expand=True)

    # Scan 버튼
    scan_button = Button(root, text="Scan", command=scan_button_click)
    scan_button.pack()

    # 하위 프레임
    bottom_frame.pack(side="bottom", fill="both", expand=True)

'''
    Scan
'''

# 스캔된 버튼을 담고있는 배열
scan_devices_list = []

def remove_button(button_name=None):
    if button_name is not None:
        if not scan_devices_list:
            return

        remove_button = None
        for device in scan_devices_list:
            if device.cget("text") == button_name:
                remove_button = device
                break
       
        if remove_button:
            root.after(0, remove_button.destroy)
            scan_devices_list.remove(remove_button)
        return

    # 기존에 scan했던 기기 화면에서 제거
    for button in scan_devices_list:
        root.after(0, button.destroy)
        scan_devices_list.remove(button)

def create_scan_devices(ts100_scan_list):
    global scan_devices_list

    # Scan했던 Device 버튼 생성
    for device in ts100_scan_list:
        button = Button(scan_frame, text=device.name, command=lambda device=device: scan_device_button_click(device))
        button.pack()
        scan_devices_list.append(button)

def scan_button_click():
    remove_button()

    # 이벤트루프 생성해 BLE 기기 scan
    loop = asyncio.get_event_loop()
    ts100_scan_list = loop.run_until_complete(scan())

    create_scan_devices(ts100_scan_list)

async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    for device in scan_devices:
        device_name = device.name if device.name else "Unknown Device"
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

def scan_device_button_click(device):
    print("Selected Device: " + device.name)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(connect(device))

'''
    Connect
'''

# dictionary 선언(key: device_name, value: device_text_frame)
device_info_frame_dict = {}


async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')

        # Characteristic, Service 요청
        await get_service_and_characteristic(candidate_device, selected_client)

        # scan_ts100에서 연결된 기기 삭제
        root.after(0, remove_button, candidate_device.name)
        # info 화면에 추가
        root.after(0, create_device_info_frame, candidate_device, selected_client)

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

def create_device_info_frame(device, ble_client):
    global scroll_frame
    global device_info_frame_dict
    global sender_dict

    # Device 영역 제거
    def remove_frame():
        # 프레임에서 삭제
        frame.destroy()

        # BLE 연결 해제
        loop = asyncio.get_event_loop()
        loop.run_until_complete(ble_client.disconnect())

        # dictionary 삭제
        del device_info_frame_dict[device.name]
        remove_characteristic_by_value(device.name)

    # value값으로 dictionary 값 삭제하는 함수
    def remove_characteristic_by_value(name):
        keys_to_remove = [key for key, value in sender_dict.items() if value == name]
        for key in keys_to_remove:
            del sender_dict[key]

    frame = Frame(scroll_frame, relief="solid", bd=1, padx=5, pady=5)
    frame.pack()

    # TitleFrame: 좌측 상단에 제목이 표시되는 프레임
    title_frame = Frame(frame, padx=5, pady=5)
    title_label = Label(title_frame, text=device.name, font=("Helvetica", 14))
    title_label.pack(side="left")

    # NotifyStartFrame: 우측 상단에 notify를 시작하는 버튼이 표시되는 프레임
    remove_button_frame = Frame(frame, padx=5, pady=5)
    remove_button = Button(remove_button_frame, text="X", command=remove_frame, bg="red", fg="white")
    remove_button.pack()

    # ContentFrame: 하단에 내용을 보여주는 프레임
    content_frame = Frame(frame, padx=10, pady=10)
    content_label_text = StringVar()
    content_label = Label(content_frame, textvariable=content_label_text, font=("Helvetica", 14))
    content_label_text.set("Connect")
    content_label.pack(side="bottom")

    # 배치
    title_frame.grid(row=0, column=0, sticky="ew")
    remove_button_frame.grid(row=0, column=1, sticky="e")
    content_frame.grid(row=1, column=0, columnspan=2, sticky="nsew")

    # 그리드 가중치 설정 (윈도우 크기 조절에 대응하기 위해)
    scroll_frame.grid_rowconfigure(1, weight=1)
    scroll_frame.grid_columnconfigure(0, weight=1)

    # dictionary에 추가
    device_info_frame_dict[device.name] = content_label_text

'''
    notify
'''

health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

# dictionary 선언(key: characteristic, value: device_name)
sender_dict = {}

# Service, Characteristic 요청하는 함수
async def get_service_and_characteristic(candidate_device, connected_device):
    # Service 확인
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    # Dictionary에 추가
                    if characteristic not in sender_dict:
                        sender_dict[characteristic] = candidate_device.name
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    temperature = parse_temperature_information(data)

    # 수신된 기기 이름 확인
    device_name = sender_dict.get(handle)

    if device_name is None:
        return
   
    # 업데이트할 텍스트 영역 확인 후 업데이트
    string_var = device_info_frame_dict.get(device_name)

    if string_var is None:
        return

    string_var.set(f"{temperature}°C")

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    # 온도정보 출력
    print(temperature)

    return temperature

# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
    # Int로 변환, 비트 시프트 후 합산
    int_t1 = int(t1)
    int_t2 = int(t2) << 8
    int_t3 = int(t3) << 16

    signed_value = int_t1 + int_t2 + int_t3

    # 자릿수 정하는 변수
    digit = int(d)
    if digit > 127:
        digit -= 256

    # 온도
    temperature = float(signed_value) * 10 ** digit
    # 소수점 1자리만 남도록 수정
    temperature = round(temperature * 10) / 10

    return temperature

if __name__ == "__main__":
    root = tk.Tk()

    create_ui(root)

    # 이벤트루프 생성
    loop = asyncio.get_event_loop()

    # 비동기로 UI 계속 업데이트
    def asyncio_loop():
        loop.call_soon(loop.stop)
        loop.run_forever()
        root.after(100, asyncio_loop)

    root.after(100, asyncio_loop)
    root.mainloop()

블루투스 기기 연결 해제

Temperature Information 영역의 X 버튼을 클릭하면 BLE연결을 해제하는 기능을 구현하기 위해 create_device_info_frame 함수 내 remove_frame, remove_characteristic_by_value 함수를 수정한다.

수정된 코드는 UI에서 삭제, BLE연결 해제, dictionary 값 삭제 기능을 수행한다.

# 연결된 기기 Frame을 생성하는 함수
def create_device_info_frame(device, ble_client):
    global scroll_frame
    global device_info_frame_dict
    global sender_dict

    # Device 영역 제거
    def remove_frame():
        # 프레임에서 삭제
        frame.destroy()

        # BLE 연결 해제
        loop = asyncio.get_event_loop()
        loop.run_until_complete(ble_client.disconnect())

        # dictionary 삭제
        del device_info_frame_dict[device.name]
        remove_characteristic_by_value(device.name)

    # value값으로 dictionary 값 삭제하는 함수
    def remove_characteristic_by_value(name):
        keys_to_remove = [key for key, value in sender_dict.items() if value == name]
        for key in keys_to_remove:
            del sender_dict[key]

    frame = Frame(scroll_frame, relief="solid", bd=1, padx=5, pady=5)
    frame.pack()

    # TitleFrame: 좌측 상단에 제목이 표시되는 프레임
    title_frame = Frame(frame, padx=5, pady=5)
    title_label = Label(title_frame, text=device.name, font=("Helvetica", 14))
    title_label.pack(side="left")

    # NotifyStartFrame: 우측 상단에 notify를 시작하는 버튼이 표시되는 프레임
    remove_button_frame = Frame(frame, padx=5, pady=5)
    remove_button = Button(remove_button_frame, text="X", command=remove_frame, bg="red", fg="white")
    remove_button.pack()

    # ContentFrame: 하단에 내용을 보여주는 프레임
    content_frame = Frame(frame, padx=10, pady=10)
    content_label_text = StringVar()
    content_label = Label(content_frame, textvariable=content_label_text, font=("Helvetica", 14))
    content_label_text.set("Connect")
    content_label.pack(side="bottom")

    # 배치
    title_frame.grid(row=0, column=0, sticky="ew")
    remove_button_frame.grid(row=0, column=1, sticky="e")
    content_frame.grid(row=1, column=0, columnspan=2, sticky="nsew")

    # 그리드 가중치 설정 (윈도우 크기 조절에 대응하기 위해)
    scroll_frame.grid_rowconfigure(1, weight=1)
    scroll_frame.grid_columnconfigure(0, weight=1)

    # dictionary에 추가
    device_info_frame_dict[device.name] = content_label_text

이후 코드를 실행해 X 버튼을 클릭하면 화면에서 사라지고, BLE 연결이 해제되는것을 확인할 수 있다.

지금까지의 내용이 반영된 코드이다.

import tkinter as tk # GUI 라이브러리
from tkinter import *
import asyncio # 비동기 프로그래밍을 위한 모듈
from bleak import BleakScanner, BleakClient # Bluetooth Low Energy(BLE) 통신을 위한 모듈

scan_frame = None
scroll_frame = None

def create_ui(root):
    global scan_frame
    global scroll_frame

    # UI 제목
    root.title("TS100-Project")

    # Bottom 프레임, Scan TS100, Temperature Information을 포함함
    bottom_frame = Frame(root)

    # Scan리스트 Frame
    scan_ts100_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Scan TS100", width=115)
    scan_ts100_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")

    # 전역변수 할당
    scan_frame = scan_ts100_frame

    # 온도정보 Frame
    temperature_information_frame = LabelFrame(bottom_frame, relief="solid", bd=1, text="Temperature Information")
    canvas = Canvas(temperature_information_frame, borderwidth=0)
    scrollbar = Scrollbar(temperature_information_frame, orient="vertical", command=canvas.yview)
    scrollable_frame = Frame(canvas)

    # 전역변수 할당
    scroll_frame = scrollable_frame

    scrollable_frame.bind(
        "<Configure>",
        lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
    )

    canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
    canvas.configure(yscrollcommand=scrollbar.set)

    temperature_information_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
    canvas.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y", expand=True)

    # Scan 버튼
    scan_button = Button(root, text="Scan", command=scan_button_click)
    scan_button.pack()

    # 하위 프레임
    bottom_frame.pack(side="bottom", fill="both", expand=True)

'''
    Scan
'''

# 스캔된 버튼을 담고있는 배열
scan_devices_list = []

def remove_button(button_name=None):
    if button_name is not None:
        if not scan_devices_list:
            return

        remove_button = None
        for device in scan_devices_list:
            if device.cget("text") == button_name:
                remove_button = device
                break
       
        if remove_button:
            root.after(0, remove_button.destroy)
            scan_devices_list.remove(remove_button)
        return

    # 기존에 scan했던 기기 화면에서 제거
    for button in scan_devices_list:
        root.after(0, button.destroy)
        scan_devices_list.remove(button)

def create_scan_devices(ts100_scan_list):
    global scan_devices_list

    # Scan했던 Device 버튼 생성
    for device in ts100_scan_list:
        button = Button(scan_frame, text=device.name, command=lambda device=device: scan_device_button_click(device))
        button.pack()
        scan_devices_list.append(button)

def scan_button_click():
    remove_button()

    # 이벤트루프 생성해 BLE 기기 scan
    loop = asyncio.get_event_loop()
    ts100_scan_list = loop.run_until_complete(scan())

    create_scan_devices(ts100_scan_list)

async def scan():
    ts100_device = []

    print('Scan Start')
    # scan start
    scan_devices = await BleakScanner.discover()
    print('Scan End')

    for device in scan_devices:
        device_name = device.name if device.name else "Unknown Device"
        if "TS100" in device_name:
            ts100_device.append(device)
            print(device_name)
   
    return ts100_device

def scan_device_button_click(device):
    print("Selected Device: " + device.name)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(connect(device))

'''
    Connect
'''

# dictionary 선언(key: device_name, value: device_text_frame)
device_info_frame_dict = {}


async def connect(candidate_device):
    print('connect start')

    selected_client = BleakClient(candidate_device)

    try:
        # 장치 연결 시작
        await selected_client.connect()
        print('connected')

        # Characteristic, Service 요청
        await get_service_and_characteristic(candidate_device, selected_client)

        # scan_ts100에서 연결된 기기 삭제
        root.after(0, remove_button, candidate_device.name)
        # info 화면에 추가
        root.after(0, create_device_info_frame, candidate_device, selected_client)

    except Exception as e:
        # 연결 실패시 발생
        print('error: ', e, end='')
        return None

# 연결된 기기 Frame을 생성하는 함수
def create_device_info_frame(device, ble_client):
    global scroll_frame
    global device_info_frame_dict
    global sender_dict

    # Device 영역 제거
    def remove_frame():
        # 프레임에서 삭제
        frame.destroy()

        # BLE 연결 해제
        loop = asyncio.get_event_loop()
        loop.run_until_complete(ble_client.disconnect())

        # dictionary 삭제
        del device_info_frame_dict[device.name]
        remove_characteristic_by_value(device.name)

    # value값으로 dictionary 값 삭제하는 함수
    def remove_characteristic_by_value(name):
        keys_to_remove = [key for key, value in sender_dict.items() if value == name]
        for key in keys_to_remove:
            del sender_dict[key]

    frame = Frame(scroll_frame, relief="solid", bd=1, padx=5, pady=5)
    frame.pack()

    # TitleFrame: 좌측 상단에 제목이 표시되는 프레임
    title_frame = Frame(frame, padx=5, pady=5)
    title_label = Label(title_frame, text=device.name, font=("Helvetica", 14))
    title_label.pack(side="left")

    # NotifyStartFrame: 우측 상단에 notify를 시작하는 버튼이 표시되는 프레임
    remove_button_frame = Frame(frame, padx=5, pady=5)
    remove_button = Button(remove_button_frame, text="X", command=remove_frame, bg="red", fg="white")
    remove_button.pack()

    # ContentFrame: 하단에 내용을 보여주는 프레임
    content_frame = Frame(frame, padx=10, pady=10)
    content_label_text = StringVar()
    content_label = Label(content_frame, textvariable=content_label_text, font=("Helvetica", 14))
    content_label_text.set("Connect")
    content_label.pack(side="bottom")

    # 배치
    title_frame.grid(row=0, column=0, sticky="ew")
    remove_button_frame.grid(row=0, column=1, sticky="e")
    content_frame.grid(row=1, column=0, columnspan=2, sticky="nsew")

    # 그리드 가중치 설정 (윈도우 크기 조절에 대응하기 위해)
    scroll_frame.grid_rowconfigure(1, weight=1)
    scroll_frame.grid_columnconfigure(0, weight=1)

    # dictionary에 추가
    device_info_frame_dict[device.name] = content_label_text

'''
    notify
'''

health_thermometer_service = '00001809-0000-1000-8000-00805f9b34fb'

temperature_characteristic = '00002a1c-0000-1000-8000-00805f9b34fb'

# dictionary 선언(key: characteristic, value: device_name)
sender_dict = {}

# Service, Characteristic 요청하는 함수
async def get_service_and_characteristic(candidate_device, connected_device):
    # Service 확인
    for service in connected_device.services:
        # Health Thermometer 관련 Service일때만 Characteristic 검색
        if service.uuid == health_thermometer_service:
            for characteristic in service.characteristics:
                # 온도정보 notify 설정
                if characteristic.uuid == temperature_characteristic:
                    # Dictionary에 추가
                    if characteristic not in sender_dict:
                        sender_dict[characteristic] = candidate_device.name
                    await connected_device.start_notify(characteristic, notify_callback)

# notify를 수신했을 때 호출되는 함수
def notify_callback(handle, data):
    temperature = parse_temperature_information(data)

    # 수신된 기기 이름 확인
    device_name = sender_dict.get(handle)

    if device_name is None:
        return
   
    # 업데이트할 텍스트 영역 확인 후 업데이트
    string_var = device_info_frame_dict.get(device_name)

    if string_var is None:
        return

    string_var.set(f"{temperature}°C")

# 온도정보 변환
def parse_temperature_information(data: bytearray):
    # 온도정보 변환
    temperature = temperature_calculate(data[1], data[2], data[3], data[4])

    # 온도정보 출력
    print(temperature)

    return temperature

# 온도정보 계산하는 함수
def temperature_calculate(t1, t2, t3, d):
    # Int로 변환, 비트 시프트 후 합산
    int_t1 = int(t1)
    int_t2 = int(t2) << 8
    int_t3 = int(t3) << 16

    signed_value = int_t1 + int_t2 + int_t3

    # 자릿수 정하는 변수
    digit = int(d)
    if digit > 127:
        digit -= 256

    # 온도
    temperature = float(signed_value) * 10 ** digit
    # 소수점 1자리만 남도록 수정
    temperature = round(temperature * 10) / 10

    return temperature

if __name__ == "__main__":
    root = tk.Tk()

    create_ui(root)

    # 이벤트루프 생성
    loop = asyncio.get_event_loop()

    # 비동기로 UI 계속 업데이트
    def asyncio_loop():
        loop.call_soon(loop.stop)
        loop.run_forever()
        root.after(100, asyncio_loop)

    root.after(100, asyncio_loop)
    root.mainloop()

Last updated