544 lines
20 KiB
Python
544 lines
20 KiB
Python
======================================================================
|
|
// File: layer.py
|
|
======================================================================
|
|
|
|
# backend/layer.py
|
|
import pcbnew
|
|
import re
|
|
import traceback
|
|
from .resolvers import get_item_uuid, resolve_to_trackable_item, find_item_by_uuid
|
|
from .serializer import Serializer
|
|
|
|
class Layer:
    """A named set of board items that can be hidden from and restored to the board.

    Items currently on the board are tracked in ``on_board_items`` as
    ``{'uuid': str, 'class': str}`` entries.  Items that were serialized and
    removed by :meth:`hide` are tracked in ``stored_items`` as the filenames
    returned by ``manager.storage.save_item``.
    """

    def __init__(self, name, manager):
        """Create an empty, visible layer owned by *manager*."""
        self.name = name
        self.manager = manager
        self.visible = True
        self.on_board_items = []  # List of {'uuid': str, 'class': str}
        self.stored_items = []  # List of filenames (str) pointing to item JSONs

    def add_item(self, item):
        """Track *item* on this layer.

        Returns:
            True if a new entry was appended; False if *item* is falsy, has
            no UUID, or is already tracked.
        """
        if not item:
            return False
        uuid = get_item_uuid(item)
        if not uuid:
            return False

        cls = item.GetClass()
        if any(entry['uuid'] == uuid for entry in self.on_board_items):
            return False
        self.on_board_items.append({'uuid': uuid, 'class': cls})
        return True

    def _clear_selection_safe(self):
        """Best-effort: clear the selected/brightened flags of the current selection."""
        try:
            # Snapshot first so clearing flags cannot disturb the iteration.
            selected = list(pcbnew.GetCurrentSelection())
            for item in selected:
                if hasattr(item, 'ClearSelected'):
                    item.ClearSelected()
                if hasattr(item, 'ClearBrightened'):
                    item.ClearBrightened()
        except Exception as e:
            # Deliberately non-fatal, but no longer silent.
            self.manager.log(f"Clear selection failed (ignored): {e}")

    def _remove_from_board(self, board, item, context):
        """Unlock *item* if locked and remove it from *board*.

        Failures are logged with the *context* prefix instead of raised.

        Returns:
            True if the removal succeeded, False otherwise.
        """
        try:
            if item.IsLocked():
                item.SetLocked(False)
            board.Remove(item)
            return True
        except Exception as e:
            self.manager.log(f"{context}: Remove failed: {e}")
            return False

    def hide(self):
        """Serialize every tracked on-board item to storage and remove it from the board.

        Entries whose item cannot be found on the board are dropped.  Entries
        that fail to serialize or save stay in ``on_board_items``; the layer
        is only marked invisible when no such entries remain.
        """
        self.manager.log(f"Layer {self.name}: Hiding...")
        try:
            board = pcbnew.GetBoard()
            self._clear_selection_safe()
            pcbnew.Refresh()

            new_stored_items = list(self.stored_items)
            items_to_remove = []
            failed_entries = []

            for entry in self.on_board_items:
                try:
                    uuid = entry['uuid']
                    item = find_item_by_uuid(board, uuid)
                    if not item:
                        self.manager.log(f"Hide: Item {uuid} not found (skipping)")
                        continue

                    sexpr, err = Serializer.serialize(item)
                    if not sexpr:
                        self.manager.log(f"Hide: Serialize failed {uuid}: {err}")
                        failed_entries.append(entry)
                        continue

                    item_data = {'class': item.GetClass(), 'sexpr': sexpr, 'uuid': uuid}
                    if hasattr(item, 'GetReference'):
                        item_data['ref'] = item.GetReference()
                    if hasattr(item, 'GetNetname'):
                        item_data['net'] = item.GetNetname()

                    filename = self.manager.storage.save_item(item_data, uuid)
                    if filename:
                        new_stored_items.append(filename)
                        items_to_remove.append(item)
                    else:
                        failed_entries.append(entry)
                except Exception as e:
                    self.manager.log(f"Hide: Error entry {entry}: {e}")
                    failed_entries.append(entry)

            # Remove only after every item has been saved, so a failure while
            # saving never leaves a half-modified board.
            for item in items_to_remove:
                self._remove_from_board(board, item, "Hide")

            self.stored_items = new_stored_items
            self.on_board_items = failed_entries
            if not self.on_board_items:
                self.visible = False
            self.manager.save()
            self.manager.log("Hide: Complete")
        except Exception as e:
            self.manager.log(f"Hide: Critical Error: {e}")
            self.manager.log(traceback.format_exc())

    def show(self):
        """Restore every stored item onto the board and mark the layer visible.

        An item already on the board with the same UUID is removed before the
        stored copy is added.  Files whose item was added successfully are
        dropped from ``stored_items``; the others are kept for a later retry.
        """
        try:
            self.manager.log(f"Layer {self.name}: Showing (Force Restore)...")
            board = pcbnew.GetBoard()

            new_on_board_items = list(self.on_board_items)
            restored_files = []

            # The flag makes the board listener ignore the items added below;
            # try/finally guarantees it is cleared even on an unexpected error.
            self.manager.is_restoring = True
            try:
                for filename in self.stored_items:
                    try:
                        item_data = self.manager.storage.load_item(filename)
                        if not item_data:
                            continue
                        uuid = item_data.get('uuid')

                        # Force Restore: Remove existing ghost if present
                        existing = find_item_by_uuid(board, uuid)
                        if existing:
                            self._remove_from_board(board, existing, "Show")

                        item, err = Serializer.deserialize(item_data, board)
                        if not item:
                            self.manager.log(f"Show: Deserialize failed {uuid}: {err}")
                            continue

                        try:
                            board.Add(item)
                            new_uuid = get_item_uuid(item)
                            if new_uuid and not any(
                                e['uuid'] == new_uuid for e in new_on_board_items
                            ):
                                new_on_board_items.append(
                                    {'uuid': new_uuid, 'class': item.GetClass()}
                                )
                            restored_files.append(filename)
                        except Exception as e:
                            self.manager.log(f"Show: Add failed {uuid}: {e}")
                    except Exception as e:
                        self.manager.log(f"Show: Error file {filename}: {e}")
            finally:
                self.manager.is_restoring = False

            self.on_board_items = new_on_board_items
            for f in restored_files:
                if f in self.stored_items:
                    self.stored_items.remove(f)

            self.visible = True
            self.manager.save()
            self.manager.log("Show: Complete")
        except Exception as e:
            self.manager.log(f"Show: Critical Error: {e}")
            self.manager.log(traceback.format_exc())
            self.manager.is_restoring = False

    def delete_content(self, delete_items_from_board):
        """Forget everything this layer tracks.

        Args:
            delete_items_from_board: For a visible layer, True also removes
                the tracked items from the board.  For a hidden layer, False
                restores the stored items onto the board before forgetting
                them; True simply drops the stored references.
        """
        board = pcbnew.GetBoard()
        if self.visible:
            if delete_items_from_board:
                self._clear_selection_safe()
                for entry in self.on_board_items:
                    item = find_item_by_uuid(board, entry['uuid'])
                    if item:
                        self._remove_from_board(board, item, "Delete")
            self.on_board_items = []
        else:
            if not delete_items_from_board:
                # Guard the flag with try/finally: an exception while
                # restoring must not leave the listener disabled for good.
                self.manager.is_restoring = True
                try:
                    for filename in self.stored_items:
                        item_data = self.manager.storage.load_item(filename)
                        if item_data:
                            item, err = Serializer.deserialize(item_data, board)
                            if item:
                                board.Add(item)
                            else:
                                self.manager.log(
                                    f"Delete: Deserialize failed {filename}: {err}"
                                )
                finally:
                    self.manager.is_restoring = False
            self.stored_items = []
        self.manager.save()

    def clear_items(self):
        """Delete the layer content, removing on-board items from the board."""
        self.delete_content(delete_items_from_board=True)

    def to_dict(self):
        """Return the persisted representation of this layer."""
        return {
            "name": self.name,
            "visible": self.visible,
            "on_board_items": self.on_board_items,
            "stored_items": self.stored_items,
        }

    @staticmethod
    def from_dict(d, manager):
        """Build a Layer from the mapping produced by :meth:`to_dict`.

        Only ``"name"`` is required.  A missing ``"visible"`` defaults to
        True, matching a freshly constructed layer, and the item lists are
        copied so the layer never shares list objects with *d*.
        """
        layer = Layer(d["name"], manager)
        layer.visible = d.get("visible", True)
        layer.on_board_items = list(d.get("on_board_items") or [])
        layer.stored_items = list(d.get("stored_items") or [])
        return layer

    def add_from_text(self, text):
        """Track every board item whose ``(uuid "...")`` appears in *text*.

        Returns:
            ``(count, message)`` where *count* is the number of newly tracked items.
        """
        uuids = set(re.findall(r'\(uuid\s+"([^"]+)"\)', text))
        if not uuids:
            return 0, "No UUIDs found."
        board = pcbnew.GetBoard()
        count = 0
        for uuid in uuids:
            try:
                item = board.GetItem(pcbnew.KIID(uuid))
                if item:
                    trackable = resolve_to_trackable_item(item)
                    if trackable and self.add_item(trackable):
                        count += 1
            except Exception as e:
                # One bad UUID must not abort the whole import.
                self.manager.log(f"Add from text: skipped {uuid}: {e}")
        self.manager.save()
        return count, f"Added {count} items."

    def _get_selection_candidates(self, board):
        """Return the currently selected items.

        Uses ``pcbnew.GetCurrentSelection()`` first; if that yields nothing,
        falls back to scanning tracks, footprints, drawings, zones and groups
        of *board* for items reporting ``IsSelected()``.
        """
        candidates = []
        try:
            for item in pcbnew.GetCurrentSelection():
                candidates.append(item)
        except Exception as e:
            self.manager.log(f"Selection: GetCurrentSelection failed: {e}")

        if not candidates:
            # Fallback scan
            for lst in [board.GetTracks(), board.GetFootprints(), board.GetDrawings(),
                        board.Zones(), board.Groups()]:
                try:
                    for item in lst:
                        if item.IsSelected():
                            candidates.append(item)
                except Exception as e:
                    self.manager.log(f"Selection: fallback scan failed: {e}")
        return candidates

    def _collect_trackables(self, candidates):
        """Map UUID -> trackable item for *candidates*, expanding PCB_GROUPs recursively."""
        found = {}

        def visit(item):
            if item.GetClass() == "PCB_GROUP":
                try:
                    group = pcbnew.Cast_to_PCB_GROUP(item)
                    for sub in group.GetItems():
                        visit(sub)
                except Exception as e:
                    self.manager.log(f"Selection: group expansion failed: {e}")
            else:
                trackable = resolve_to_trackable_item(item)
                if trackable:
                    uuid = get_item_uuid(trackable)
                    if uuid:
                        found[uuid] = trackable

        for item in candidates:
            visit(item)
        return found

    def add_selection(self):
        """Track the current selection on this layer.

        Returns:
            ``(count, message)`` where *count* is the number of newly tracked items.
        """
        board = pcbnew.GetBoard()
        candidates = self._get_selection_candidates(board)
        items_to_add = self._collect_trackables(candidates)

        count = 0
        for item in items_to_add.values():
            if self.add_item(item):
                count += 1
        self.manager.save()
        return count, f"Added {count} items."

    def remove_selection(self):
        """Stop tracking the currently selected items (they stay on the board).

        Returns:
            ``(removed, message)`` where *removed* is the number of dropped entries.
        """
        board = pcbnew.GetBoard()
        candidates = self._get_selection_candidates(board)
        uuids_to_remove = set(self._collect_trackables(candidates))

        initial_count = len(self.on_board_items)
        self.on_board_items = [
            e for e in self.on_board_items if e['uuid'] not in uuids_to_remove
        ]
        removed = initial_count - len(self.on_board_items)
        self.manager.save()
        return removed, f"Removed {removed} items."

    def inspect(self):
        """Return a short multi-line summary of the layer state."""
        return f"Layer: {self.name}\nVisible: {self.visible}\nOn Board: {len(self.on_board_items)}\nStored: {len(self.stored_items)}"
|
|
|
|
|
|
======================================================================
|
|
// File: manager.py
|
|
======================================================================
|
|
|
|
# backend/manager.py
|
|
import pcbnew
|
|
import os
|
|
import re
|
|
from .storage import StorageManager
|
|
from .layer import Layer
|
|
from .resolvers import find_item_by_uuid, get_item_uuid, resolve_to_trackable_item
|
|
from .serializer import Serializer
|
|
|
|
class LayerListener(pcbnew.BOARD_LISTENER):
    """Board listener that tracks newly added items on the manager's active layer."""

    def __init__(self, mgr):
        """Keep a reference to the owning manager *mgr*."""
        super().__init__()
        self.mgr = mgr

    def OnBoardItemAdded(self, item):
        """Add *item* to the active layer, if one is active and visible.

        Does nothing while the manager is restoring items, so restored items
        are not re-tracked.  Exceptions never propagate out of this callback;
        they are logged through the manager instead.
        """
        if self.mgr.is_restoring:
            return
        if self.mgr.active_layer_index is None:
            return
        try:
            layer = self.mgr.layers[self.mgr.active_layer_index]
            if layer.visible:
                trackable = resolve_to_trackable_item(item)
                if trackable:
                    layer.add_item(trackable)
        except IndexError:
            # Active index points past the layer list; nothing to track.
            pass
        except Exception as e:
            self.mgr.log(f"Listener: OnBoardItemAdded failed: {e}")
