Skip to content

Commit

Permalink
🔖 Bump version 1.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
SylvainRomillon committed Oct 13, 2023
1 parent ed485d3 commit 41bdafa
Show file tree
Hide file tree
Showing 49 changed files with 2,594 additions and 2,881 deletions.
2 changes: 0 additions & 2 deletions ALookCom/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
# use this to import files that are inside this module
import os, sys; sys.path.append(os.path.dirname(os.path.realpath(__file__)))
100 changes: 40 additions & 60 deletions ALookCom/anim.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

import cv2

import utils
from img import Img
from commandPub import CommandPub
from . import utils
from . import imgFmt
from .img import Img


class Anim:
def __init__(self, com):
self.cmd = CommandPub(com)
self.img = img = Img(com)
self.com = com

self.img = Img(com)

## return the number of elements before the fist difference
def _sameValueLen(self, la, lb):
assert(len(la) == len(lb)), "different length is not handled"
Expand Down Expand Up @@ -42,52 +41,35 @@ def _sameValueLenReverse(self, la, lb):
return cnt

## encode full image for animation
def _encodFullImgCmd(self, img):
height = img.shape[0]
width = img.shape[1]

## compress img 4 bit per pixel
encodedImg = []
for i in range(height):
byte = 0
shift = 0
for j in range(width):
## convert gray8bit to gray4bit
pxl = round(img[i, j] / 17)

## compress 4 bit per pixel
byte += pxl << shift
shift += 4
if shift == 8:
encodedImg.append(byte)
byte = 0
shift = 0
if shift != 0:
encodedImg.append(byte)

return encodedImg

### reduce pixel range to 16 level of grey but keep it on 1 byte
def _reducePxlRange(self, img):
for i, line in enumerate(img):
for j, plx in enumerate(line):
pxl = round(plx / 17)
img[i][j] = pxl

return img
def _encodFullImgCmd(self, fmt, img):
if fmt in [imgFmt.MONO_4BPP, imgFmt.RYYG, imgFmt.RRYG]:
encodedImg = self.img.compress4bpp(img)
return (encodedImg, len(encodedImg), len(encodedImg))
elif fmt == imgFmt.MONO_4BPP_HEATSHRINK:
encodedImg = self.img.compress4bpp(img)
compressedImg = self.img.compressHs(encodedImg)
return (compressedImg, len(encodedImg), len(compressedImg))
else:
raise Exception("unsupported format")

## prepare command for animation saving
def formatCmd(self, id, imgs):
def getCmd(self, id, imgs, fmt = imgFmt.MONO_4BPP, useDeprecateCmd = False):
if useDeprecateCmd:
assert(fmt in [imgFmt.MONO_4BPP])
else:
assert(fmt in [imgFmt.MONO_4BPP, imgFmt.MONO_4BPP_HEATSHRINK, imgFmt.RYYG, imgFmt.RRYG])

firstImg = self.img.convert(imgs[0], fmt)

## first image encoded as complete image
rawAnim = self._encodFullImgCmd(imgs[0])
imgSize = len(rawAnim)
(rawAnim, imgSize, compressedSize) = self._encodFullImgCmd(fmt, firstImg)

prev = self._reducePxlRange(imgs[0])
prev = firstImg

width = imgs[0].shape[1]
width = firstImg.shape[1]

for img in imgs[1:]:
img = self._reducePxlRange(img)
img = self.img.convert(img, fmt)

## crop width
lines = []
Expand Down Expand Up @@ -161,6 +143,9 @@ class BreakIt(Exception): pass
data += utils.intToList(len(rawAnim))
data += utils.intToList(imgSize)
data += utils.uShortToList(width)
if not useDeprecateCmd:
data += [fmt]
data += utils.intToList(compressedSize)
cmds = [self.com.formatFrame(0x95, data)]

## pack data in commands
Expand All @@ -176,21 +161,19 @@ class BreakIt(Exception): pass
return cmds

## prepare command for animation saving
def formatCmdFolder(self, id, folder):
def getCmdFolder(self, id, folder, fmt = imgFmt.MONO_4BPP, useDeprecateCmd = False):
## list file in folder
imgs = []
for f in os.listdir(folder):
for f in sorted(os.listdir(folder)):
path = os.path.join(folder, f)
if os.path.isfile(path):
img = cv2.imread(path)
img = cv2.rotate(img, cv2.ROTATE_180)
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
imgs.append(img)

return self.formatCmd(id, imgs)
return self.getCmd(id, imgs, fmt, useDeprecateCmd)

## prepare command for animation saving
def formatCmdGif(self, id, gif, invertColor):
def getCmdGif(self, id, gif, invertColor, fmt = imgFmt.MONO_4BPP):

cap = cv2.VideoCapture(gif)

Expand All @@ -200,9 +183,6 @@ def formatCmdGif(self, id, gif, invertColor):
if not ret:
break

