Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
if TYPE_CHECKING:
from collections.abc import Iterable

DEFAULT_DATA_RAW = (
"/mnt/c/Users/zhang/AppData/Roaming/Factorio/"
"script-output/data-raw-dump.json"
)
DEFAULT_DATA_RAW = None


@dataclass(frozen=True)
Expand Down Expand Up @@ -130,7 +127,7 @@ def read_png_size(path: Path) -> tuple[int, int] | None:
signature = handle.read(8)
if signature != b"\x89PNG\r\n\x1a\n":
return None
_length = int.from_bytes(handle.read(4), "big")
_ = int.from_bytes(handle.read(4), "big")
chunk_type = handle.read(4)
if chunk_type != b"IHDR":
return None
Expand Down Expand Up @@ -218,7 +215,7 @@ def parse_args() -> argparse.Namespace:
)
parser.add_argument(
"--data-raw",
default=DEFAULT_DATA_RAW,
default=os.environ.get("FACTORIO_DATA_RAW", DEFAULT_DATA_RAW),
help="Path to data-raw-dump.json",
)
parser.add_argument(
Expand All @@ -232,15 +229,22 @@ def parse_args() -> argparse.Namespace:
return parser.parse_args()


def ensure_data_raw(data_raw: Path) -> Path | None:
def ensure_data_raw(data_raw: str | Path | None) -> Path | None:
"""Validate the data-raw-dump.json path."""
if not data_raw.exists():
if not data_raw:
print(
f"ERROR: data-raw-dump.json not found: {data_raw}",
"ERROR: --data-raw is required (or set FACTORIO_DATA_RAW).",
file=sys.stderr,
)
return None
return data_raw
resolved = data_raw if isinstance(data_raw, Path) else Path(data_raw)
if not resolved.exists():
print(
f"ERROR: data-raw-dump.json not found: {resolved}",
file=sys.stderr,
)
return None
return resolved


def ensure_data_dir(data_dir: str | None) -> Path | None:
Expand Down Expand Up @@ -320,7 +324,7 @@ def summarize_failures(icon_failures: int, locale_failures: int) -> int:
def main() -> int:
"""Run the icon and locale verification workflow."""
args = parse_args()
data_raw = ensure_data_raw(Path(args.data_raw))
data_raw = ensure_data_raw(args.data_raw)
if not data_raw:
return 2

Expand Down
156 changes: 107 additions & 49 deletions src/private/app/factorio-cycle-calculator/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@
from collections.abc import Mapping
from types import TracebackType

DEFAULT_DATA_DIR = (
"/mnt/c/Program Files (x86)/Steam/steamapps/common/Factorio/data"
)
DEFAULT_DATA_RAW = (
"/mnt/c/Users/zhang/AppData/Roaming/Factorio/"
"script-output/data-raw-dump.json"
)
DEFAULT_DATA_DIR = ""
DEFAULT_DATA_RAW = ""

ICON_TOKEN_RE = re.compile(r"__([^/]+)__/(.+)")

Expand All @@ -43,6 +38,7 @@ class Machine:
label: str
crafting_speed: float
allow_productivity: bool
base_productivity: float
crafting_categories: tuple[str, ...]
module_slots: int
allowed_effects: frozenset[str]
Expand All @@ -59,6 +55,7 @@ class Recipe:
ingredients: Mapping[tuple[str, str], float]
results: Mapping[tuple[str, str], float]
allow_productivity: bool
ignored_by_productivity: frozenset[tuple[str, str]]


@dataclass(frozen=True)
Expand Down Expand Up @@ -220,7 +217,7 @@ def load_data_raw(data_raw_path: str) -> dict:
try:
with Path(data_raw_path).open("r", encoding="utf-8") as handle:
return json.load(handle)
except OSError:
except (OSError, json.JSONDecodeError, ValueError):
return {}


Expand Down Expand Up @@ -271,9 +268,12 @@ def parse_ingredient_list(
return parsed


def parse_results(proto: dict) -> dict[tuple[str, str], float]:
"""Parse recipe results into a typed map."""
def parse_results(
proto: dict,
) -> tuple[dict[tuple[str, str], float], frozenset[tuple[str, str]]]:
"""Parse recipe results into a typed map and ignored-by-productivity set."""
results: dict[tuple[str, str], float] = {}
ignored: set[tuple[str, str]] = set()
if "results" in proto and isinstance(proto["results"], list):
for entry in proto["results"]:
if not isinstance(entry, dict):
Expand All @@ -287,13 +287,15 @@ def parse_results(proto: dict) -> dict[tuple[str, str], float]:
proto_type = entry.get("type", "item")
key = (proto_type, name)
results[key] = results.get(key, 0.0) + amount
return results
if entry.get("ignored_by_productivity"):
ignored.add(key)
return results, frozenset(ignored)

result_name = proto.get("result")
if isinstance(result_name, str):
amount = float(proto.get("result_count", 1.0))
results[("item", result_name)] = amount
return results
return results, frozenset(ignored)


def build_recipe_from_proto(name: str, proto: dict) -> Recipe:
Expand All @@ -303,7 +305,7 @@ def build_recipe_from_proto(name: str, proto: dict) -> Recipe:
energy_required = 0.5
category = proto.get("category", "crafting")
ingredients = parse_ingredient_list(proto.get("ingredients", []))
results = parse_results(proto)
results, ignored_by_productivity = parse_results(proto)
allow_productivity = bool(proto.get("allow_productivity", False))
return Recipe(
key=name,
Expand All @@ -313,6 +315,7 @@ def build_recipe_from_proto(name: str, proto: dict) -> Recipe:
ingredients=ingredients,
results=results,
allow_productivity=allow_productivity,
ignored_by_productivity=ignored_by_productivity,
)


Expand Down Expand Up @@ -352,18 +355,22 @@ def build_machine_catalog(
crafting_categories = tuple(proto.get("crafting_categories", []))
allowed_effects_raw = proto.get("allowed_effects", [])
allowed_effects = normalize_allowed_effects(allowed_effects_raw)
allow_productivity = "productivity" in allowed_effects
if not allow_productivity:
base_effect = (proto.get("effect_receiver") or {}).get(
"base_effect", {}
)
allow_productivity = bool(base_effect.get("productivity", 0))
base_effect = (proto.get("effect_receiver") or {}).get(
"base_effect", {}
)
base_productivity = parse_effect_bonus(
base_effect.get("productivity", 0.0) or 0.0
)
allow_productivity = (
"productivity" in allowed_effects or base_productivity > 0.0
)
module_slots = int(proto.get("module_slots", 0))
catalog[name] = Machine(
key=name,
label=name.replace("-", " ").title(),
crafting_speed=crafting_speed,
allow_productivity=allow_productivity,
base_productivity=base_productivity,
crafting_categories=crafting_categories,
module_slots=module_slots,
allowed_effects=allowed_effects,
Expand Down Expand Up @@ -476,6 +483,8 @@ def build_icon_catalog(
machines: Mapping[str, Machine],
) -> dict[tuple[str, str], IconSpec]:
"""Build the icon catalog for recipes, machines, and flows."""
if not data_dir_path:
return {}
data_dir = Path(data_dir_path)
if not data_dir.exists():
return {}
Expand Down Expand Up @@ -610,23 +619,22 @@ def find_icon(
def load_icon_image(path: str, size: int | None) -> bytes | None:
"""Load and crop an icon image to a single square."""
try:
image = Image.open(path)
with Image.open(path) as image:
image = image.convert("RGBA")
width, height = image.size
target_size = size
if target_size is None and width != height:
target_size = min(width, height)
if target_size:
target_size = min(target_size, width, height)
image = image.crop((0, 0, target_size, target_size))

with io.BytesIO() as buffer:
image.save(buffer, format="PNG")
return buffer.getvalue()
except OSError:
return None

image = image.convert("RGBA")
width, height = image.size
target_size = size
if target_size is None and width != height:
target_size = min(width, height)
if target_size:
target_size = min(target_size, width, height)
image = image.crop((0, 0, target_size, target_size))

with io.BytesIO() as buffer:
image.save(buffer, format="PNG")
return buffer.getvalue()


def format_amount(value: float) -> str:
"""Format a number using compact k/M suffixes."""
Expand Down Expand Up @@ -1074,13 +1082,17 @@ def per_machine_rates(recipe: Recipe, config: RecipeConfig) -> FlowRates:
cycle_seconds = recipe.energy_required / effective_speed
productivity = 0.0
if recipe.allow_productivity and config.machine.allow_productivity:
productivity = config.effects.productivity_bonus
productivity = (
config.machine.base_productivity + config.effects.productivity_bonus
)
multiplier = 1.0 + productivity

production = {
key: amount * multiplier / cycle_seconds
for key, amount in recipe.results.items()
}
production: dict[tuple[str, str], float] = {}
for key, amount in recipe.results.items():
if key in recipe.ignored_by_productivity:
production[key] = amount / cycle_seconds
else:
production[key] = amount * multiplier / cycle_seconds
consumption = {
key: amount / cycle_seconds
for key, amount in recipe.ingredients.items()
Expand Down Expand Up @@ -1149,10 +1161,23 @@ def solve_chain(
("fluid", "heavy-oil"),
0.0,
)
solver.Add(
heavy_prod * variables[advanced_key] # type: ignore[operator]
== heavy_cons * variables[heavy_key] # type: ignore[operator]
)
if heavy_prod <= 0.0 or heavy_cons <= 0.0:
st.error(
"Selected recipes must produce/consume heavy-oil. "
"Please choose an oil-processing recipe that outputs heavy-oil "
"and a cracking recipe that consumes heavy-oil."
)
return None
if force_integer:
solver.Add(
heavy_prod * variables[advanced_key] # type: ignore[operator]
>= heavy_cons * variables[heavy_key] # type: ignore[operator]
)
else:
solver.Add(
heavy_prod * variables[advanced_key] # type: ignore[operator]
== heavy_cons * variables[heavy_key] # type: ignore[operator]
)

light_prod_advanced = rates[advanced_key].production.get(
("fluid", "light-oil"),
Expand All @@ -1166,11 +1191,26 @@ def solve_chain(
("fluid", "light-oil"),
0.0,
)
solver.Add(
light_prod_advanced * variables[advanced_key] # type: ignore[operator]
+ light_prod_from_heavy * variables[heavy_key] # type: ignore[operator]
== light_cons * variables[light_key] # type: ignore[operator]
)
total_light_prod = light_prod_advanced + light_prod_from_heavy
if total_light_prod <= 0.0 or light_cons <= 0.0:
st.error(
"Selected recipes must produce/consume light-oil. "
"Please choose recipes that produce light-oil and a cracking "
"recipe that consumes light-oil."
)
return None
if force_integer:
solver.Add(
light_prod_advanced * variables[advanced_key] # type: ignore[operator]
+ light_prod_from_heavy * variables[heavy_key] # type: ignore[operator]
>= light_cons * variables[light_key] # type: ignore[operator]
)
else:
solver.Add(
light_prod_advanced * variables[advanced_key] # type: ignore[operator]
+ light_prod_from_heavy * variables[heavy_key] # type: ignore[operator]
== light_cons * variables[light_key] # type: ignore[operator]
)

pg_prod_advanced = rates[advanced_key].production.get(
("fluid", "petroleum-gas"),
Expand All @@ -1180,6 +1220,12 @@ def solve_chain(
("fluid", "petroleum-gas"),
0.0,
)
if pg_prod_advanced + pg_prod_from_light <= 0.0:
st.error(
"Selected recipes must produce petroleum gas. "
"Please choose an oil-processing/cracking chain that outputs it."
)
return None
solver.Add(
pg_prod_advanced * variables[advanced_key] # type: ignore[operator]
+ pg_prod_from_light * variables[light_key] # type: ignore[operator]
Expand Down Expand Up @@ -1376,6 +1422,17 @@ def main() -> None:
unit_label,
) = render_sidebar_controls()

if not data_raw_path:
st.error(
"Set FACTORIO_DATA_RAW or enter a data-raw-dump.json path to load."
)
return

if not data_dir_path:
st.sidebar.warning(
"Set FACTORIO_DATA_DIR or enter a data directory to load icons."
)

data_raw = load_data_raw(data_raw_path)
if not data_raw:
st.error("Failed to load data-raw-dump.json.")
Expand Down Expand Up @@ -1435,7 +1492,7 @@ def main() -> None:
production_context
)

demand_pg_per_s = demand_pg_per_min / 60.0
demand_pg_per_s = demand_pg_per_min / unit_multiplier
result = solve_chain(
demand_pg_per_s,
recipes,
Expand Down Expand Up @@ -1464,4 +1521,5 @@ def main() -> None:
render_solution(result, state)


main()
if __name__ == "__main__":
main()
Loading