|
|
|
|
class LayerManager:
    """Owns the layer list, the active-layer index, persistence and the debug log."""

    def __init__(self):
        """Create an empty manager backed by a new StorageManager."""
        self.layers = []
        self.active_layer_index = None
        self.listener = None
        self.last_save_status = "Init"
        self.last_save_path = "Session Dir"
        self.last_error = "None"
        self.settings = {"delete_mode": "ask"}
        self.is_restoring = False
        self.debug_log = []
        self.storage = StorageManager()

    def log(self, msg):
        """Append *msg* to the in-memory debug log and echo it to stdout."""
        self.debug_log.append(str(msg))
        print(f"[LayerManager] {msg}")

    def get_log_text(self):
        """Return the debug log as one newline-joined string."""
        return "\n".join(self.debug_log)

    def clear_log(self):
        """Discard all debug log entries."""
        self.debug_log = []

    def activate_listener(self):
        """Attach a LayerListener to the current board (once).

        The listener is remembered only after ``AddListener`` succeeds, so a
        failed attempt can be retried by a later call.
        """
        if self.listener:
            return
        board = pcbnew.GetBoard()
        if board is None:
            self.log("Listener: no board available; not attached")
            return
        listener = LayerListener(self)
        board.AddListener(listener)
        self.listener = listener

    def set_active_layer(self, index):
        """Set the active layer index; a non-None index also activates the listener."""
        self.active_layer_index = index
        if index is not None:
            self.activate_listener()

    def get_file_path(self):
        """Return the storage manager's master file path."""
        return self.storage.master_file

    def save(self):
        """Persist every layer and the master index; records status on the manager."""
        if not self.storage.setup_paths():
            self.last_error = "Failed to setup storage paths"
            return
        for layer in self.layers:
            self.storage.save_layer(layer.name, layer.to_dict())
        self.storage.update_master(self.layers)
        self.last_save_status = "Saved"
        self.last_save_path = self.storage.session_dir

    def load(self):
        """Replace ``self.layers`` with the layers listed in the stored master index."""
        if not self.storage.setup_paths():
            self.last_save_status = "No Board"
            return
        master = self.storage.load_master()
        if master and "layers" in master:
            self.layers = []
            for l_meta in master["layers"]:
                l_data = self.storage.load_layer(l_meta["name"])
                if l_data:
                    self.layers.append(Layer.from_dict(l_data, self))
            self.last_save_status = "Loaded"
        else:
            self.last_save_status = "New Session"

    def create_layer(self, name):
        """Append a new empty layer called *name*, save, and return it."""
        layer = Layer(name, self)
        self.layers.append(layer)
        self.save()
        return layer

    def delete_layer(self, index, delete_items):
        """Delete the layer at *index*; out-of-range indexes are ignored.

        Args:
            index: Position of the layer in ``self.layers``.
            delete_items: Forwarded (as a bool) to ``Layer.delete_content``.

        The active-layer index is kept consistent, as in ``move_layer_up`` /
        ``move_layer_down``: it is cleared when the active layer itself is
        deleted and shifted down when an earlier layer is deleted.
        """
        if not 0 <= index < len(self.layers):
            return
        self.layers[index].delete_content(bool(delete_items))
        self.layers.pop(index)

        active = self.active_layer_index
        if active is not None:
            if active == index:
                self.active_layer_index = None
            elif active > index:
                self.active_layer_index = active - 1
        self.save()

    def move_layer_up(self, index):
        """Swap the layer at *index* with its predecessor; return True if moved."""
        if 0 < index < len(self.layers):
            self.layers[index], self.layers[index - 1] = (
                self.layers[index - 1],
                self.layers[index],
            )
            if self.active_layer_index == index:
                self.active_layer_index = index - 1
            elif self.active_layer_index == index - 1:
                self.active_layer_index = index
            self.save()
            return True
        return False

    def move_layer_down(self, index):
        """Swap the layer at *index* with its successor; return True if moved."""
        if 0 <= index < len(self.layers) - 1:
            self.layers[index], self.layers[index + 1] = (
                self.layers[index + 1],
                self.layers[index],
            )
            if self.active_layer_index == index:
                self.active_layer_index = index + 1
            elif self.active_layer_index == index + 1:
                self.active_layer_index = index
            self.save()
            return True
        return False

    def analyze_clipboard_text(self, text):
        """Log how many board items match the ``(uuid "...")`` tokens in *text*.

        Returns:
            The full debug log text.
        """
        self.log(f"Analyze: Input text length: {len(text)}")
        uuids = set(re.findall(r'\(uuid\s+"([^"]+)"\)', text))
        if not uuids:
            return self.get_log_text()
        board = pcbnew.GetBoard()
        matched = 0

        def check(item):
            # Items whose UUID cannot be read simply count as non-matching.
            try:
                if get_item_uuid(item) in uuids:
                    return 1
            except Exception:
                pass
            return 0

        try:
            for lst in [board.GetTracks(), board.GetFootprints(),
                        board.GetDrawings(), board.Zones()]:
                for item in lst:
                    matched += check(item)
        except Exception as e:
            self.log(f"Analyze: Board scan aborted: {e}")
        self.log(f"Analyze: Total Matches on Board: {matched}")
        return self.get_log_text()

    def _unique_layer_name(self, base):
        """Return *base*, or *base* with a numeric suffix, unused by any current layer."""
        taken = {layer.name for layer in self.layers}
        if base not in taken:
            return base
        suffix = 2
        while f"{base} {suffix}" in taken:
            suffix += 1
        return f"{base} {suffix}"

    def recover_orphans(self):
        """Collect item files in the session dir that no layer references.

        A file is an orphan when it is a ``.json`` file other than
        ``$Master$.json``, is not in any layer's ``stored_items``, carries a
        ``uuid``, and that UUID is neither tracked by a layer nor present on
        the board.  Orphans are attached to a new hidden layer.

        Returns:
            ``(count, message)``.
        """
        if not self.storage.setup_paths():
            return 0, "Storage not ready"
        tracked_uuids = set()
        stored_files = set()
        board = pcbnew.GetBoard()
        for layer in self.layers:
            stored_files.update(layer.stored_items)
            for entry in layer.on_board_items:
                tracked_uuids.add(entry['uuid'])
        orphans = []
        try:
            for f in os.listdir(self.storage.session_dir):
                if f.endswith(".json") and f != "$Master$.json" and f not in stored_files:
                    data = self.storage.load_item(f)
                    if not data or 'uuid' not in data:
                        continue
                    uuid = data['uuid']
                    if uuid in tracked_uuids or find_item_by_uuid(board, uuid):
                        continue
                    orphans.append(f)
        except Exception as e:
            return 0, f"Error scanning dir: {e}"
        if not orphans:
            return 0, "No orphans found"
        # Layers are saved and loaded by name, so a second "Recovered" layer
        # would collide with the first; pick a name that is not in use.
        rec_layer = self.create_layer(self._unique_layer_name("Recovered"))
        rec_layer.stored_items = orphans
        rec_layer.visible = False
        self.save()
        return len(orphans), f"Recovered {len(orphans)} items."

    def hard_refresh(self):
        """Reconcile layer bookkeeping with the actual board content.

        Visible layers keep only entries whose item is on the board and still
        serializes.  For hidden layers, any stored item that is also found on
        the board is removed from the board.
        """
        self.log("Hard Refresh: Starting...")
        board = pcbnew.GetBoard()
        for layer in self.layers:
            if layer.visible:
                valid_on_board = []
                for entry in layer.on_board_items:
                    uuid = entry['uuid']
                    item = find_item_by_uuid(board, uuid)
                    if item:
                        sexpr, err = Serializer.serialize(item)
                        if sexpr:
                            valid_on_board.append(entry)
                    else:
                        self.log(f"Refresh: Item {uuid} missing from visible layer {layer.name}")
                layer.on_board_items = valid_on_board
            else:
                for filename in layer.stored_items:
                    data = self.storage.load_item(filename)
                    if data and 'uuid' in data:
                        uuid = data['uuid']
                        item = find_item_by_uuid(board, uuid)
                        if item:
                            self.log(f"Refresh: Found ghost {uuid} from hidden layer {layer.name}. Removing.")
                            try:
                                board.Remove(item)
                            except Exception as e:
                                self.log(f"Refresh: Could not remove ghost {uuid}: {e}")
        self.save()
        self.log("Hard Refresh: Complete")