frame = cv2.rotate(frame, cv2.ROTATE_180)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

if invertColor:
## invert color
frame = 255 - frame
Expand All @@ -212,12 +192,12 @@ def formatCmdGif(self, id, gif, invertColor):

cap.release()

return self.formatCmd(id, imgs)
return self.getCmd(id, imgs, fmt)

## save an animation, takes all images of a folder
def saveAnimation(self, id, folder = "anim/cube-302x256"):
def saveAnimation(self, id, folder = "anim/cube-302x256", fmt = imgFmt.MONO_4BPP, useDeprecateCmd = False):
## convert image
cmds = self.formatCmdFolder(id, folder)
cmds = self.getCmdFolder(id, folder, fmt, useDeprecateCmd)

## send commands
for c in cmds:
Expand All @@ -228,9 +208,9 @@ def saveAnimation(self, id, folder = "anim/cube-302x256"):
return True

## save an animation from a gif
def saveAnimationGif(self, id, gif = "anim/monkey-228x228.gif", invertColor = True):
def saveAnimationGif(self, id, gif = "anim/monkey-228x228.gif", invertColor = True, fmt = imgFmt.MONO_4BPP):
## convert image
cmds = self.formatCmdGif(id, gif, invertColor)
cmds = self.getCmdGif(id, gif, invertColor, fmt)

## send commands
for c in cmds:
Expand Down
127 changes: 106 additions & 21 deletions ALookCom/ble.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,115 @@
## BLE abstraction Layer
# BLE abstraction Layer

import asyncio

from bleak import BleakScanner

class Ble:
## get device rssi
def __getRssi(self, dev):
return dev.rssi

## runner to scan devices
async def __runScanDevices(self):
devices = await BleakScanner.discover(timeout = 2.5)
for d in devices:
self.__scannedDev.append(d)

## scan for devices
def scanDevices(self):
self.__scannedDev = []
# filter used by __runGetAdvData()
def __advScanFilter(self, d, adv):
if d.name == self.__advScanName:
self.__advData = adv
return True
else:
return False


# Handle scan failure for debug
async def __runError(self, timeout):
dev_lst = await BleakScanner.discover(timeout)
print(f"Failed to find device, scanned devices:")
for d in dev_lst:
print(f" - {d.address}, {d.name}")


async def __runScan(self, timeout):
return await BleakScanner.discover(timeout)


async def __runScanByName(self, name, timeout):
return await BleakScanner.find_device_by_filter(
lambda d, ad: d.name == name,
timeout
)


async def __runScanByAddr(self, addr_lst, timeout):
return await BleakScanner.find_device_by_filter(
lambda d, ad: d.address in addr_lst,
timeout
)


async def __runScanBySrv(self, srvUuid, timeout):
return await BleakScanner.find_device_by_filter(
lambda d, ad: srvUuid in ad.service_uuids,
timeout
)


# runner to get advertizing data
async def __runGetAdvData(self, name):
self.__advScanName = name
d = await BleakScanner.find_device_by_filter(self.__advScanFilter)
if d:
return self.__advData
else:
return None


# scan for device
def scan(self, timeout=30.0):
loop = asyncio.get_event_loop()
return loop.run_until_complete(self.__runScan(timeout))


def scanByName(self, name, timeout=30.0, debug=True):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.__runScanDevices())
dev = loop.run_until_complete(self.__runScanByName(name, timeout))

if debug and not dev:
loop.run_until_complete(self.__runError(timeout))

return dev


def scanByAddr(self, addr, timeout=30.0, debug=True):
loop = asyncio.get_event_loop()
dev = loop.run_until_complete(self.__runScanByAddr(addr, timeout))

if debug and not dev:
loop.run_until_complete(self.__runError(timeout))

return dev


def scanBySrv(self, srvUuid, timeout=30.0, debug=True):
loop = asyncio.get_event_loop()
dev = loop.run_until_complete(self.__runScanBySrv(srvUuid, timeout))

if debug and not dev:
loop.run_until_complete(self.__runError(timeout))

return dev


def getAdvData(self, name, debug=True):
loop = asyncio.get_event_loop()
adv_raw = loop.run_until_complete(self.__runGetAdvData(name))

if adv_raw:
adv = {}

mfr = adv_raw.manufacturer_data
key = list(mfr.keys())[0]
value = list(mfr[key])
key = key.to_bytes(length=2, byteorder='big', signed=False)
key = [key[1], key[0]]
adv['manufacturer_data'] = key + value

## sort by RSSI
self.__scannedDev.sort(key=self.__getRssi, reverse=True)
adv['rssi'] = adv_raw.rssi
else:
loop.run_until_complete(self.__runError(10.0))
adv = None

return self.__scannedDev

def getAdvtManufacturerData(self, dev):
return dev.metadata['manufacturer_data']
return adv
Loading

0 comments on commit 41bdafa

Please sign in to comment.