This file creates a stream that emits a value for each new file created in the watched directory. Its only dependency is the RxJava library; internally it uses the java.nio.file (NIO.2) watch service to detect new files.
Last active
January 16, 2018 13:14
-
-
Save Alotor/0cebef8b8c3741b06700ab70c8556274 to your computer and use it in GitHub Desktop.
New files streaming
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Grab("io.reactivex.rxjava2:rxjava:2.1.8") | |
import java.nio.file.* | |
import io.reactivex.* | |
/**
 * Builds an Observable that emits the path of every entry created under
 * {@code path}, including entries created inside sub-directories that appear
 * after the subscription started.
 *
 * A single WatchService is shared by all watched directories; the directory an
 * event belongs to is recovered from {@code key.watchable()}. The polling loop
 * runs inside the {@code Observable.create} callback, stops once the
 * downstream disposes, and always closes the watch service on exit.
 *
 * Limitation: a new sub-directory is registered only after its own creation
 * event is processed, so entries created inside it before that moment are
 * not reported.
 *
 * @param path the root directory to watch
 * @return an Observable of the paths of newly created files and directories
 */
Observable<Path> newFilesStream(Path path) {
    Observable.create { emitter ->
        println(">> Watching $path")
        def watchService = FileSystems.getDefault().newWatchService()
        try {
            path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE)
            while (!emitter.isDisposed()) {
                // Wait with a timeout instead of spinning on poll(): the thread
                // sleeps while idle but still notices disposal promptly.
                def key = watchService.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS)
                if (key == null) {
                    continue
                }
                def watchedDir = (Path) key.watchable()
                for (ev in key.pollEvents()) {
                    // OVERFLOW may be delivered even when not registered for;
                    // it carries no path, so there is nothing to emit.
                    if (ev.kind() == StandardWatchEventKinds.OVERFLOW) {
                        continue
                    }
                    def eventPath = watchedDir.resolve((Path) ev.context())
                    if (Files.isDirectory(eventPath)) {
                        // Registering an already-watched directory is harmless:
                        // the service hands back the existing key.
                        eventPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE)
                        println(">> Adding directory $eventPath to watch list")
                    }
                    emitter.onNext(eventPath)
                }
                // Re-arm the key so further events for this directory are queued.
                key.reset()
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status and report it unless already disposed.
            Thread.currentThread().interrupt()
            emitter.tryOnError(e)
        } finally {
            // Closing the service cancels every registration made above.
            watchService.close()
        }
    }
}
// Demo driver: print the first ten paths created under /tmp/watch.
def tmpWatchRoot = Paths.get("/tmp/watch")
def firstTenCreated = newFilesStream(tmpWatchRoot).take(10)
firstTenCreated.forEach { Path created ->
    println(">>> $created")
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Grab("commons-io:commons-io:2.6") | |
@Grab("io.reactivex.rxjava2:rxjava:2.1.8") | |
import org.apache.commons.io.monitor.* | |
import java.nio.file.* | |
import io.reactivex.* | |
/**
 * Builds an Observable that emits the path of every file or directory that
 * Commons IO reports as created under {@code path}.
 *
 * A FileAlterationMonitor is started on subscription with an interval
 * argument of 1000 and is stopped when the downstream disposes.
 *
 * @param path the directory to observe
 * @return an Observable of the paths of newly created files and directories
 */
Observable<Path> newFilesStream(Path path) {
    Observable.create { emitter ->
        def directoryObserver = new FileAlterationObserver(path.toFile())

        // Files and directories are treated alike: both are forwarded downstream.
        def creationListener = new FileAlterationListenerAdaptor() {
            @Override
            public void onFileCreate(File created) {
                emitter.onNext(created.toPath())
            }

            @Override
            public void onDirectoryCreate(File created) {
                emitter.onNext(created.toPath())
            }
        }
        directoryObserver.addListener(creationListener)

        def poller = new FileAlterationMonitor(1000, directoryObserver)
        // Install the stop hook before starting, so disposal always stops the monitor.
        emitter.setCancellable { poller.stop() }
        poller.start()
    }
}
// Demo driver: print the first five paths created under /mnt/shared.
def sharedRoot = Paths.get("/mnt/shared")
def firstFiveCreated = newFilesStream(sharedRoot).take(5)
firstFiveCreated.forEach { Path created ->
    println(">>> $created")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment