Significantly improved performance of hopper logging (fixes #636)

This commit is contained in:
Intelli 2025-03-10 17:42:43 -06:00
parent 3038e96c6e
commit 38ba3abc9a
2 changed files with 150 additions and 70 deletions

View file

@ -4,6 +4,9 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.bukkit.Location;
import org.bukkit.inventory.Inventory;
@ -18,6 +21,13 @@ import net.coreprotect.utility.ItemUtils;
public final class HopperPushListener {
private static final ConcurrentLinkedQueue<Object[]> hopperQueue = new ConcurrentLinkedQueue<>();
private static final AtomicBoolean processorRunning = new AtomicBoolean(false);
private static final int BATCH_SIZE = 100;
private static final int PROCESS_DELAY_TICKS = 1;
private static final int MAX_CONCURRENT_PROCESSORS = 4;
private static final AtomicInteger activeProcessors = new AtomicInteger(0);
static void processHopperPush(Location location, String user, InventoryHolder sourceHolder, InventoryHolder destinationHolder, ItemStack item) {
Location destinationLocation = destinationHolder.getInventory().getLocation();
if (destinationLocation == null) {
@ -36,9 +46,70 @@ public final class HopperPushListener {
ItemStack[] destinationContainer = ItemUtils.getContainerState(destinationHolder.getInventory().getContents());
ItemStack movedItem = item.clone();
final long taskStarted = InventoryChangeListener.tasksStarted.incrementAndGet();
hopperQueue.add(new Object[] { location, user, sourceHolder, destinationHolder, movedItem, destinationContainer, loggingChestId, lastAbort });
// Start the processor if it's not already running
if (processorRunning.compareAndSet(false, true)) {
startHopperProcessor();
}
}
private static void startHopperProcessor() {
if (activeProcessors.incrementAndGet() <= MAX_CONCURRENT_PROCESSORS) {
Scheduler.runTaskAsynchronously(CoreProtect.getInstance(), () -> {
try {
// Use the same server running check as Consumer class
while (!hopperQueue.isEmpty() && (ConfigHandler.serverRunning || ConfigHandler.converterRunning)) {
processHopperBatch();
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
activeProcessors.decrementAndGet();
processorRunning.set(false);
// If more items were added and we're still running, restart the processor
if (!hopperQueue.isEmpty() && (ConfigHandler.serverRunning || ConfigHandler.converterRunning) && activeProcessors.get() == 0) {
startHopperProcessor();
}
}
});
}
else {
activeProcessors.decrementAndGet();
}
}
private static void processHopperBatch() {
int processed = 0;
final long taskStarted = InventoryChangeListener.tasksStarted.incrementAndGet();
while (!hopperQueue.isEmpty() && processed < BATCH_SIZE && (ConfigHandler.serverRunning || ConfigHandler.converterRunning)) {
Object[] data = hopperQueue.poll();
if (data == null)
continue;
Location location = (Location) data[0];
String user = (String) data[1];
InventoryHolder sourceHolder = (InventoryHolder) data[2];
InventoryHolder destinationHolder = (InventoryHolder) data[3];
ItemStack movedItem = (ItemStack) data[4];
ItemStack[] destinationContainer = (ItemStack[]) data[5];
String loggingChestId = (String) data[6];
Object[] lastAbort = (Object[]) data[7];
processSingleHopperPush(location, user, sourceHolder, destinationHolder, movedItem, destinationContainer, loggingChestId, lastAbort);
processed++;
}
InventoryChangeListener.checkTasks(taskStarted);
}
private static void processSingleHopperPush(Location location, String user, InventoryHolder sourceHolder, InventoryHolder destinationHolder, ItemStack movedItem, ItemStack[] destinationContainer, String loggingChestId, Object[] lastAbort) {
if (sourceHolder == null || destinationHolder == null) {
return;
}
@ -96,13 +167,6 @@ public final class HopperPushListener {
}
}
InventoryChangeListener.checkTasks(taskStarted);
InventoryChangeListener.onInventoryInteract(user, destinationInventory, originalDestination, null, destinationInventory.getLocation(), true);
}
catch (Exception e) {
e.printStackTrace();
}
});
}
}

View file

@ -43,14 +43,30 @@ public final class InventoryChangeListener extends Queue implements Listener {
protected static AtomicLong tasksStarted = new AtomicLong();
protected static AtomicLong tasksCompleted = new AtomicLong();
private static ConcurrentHashMap<String, Boolean> inventoryProcessing = new ConcurrentHashMap<>();
private static final Object taskCompletionLock = new Object();
private static final long TASK_WAIT_MAX_MS = 50; // Maximum wait time in milliseconds
protected static void checkTasks(long taskStarted) {
try {
int waitCount = 0;
while (tasksCompleted.get() < (taskStarted - 1L) && waitCount++ <= 50) {
Thread.sleep(1);
// Skip checking if this is the first task or we're already caught up
if (taskStarted <= 1 || tasksCompleted.get() >= (taskStarted - 1L)) {
tasksCompleted.set(taskStarted);
return;
}
// Try to update without waiting if possible
if (tasksCompleted.compareAndSet(taskStarted - 1L, taskStarted)) {
return;
}
// Use proper synchronization instead of busy waiting
synchronized (taskCompletionLock) {
if (tasksCompleted.get() < (taskStarted - 1L)) {
taskCompletionLock.wait(TASK_WAIT_MAX_MS);
}
tasksCompleted.set(taskStarted);
taskCompletionLock.notifyAll(); // Notify other waiting threads
}
}
catch (Exception e) {
e.printStackTrace();