1
mirror of https://github.com/DarkFlippers/unleashed-firmware.git synced 2025-12-13 13:09:49 +04:00

Merge branch 'ofw-dev' into dev

This commit is contained in:
MX
2023-06-08 14:58:46 +03:00
38 changed files with 1206 additions and 929 deletions

View File

@@ -132,7 +132,7 @@ def generate(env):
"UsbInstall": Builder(
action=[
Action(
'${PYTHON3} "${SELFUPDATE_SCRIPT}" ${UPDATE_BUNDLE_DIR}/update.fuf'
'${PYTHON3} "${SELFUPDATE_SCRIPT}" -p ${FLIP_PORT} ${UPDATE_BUNDLE_DIR}/update.fuf'
),
Touch("${TARGET}"),
]

View File

@@ -431,7 +431,7 @@ def AddAppLaunchTarget(env, appname, launch_target_name):
# print(deploy_sources, flipp_dist_paths)
env.PhonyTarget(
launch_target_name,
'${PYTHON3} "${APP_RUN_SCRIPT}" ${EXTRA_ARGS} -s ${SOURCES} -t ${FLIPPER_FILE_TARGETS}',
'${PYTHON3} "${APP_RUN_SCRIPT}" -p ${FLIP_PORT} ${EXTRA_ARGS} -s ${SOURCES} -t ${FLIPPER_FILE_TARGETS}',
source=deploy_sources,
FLIPPER_FILE_TARGETS=flipp_dist_paths,
EXTRA_ARGS=run_script_extra_ars,
@@ -443,7 +443,6 @@ def generate(env, **kw):
env.SetDefault(
EXT_APPS_WORK_DIR="${FBT_FAP_DEBUG_ELF_ROOT}",
APP_RUN_SCRIPT="${FBT_SCRIPT_DIR}/runfap.py",
STORAGE_SCRIPT="${FBT_SCRIPT_DIR}/storage.py",
)
if not env["VERBOSE"]:
env.SetDefault(

View File

@@ -257,12 +257,12 @@ class FlipperStorage:
self.read.until(self.CLI_PROMPT)
ftell = file.tell()
percent = str(math.ceil(ftell / filesize * 100))
total_chunks = str(math.ceil(filesize / buffer_size))
current_chunk = str(math.ceil(ftell / buffer_size))
percent = math.ceil(ftell / filesize * 100)
total_chunks = math.ceil(filesize / buffer_size)
current_chunk = math.ceil(ftell / buffer_size)
approx_speed = ftell / (time.time() - start_time + 0.0001)
sys.stdout.write(
f"\r{percent}%, chunk {current_chunk} of {total_chunks} @ {approx_speed/1024:.2f} kb/s"
f"\r<{percent:3d}%, chunk {current_chunk:2d} of {total_chunks:2d} @ {approx_speed/1024:.2f} kb/s"
)
sys.stdout.flush()
print()
@@ -270,6 +270,7 @@ class FlipperStorage:
def read_file(self, filename: str):
"""Receive file from Flipper, and get filedata (bytes)"""
buffer_size = self.chunk_size
start_time = time.time()
self.send_and_wait_eol(
'storage read_chunks "' + filename + '" ' + str(buffer_size) + "\r"
)
@@ -290,10 +291,13 @@ class FlipperStorage:
filedata.extend(self.port.read(chunk_size))
read_size = read_size + chunk_size
percent = str(math.ceil(read_size / size * 100))
total_chunks = str(math.ceil(size / buffer_size))
current_chunk = str(math.ceil(read_size / buffer_size))
sys.stdout.write(f"\r{percent}%, chunk {current_chunk} of {total_chunks}")
percent = math.ceil(read_size / size * 100)
total_chunks = math.ceil(size / buffer_size)
current_chunk = math.ceil(read_size / buffer_size)
approx_speed = read_size / (time.time() - start_time + 0.0001)
sys.stdout.write(
f"\r>{percent:3d}%, chunk {current_chunk:2d} of {total_chunks:2d} @ {approx_speed/1024:.2f} kb/s"
)
sys.stdout.flush()
print()
self.read.until(self.CLI_PROMPT)

View File

@@ -6,7 +6,7 @@ def resolve_port(logger, portname: str = "auto"):
if portname != "auto":
return portname
# Try guessing
flippers = list(list_ports.grep("flip"))
flippers = list(list_ports.grep("flip_"))
if len(flippers) == 1:
flipper = flippers[0]
logger.info(f"Using {flipper.serial_number} on {flipper.device}")

View File

@@ -1,3 +1,4 @@
import argparse
import logging
import os
import subprocess
@@ -8,7 +9,10 @@ from flipper.utils.cdc import resolve_port
def main():
logger = logging.getLogger()
if not (port := resolve_port(logger, "auto")):
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", help="CDC Port", default="auto")
args = parser.parse_args()
if not (port := resolve_port(logger, args.port)):
logger.error("Is Flipper connected via USB and not in DFU mode?")
return 1
subprocess.call(

View File

@@ -342,7 +342,7 @@ else:
appenv.PhonyTarget(
"cli",
'${PYTHON3} "${FBT_SCRIPT_DIR}/serial_cli.py"',
'${PYTHON3} "${FBT_SCRIPT_DIR}/serial_cli.py" -p ${FLIP_PORT}',
)
# Linter
@@ -469,7 +469,7 @@ if dolphin_src_dir.exists():
)
dist_env.PhonyTarget(
"dolphin_ext",
'${PYTHON3} ${FBT_SCRIPT_DIR}/storage.py send "${SOURCE}" /ext/dolphin',
'${PYTHON3} ${FBT_SCRIPT_DIR}/storage.py -p ${FLIP_PORT} send "${SOURCE}" /ext/dolphin',
source=ufbt_build_dir.Dir("dolphin"),
)
else:

View File

@@ -71,6 +71,11 @@ vars.AddVariables(
validator=PathVariable.PathIsDir,
default="",
),
(
"FLIP_PORT",
"CDC Port of Flipper to use, if multiple are connected",
"auto",
),
)
Return("vars")

240
scripts/wifi_board.py Executable file
View File

@@ -0,0 +1,240 @@
#!/usr/bin/env python3
from flipper.app import App
from serial.tools.list_ports_common import ListPortInfo
import logging
import os
import tempfile
import subprocess
import serial.tools.list_ports as list_ports
import json
import requests
import tarfile
class UpdateDownloader:
UPDATE_SERVER = "https://update.flipperzero.one"
UPDATE_PROJECT = "/blackmagic-firmware"
UPDATE_INDEX = UPDATE_SERVER + UPDATE_PROJECT + "/directory.json"
UPDATE_TYPE = "full_tgz"
CHANNEL_ID_ALIAS = {
"dev": "development",
"rc": "release-candidate",
"r": "release",
"rel": "release",
}
def __init__(self):
self.logger = logging.getLogger()
def download(self, channel_id: str, dir: str) -> bool:
# Aliases
if channel_id in self.CHANNEL_ID_ALIAS:
channel_id = self.CHANNEL_ID_ALIAS[channel_id]
# Make directory
if not os.path.exists(dir):
self.logger.info(f"Creating directory {dir}")
os.makedirs(dir)
# Download json index
self.logger.info(f"Downloading {self.UPDATE_INDEX}")
response = requests.get(self.UPDATE_INDEX)
if response.status_code != 200:
self.logger.error(f"Failed to download {self.UPDATE_INDEX}")
return False
# Parse json index
try:
index = json.loads(response.content)
except Exception as e:
self.logger.error(f"Failed to parse json index: {e}")
return False
# Find channel
channel = None
for channel_candidate in index["channels"]:
if channel_candidate["id"] == channel_id:
channel = channel_candidate
break
# Check if channel found
if channel is None:
self.logger.error(
f"Channel '{channel_id}' not found. Valid channels: {', '.join([c['id'] for c in index['channels']])}"
)
return False
self.logger.info(f"Using channel '{channel_id}'")
# Get latest version
try:
version = channel["versions"][0]
except Exception as e:
self.logger.error(f"Failed to get version: {e}")
return False
self.logger.info(f"Using version '{version['version']}'")
# Get changelog
changelog = None
try:
changelog = version["changelog"]
except Exception as e:
self.logger.error(f"Failed to get changelog: {e}")
# print changelog
if changelog is not None:
self.logger.info(f"Changelog:")
for line in changelog.split("\n"):
if line.strip() == "":
continue
self.logger.info(f" {line}")
# Find file
file_url = None
for file_candidate in version["files"]:
if file_candidate["type"] == self.UPDATE_TYPE:
file_url = file_candidate["url"]
break
if file_url is None:
self.logger.error(f"File not found")
return False
# Make file path
file_name = file_url.split("/")[-1]
file_path = os.path.join(dir, file_name)
# Download file
self.logger.info(f"Downloading {file_url} to {file_path}")
with open(file_path, "wb") as f:
response = requests.get(file_url)
f.write(response.content)
# Unzip tgz
self.logger.info(f"Unzipping {file_path}")
with tarfile.open(file_path, "r") as tar:
tar.extractall(dir)
return True
class Main(App):
def init(self):
self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
self.parser.add_argument(
"-c", "--channel", help="Channel name", default="release"
)
self.parser.set_defaults(func=self.update)
# logging
self.logger = logging.getLogger()
def find_wifi_board(self) -> bool:
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
blackmagics: list[ListPortInfo] = list(list_ports.grep("blackmagic")) # type: ignore
daps: list[ListPortInfo] = list(list_ports.grep("CMSIS-DAP")) # type: ignore
return len(blackmagics) > 0 or len(daps) > 0
def find_wifi_board_bootloader(self):
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
ports: list[ListPortInfo] = list(list_ports.grep("ESP32-S2")) # type: ignore
if len(ports) == 0:
# Blackmagic probe serial port not found, will be handled later
pass
elif len(ports) > 1:
raise Exception("More than one WiFi board found")
else:
port = ports[0]
if os.name == "nt":
port.device = f"\\\\.\\{port.device}"
return port.device
def update(self):
try:
port = self.find_wifi_board_bootloader()
except Exception as e:
self.logger.error(f"{e}")
return 1
if self.args.port != "auto":
port = self.args.port
available_ports = [p[0] for p in list(list_ports.comports())]
if port not in available_ports:
self.logger.error(f"Port {port} not found")
return 1
if port is None:
if self.find_wifi_board():
self.logger.error("WiFi board found, but not in bootloader mode.")
self.logger.info("Please hold down BOOT button and press RESET button")
else:
self.logger.error("WiFi board not found")
self.logger.info(
"Please connect WiFi board to your computer, hold down BOOT button and press RESET button"
)
return 1
# get temporary dir
with tempfile.TemporaryDirectory() as temp_dir:
downloader = UpdateDownloader()
# download latest channel update
try:
if not downloader.download(self.args.channel, temp_dir):
self.logger.error(f"Cannot download update")
return 1
except Exception as e:
self.logger.error(f"Cannot download update: {e}")
return 1
with open(os.path.join(temp_dir, "flash.command"), "r") as f:
flash_command = f.read()
flash_command = flash_command.replace("\n", "").replace("\r", "")
flash_command = flash_command.replace("(PORT)", port)
# We can't reset the board after flashing via usb
flash_command = flash_command.replace(
"--after hard_reset", "--after no_reset_stub"
)
args = flash_command.split(" ")[0:]
args = list(filter(None, args))
esptool_params = []
esptool_params.extend(args)
self.logger.info(f'Running command: "{" ".join(args)}" in "{temp_dir}"')
process = subprocess.Popen(
esptool_params,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=temp_dir,
bufsize=1,
universal_newlines=True,
)
while process.poll() is None:
if process.stdout is not None:
for line in process.stdout:
self.logger.debug(f"{line.strip()}")
if process.returncode != 0:
self.logger.error(f"Failed to flash WiFi board")
else:
self.logger.info("WiFi board flashed successfully")
self.logger.info("Press RESET button on WiFi board to start it")
return process.returncode
if __name__ == "__main__":
Main()()