|
|
|
# Module-level LayerManager instance, created when this module is imported.
# NOTE(review): presumably the shared instance the rest of the plugin imports — confirm.
manager = LayerManager()
|
|
|
|
|
|
======================================================================
|
|
// File: serializer.py
|
|
======================================================================
|
|
|
|
# backend/serializer.py
|
|
import pcbnew
|
|
|
|
class Serializer:
    """Convert pcbnew board items to and from S-expression text.

    Both methods report failure through their ``(value, error)`` return tuple
    rather than by raising.
    """

    @staticmethod
    def serialize(item):
        """
        Returns (sexpr_string, error_message).

        Exactly one element of the tuple is None.  Two strategies are tried
        in order; on failure the error message combines what each reported.
        """
        if not item:
            return None, "Item is None"

        last_error = ""

        # Method 1: PCB_IO_KICAD_SEXPR (Robust, KiCad 6+)
        try:
            sf = pcbnew.STRING_FORMATTER()
            plugin = pcbnew.PCB_IO_KICAD_SEXPR()
            plugin.SetOutputFormatter(sf)
            plugin.Format(item)
            res = sf.GetString()
            if res:
                return res, None
            last_error = "Format returned empty string"
        except Exception as e:
            last_error = str(e)

        # Method 2: Fallback to direct Serialize
        try:
            sf = pcbnew.STRING_FORMATTER()
            if hasattr(item, 'Serialize'):
                item.Serialize(sf)
                res = sf.GetString()
                if res:
                    return res, None
        except Exception as e:
            last_error = f"{last_error} | Fallback: {str(e)}"

        return None, last_error

    @staticmethod
    def deserialize(data, board):
        """
        Returns (item, error_message).

        *data* is a mapping with at least ``'class'`` (name of a pcbnew
        class) and ``'sexpr'`` (the serialized text).  The item is constructed
        with *board* as parent when the class accepts it, otherwise without
        arguments.  The item is not added to the board here.
        """
        if not data:
            return None, "No data provided"
        if not hasattr(data, 'get'):
            # e.g. a corrupted item file that parsed to a list or a string.
            return None, f"Invalid item data type: {type(data).__name__}"
        cls_name = data.get('class')
        sexpr = data.get('sexpr')
        if not cls_name or not sexpr:
            return None, "Missing class or sexpr"

        try:
            if not hasattr(pcbnew, cls_name):
                return None, f"Class {cls_name} not found in pcbnew"

            cls = getattr(pcbnew, cls_name)

            # Instantiate item
            try:
                item = cls(board)
            except Exception:
                try:
                    item = cls()
                except Exception as e:
                    return None, f"Constructor failed for {cls_name}: {e}"

            # Deserialize with KiCad version compatibility
            # KiCad 9+ requires (data, source_name), older versions take (data)
            try:
                lr = pcbnew.STRING_LINE_READER(sexpr, "restore")
            except Exception:
                try:
                    lr = pcbnew.STRING_LINE_READER(sexpr)
                except Exception as e:
                    return None, f"STRING_LINE_READER init failed: {e}"

            item.Deserialize(lr)
            return item, None
        except Exception as e:
            return None, f"Deserialization error for {cls_name}: {e}"
|
|
|
|
|