Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,16 @@
/.settings
/.classpath
/.project
.idea/.gitignore
.idea/codeStyles/
.idea/compiler.xml
.idea/encodings.xml
.idea/jarRepositories.xml
.idea/libraries/
.idea/misc.xml
.idea/modules.xml
.idea/sbt.xml
.idea/sonarlint/
.idea/vcs.xml
.java-version
bigqueue.iml
50 changes: 35 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.leansoft</groupId>
<groupId>org.clojars.nitishgoyal13</groupId>
<artifactId>bigqueue</artifactId>
<version>0.7.2</version>
<version>0.7.6</version>
<packaging>jar</packaging>

<name>bigqueue</name>
Expand Down Expand Up @@ -37,15 +37,10 @@

<distributionManagement>
<repository>
<id>github.release.repo</id>
<name>Release Repository</name>
<url>file:///dev/bulldog-repo/repo/releases</url>
<id>clojars</id>
<name>Clojars repository</name>
<url>https://clojars.org/repo</url>
</repository>
<snapshotRepository>
<id>github.snapshot.repo</id>
<name>Snapshot Repository</name>
<url>file:///dev/bulldog-repo/repo/snapshots</url>
</snapshotRepository>
</distributionManagement>

<properties>
Expand Down Expand Up @@ -101,11 +96,33 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>UTF-8</encoding>
</configuration>
<version>3.8.1</version>
<executions>
<execution>
<id>compile-java-8</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</execution>
<execution>
<id>compile-java-11</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<release>11</release>
<compileSourceRoots>
<compileSourceRoot>${project.basedir}/src/main/java11</compileSourceRoot>
</compileSourceRoots>
<outputDirectory>${project.build.outputDirectory}/META-INF/versions/11</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -134,6 +151,9 @@
<configuration>
<archive>
<manifestFile>src/main/resources/META-INF/MANIFEST.MF</manifestFile>
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
</archive>
</configuration>
</plugin>
Expand Down
37 changes: 19 additions & 18 deletions src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,52 @@
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MappedPageImpl implements IMappedPage, Closeable {

private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class);

private ThreadLocalByteBuffer threadLocalBuffer;
private volatile boolean dirty = false;
private volatile boolean closed = false;
private String pageFile;
private long index;

public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) {
this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb);
this.pageFile = pageFile;
this.index = index;
}

public void close() throws IOException {
synchronized(this) {
if (closed) return;

flush();

MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer();
unmap(srcBuf);

this.threadLocalBuffer = null; // hint GC

closed = true;
if (logger.isDebugEnabled()) {
logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed.");
}
}
}

@Override
public void setDirty(boolean dirty) {
this.dirty = dirty;
}

@Override
public void flush() {
synchronized(this) {
Expand All @@ -69,19 +70,19 @@ public byte[] getLocal(int position, int length) {
buf.get(data);
return data;
}

@Override
public ByteBuffer getLocal(int position) {
ByteBuffer buf = this.threadLocalBuffer.get();
buf.position(position);
((Buffer)buf).position(position);
return buf;
}

private static void unmap(MappedByteBuffer buffer)
{
Cleaner.clean(buffer);
}

/**
* Helper class allowing to clean direct buffers.
*/
Expand Down Expand Up @@ -120,18 +121,18 @@ public static void clean(ByteBuffer buffer) {
}
}
}

private static class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
private ByteBuffer _src;

public ThreadLocalByteBuffer(ByteBuffer src) {
_src = src;
}

public ByteBuffer getSourceBuffer() {
return _src;
}

@Override
protected synchronized ByteBuffer initialValue() {
ByteBuffer dup = _src.duplicate();
Expand All @@ -143,7 +144,7 @@ protected synchronized ByteBuffer initialValue() {
public boolean isClosed() {
return closed;
}

public String toString() {
return "Mapped page for " + this.pageFile + ", index = " + this.index + ".";
}
Expand Down
150 changes: 150 additions & 0 deletions src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.leansoft.bigqueue.page;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;

public class MappedPageImpl implements IMappedPage, Closeable {

private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class);

private ThreadLocalByteBuffer threadLocalBuffer;
private volatile boolean dirty = false;
private volatile boolean closed = false;
private String pageFile;
private long index;

public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) {
this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb);
this.pageFile = pageFile;
this.index = index;
}

public void close() throws IOException {
synchronized(this) {
if (closed) return;

flush();

MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer();
unmap(srcBuf);

this.threadLocalBuffer = null; // hint GC

closed = true;
if (logger.isDebugEnabled()) {
logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed.");
}
}
}

@Override
public void setDirty(boolean dirty) {
this.dirty = dirty;
}

@Override
public void flush() {
synchronized(this) {
if (closed) return;
if (dirty) {
MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer();
srcBuf.force(); // flush the changes
dirty = false;
if (logger.isDebugEnabled()) {
logger.debug("Mapped page for " + this.pageFile + " was just flushed.");
}
}
}
}

@Override
public ByteBuffer getLocal(int position) {
ByteBuffer buf = this.threadLocalBuffer.get();
buf.position(position);
return buf;
}

public byte[] getLocal(int position, int length) {
ByteBuffer buf = this.getLocal(position);
byte[] data = new byte[length];
buf.get(data);
return data;
}

private static void unmap(MappedByteBuffer buffer)
{
Cleaner.clean(buffer);
}

/**
* Helper class allowing to clean direct buffers.
*/
private static class Cleaner {

private static Unsafe unsafe;

static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(null);
} catch (Exception e) {
logger.warn("Unsafe Not support {}", e.getMessage(), e);
}
}

public static void clean(ByteBuffer buffer) {
if (buffer == null) return;
if (buffer.isDirect()) {
if (unsafe != null) {
unsafe.invokeCleaner(buffer);
} else {
logger.warn("Unable to clean bytebuffer");
}
}
}
}

private static class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
private ByteBuffer _src;

public ThreadLocalByteBuffer(ByteBuffer src) {
_src = src;
}

public ByteBuffer getSourceBuffer() {
return _src;
}

@Override
protected synchronized ByteBuffer initialValue() {
ByteBuffer dup = _src.duplicate();
return dup;
}
}

@Override
public boolean isClosed() {
return closed;
}

public String toString() {
return "Mapped page for " + this.pageFile + ", index = " + this.index + ".";
}

@Override
public String getPageFile() {
return this.pageFile;
}

@Override
public long getPageIndex() {
return this.index;
}
}