StoreDefault.java
package org.linkedopenactors.rdfpub.store.rdf4j;
import static org.eclipse.rdf4j.model.util.Values.iri;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.rdf.api.BlankNodeOrIRI;
import org.apache.commons.rdf.api.Dataset;
import org.apache.commons.rdf.api.Graph;
import org.apache.commons.rdf.api.IRI;
import org.apache.commons.rdf.api.Quad;
import org.apache.commons.rdf.api.RDFTerm;
import org.apache.commons.rdf.api.Triple;
import org.apache.commons.rdfrdf4j.RDF4J;
import org.apache.commons.rdfrdf4j.RDF4JBlankNodeOrIRI;
import org.apache.commons.rdfrdf4j.RDF4JDataset;
import org.apache.commons.rdfrdf4j.RDF4JGraphLike;
import org.apache.commons.rdfrdf4j.RDF4JIRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.impl.TreeModel;
import org.eclipse.rdf4j.model.util.RDFContainers;
import org.eclipse.rdf4j.model.util.Values;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.linkedopenactors.rdfpub.domain.commonsrdf.RdfFormat;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.OWL;
import org.linkedopenactors.rdfpub.domain.commonsrdf.vocab.Rdf;
import org.linkedopenactors.rdfpub.store.Store;
import lombok.extern.slf4j.Slf4j;
/**
 * {@link Store} implementation on top of a commons-rdf {@link Dataset} (RDF4J backed).
 * Data is organised in named graphs; collections are stored as {@code rdf:Bag}s whose
 * items are linked via container membership properties ({@code rdf:_1}, {@code rdf:_2}, ...)
 * and additionally via {@code rdfs:member}.
 */
@Slf4j
class StoreDefault implements Store {

	static final org.slf4j.Logger graphlog = org.slf4j.LoggerFactory.getLogger(StoreDefault.class.getName() + "_graphlog");
	static final org.slf4j.Logger graphlogStore = org.slf4j.LoggerFactory.getLogger(StoreDefault.class.getName() + "_graphlog_store");

	/** Namespace of the RDF vocabulary; the container membership properties {@code rdf:_n} live in it. */
	private static final String RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";

	/** {@code rdfs:member}, written next to every {@code rdf:_n} item so collections can be looked up by member. */
	private static final String RDFS_MEMBER = "http://www.w3.org/2000/01/rdf-schema#member";

	/** Matches the IRI of a container membership property ({@code rdf:_1}, {@code rdf:_2}, ...). */
	private static final Pattern CONTAINER_MEMBERSHIP_PROPERTY = Pattern.compile(Pattern.quote(RDF_NS) + "_[1-9][0-9]*");

	/** Number of stack frames logged (debug only) when a deep find is requested. */
	private static final int CALLER_FRAMES_TO_LOG = 5;

	private final Dataset dataset;
	private final RDF4J rdf;
	private final IRI owner;

	/**
	 * @param rdf factory used to create in-memory graphs and terms.
	 * @param dataset the dataset all reads and writes are delegated to.
	 * @param owner the only graph name accepted by {@link #addItem}.
	 */
	public StoreDefault(RDF4J rdf, RDF4JDataset dataset, IRI owner) {
		this.rdf = rdf;
		this.dataset = dataset;
		this.owner = owner;
	}

	/**
	 * Adds all triples of the passed graph to the named graph {@code graphName}.
	 * @param graphName target graph.
	 * @param graph triples to add.
	 * @param logMsg message used when the graph is dumped to the graph logs (debug only).
	 */
	@Override
	public void add(BlankNodeOrIRI graphName, Graph graph, String logMsg) {
		if(log.isDebugEnabled()) {
			log.debug("add(graphName:"+graphName+",graph)");
			logGraph(graphName, graph, logMsg, true);
		}
		// The passed graph may be repository backed, so its stream has to be closed.
		try(Stream<? extends Triple> triples = graph.stream()) {
			addAll(graphName, triples);
		}
	}

	/**
	 * Adds the passed triples to the named graph {@code graphName}.
	 * @param graphName target graph.
	 * @param newTriples triples to add.
	 * @param logMsg message used when the triples are dumped to the graph logs (debug only).
	 */
	@Override
	public void add(BlankNodeOrIRI graphName, Set<Triple> newTriples, String logMsg) {
		if(log.isDebugEnabled()) {
			log.debug("add(graphName:"+graphName+",graph)");
			logGraph(graphName, newTriples, logMsg, true);
		}
		addAll(graphName, newTriples.stream());
	}

	/**
	 * Adds every triple of the passed stream to the named graph, one quad at a time.
	 * TODO problem with that loop!!! See PR https://github.com/apache/commons-rdf/pull/205
	 */
	private void addAll(BlankNodeOrIRI graphName, Stream<? extends Triple> triples) {
		triples.forEach(triple -> dataset.add(graphName, triple.getSubject(), triple.getPredicate(), triple.getObject()));
	}

	/**
	 * Removes all triples of the passed subjects from the named graph.
	 * A {@code null} graph name addresses the default graph (same as {@link #find(BlankNodeOrIRI, BlankNodeOrIRI, IRI, RDFTerm)}).
	 * @throws IllegalArgumentException if {@code subjects} contains {@code null}; a null subject
	 *         would act as a wildcard and wipe the whole graph.
	 */
	@Override
	public void remove(BlankNodeOrIRI graphName, Set<BlankNodeOrIRI> subjects, String logMsg) {
		// Validate everything before deleting anything. Iterate instead of contains(null),
		// because some Set implementations (e.g. Set.of) throw on contains(null).
		for (BlankNodeOrIRI subject : subjects) {
			if (subject == null) {
				throw new IllegalArgumentException("subjects must not contain null (it would remove the whole graph)");
			}
		}
		Optional<BlankNodeOrIRI> graph = Optional.ofNullable(graphName);
		subjects.forEach(subject -> dataset.remove(graph, subject, null, null));
	}

	/**
	 * Finds the triples of {@code subject} without resolving references.
	 * Info: the management of the internal revisions while saving is done in {@code AbstractHandler}.
	 */
	@Override
	public Optional<Graph> find(BlankNodeOrIRI graphName, BlankNodeOrIRI subject) {
		return find(graphName, subject, 0);
	}

	/**
	 * Finds the triples of {@code subject}. If the subject is an {@code rdf:Bag} of revisions,
	 * the latest revision is returned instead (with {@code subject} as its subject).
	 * @param deep 0: no reference resolving; 1: internal IRI references of the result are
	 *        resolved one level deep and merged into the result.
	 * @throws UnsupportedOperationException if {@code deep > 1}.
	 */
	@Override
	public Optional<Graph> find(BlankNodeOrIRI graphName, BlankNodeOrIRI subject, int deep) {
		if(deep>1) {
			throw new UnsupportedOperationException("deep is currently only possible for 1.");
		}
		log.debug("find("+graphName+", "+subject+", "+deep+")");
		Optional<Graph> graphOpt = find(graphName, subject, null, null)
				.flatMap(graph->checkIfItsABag(graphName, subject, graph));
		if(deep==1) {
			if(log.isDebugEnabled()) {
				// Capturing a stack trace is expensive; only do it when it is actually logged.
				StackTraceElement[] stackTrace = new RuntimeException().getStackTrace();
				int frames = Math.min(CALLER_FRAMES_TO_LOG, stackTrace.length);
				for (int i = 0; i < frames; i++) {
					log.debug("coming from: " + stackTrace[i].getClassName() + "#" + stackTrace[i].getMethodName() + "("+stackTrace[i].getLineNumber()+")");
				}
			}
			graphOpt.ifPresent(g->logGraph(graphName, g, "before resolve references"));
			graphOpt = graphOpt.map(graph->resolveReferences(graphName, graph));
			graphOpt.ifPresent(g->logGraph(graphName, g, "after resolve references"));
		}
		return graphOpt;
	}

	/**
	 * Adds the triples of every internal IRI object of {@code graph} (see {@link #isInternal(String)})
	 * to {@code graph}. Bags are replaced by their latest revision.
	 * @return the passed (now extended) graph.
	 */
	private Graph resolveReferences(BlankNodeOrIRI graphName, Graph graph) {
		log.debug("resolveReferences("+graphName+", graph)");
		// Collect the distinct references up front: 'graph' is modified while they are resolved.
		Set<IRI> references;
		try(Stream<? extends Triple> triples = graph.stream()) {
			references = triples
					.map(Triple::getObject)
					.filter(IRI.class::isInstance)
					.map(IRI.class::cast)
					.filter(reference -> isInternal(reference.getIRIString()))
					.collect(Collectors.toSet());
		}
		for (IRI reference : references) {
			log.debug("resolving: " + reference);
			find(graphName, reference, null, null)
					.flatMap(found -> checkIfItsABag(graphName, reference, found))
					.ifPresent(found -> {
						try(Stream<? extends Triple> resolved = found.stream()) {
							resolved.forEach(graph::add);
						}
					});
		}
		return graph;
	}

	/**
	 * @return true, if the passed IRI is an internal (store-local) one, i.e. does not start with "http".
	 */
	private boolean isInternal(String iri) {
		return !iri.startsWith("http");
	}

	/**
	 * If {@code subject} is an {@code rdf:Bag} in {@code graph}, the latest revision of that bag is
	 * returned (see {@link #findLatestRevision}); otherwise the passed graph unchanged.
	 */
	private Optional<Graph> checkIfItsABag(BlankNodeOrIRI graphName, BlankNodeOrIRI subject, Graph graph) {
		if(isBag(subject, graph)) {
			log.debug("checkIfItsABag: YES");
			return findLatestRevision(graphName, subject, true);
		}
		log.debug("checkIfItsABag: NO");
		return Optional.of(graph);
	}

	/**
	 * Copies all quads of the named graph matching the pattern into a new in-memory graph.
	 * {@code null} arguments are wildcards; a {@code null} graph name addresses the default graph.
	 * @return the matching triples, or empty if nothing matched.
	 */
	@Override
	public Optional<Graph> find(BlankNodeOrIRI graphName, BlankNodeOrIRI subject, IRI predicate, RDFTerm object) {
		Graph graph = rdf.createGraph();
		try(Stream<? extends Quad> stream = dataset.stream(Optional.ofNullable(graphName), subject, predicate, object)) {
			stream.forEach(quad->graph.add(quad.getSubject(), quad.getPredicate(), quad.getObject()));
		}
		if(graph.size() < 1) {
			return Optional.empty();
		}
		logGraph(graphName, graph, "find ["+subject+"]");
		return Optional.of(graph);
	}

	/**
	 * @return subject -> graph for every passed subject that was found; missing subjects are absent.
	 */
	@Override
	public Map<BlankNodeOrIRI, Graph> findAll(BlankNodeOrIRI graphName, List<BlankNodeOrIRI> subjects) {
		Map<BlankNodeOrIRI, Graph> result = new HashMap<>();
		for (BlankNodeOrIRI sub : subjects) {
			find(graphName, sub).ifPresent(graph->result.put(sub, graph));
		}
		return result;
	}

	/**
	 * Loads the last item of the collection (the latest revision).
	 * @param useCollectionSubject if true, the revision's subject is replaced by the collection
	 *        subject and an {@code owl:sameAs} link to the revision is added.
	 */
	private Optional<Graph> findLatestRevision(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject, boolean useCollectionSubject) {
		List<IRI> items = getCollection(graphName, collectionSubject);
		if(items.isEmpty()) {
			return Optional.empty();
		}
		IRI revisionSubject = items.get(items.size()-1);
		Optional<Graph> result = find(graphName, revisionSubject);
		if(useCollectionSubject) {
			result = replaceSubject(result.orElse(null), revisionSubject, collectionSubject);
			result.ifPresent(r->r.add(collectionSubject, rdf.createIRI(OWL.sameAs), revisionSubject));
		}
		result.ifPresent(g->logGraph(graphName, g, "getLatest"));
		return result;
	}

	/**
	 * @return the subject of the last item of the collection, or empty if the collection is empty.
	 */
	@Override
	public Optional<IRI> findLatestRevisionSubject(BlankNodeOrIRI graphName, IRI collectionSubject) {
		List<IRI> items = getCollection(graphName, collectionSubject);
		return items.isEmpty() ? Optional.empty() : Optional.ofNullable(items.get(items.size()-1));
	}

	/**
	 * @return a decoupled in-memory copy of the whole named graph.
	 */
	@Override
	public Optional<Graph> find(BlankNodeOrIRI graphName) {
		// ATTENTION
		// the dataset is a org.apache.commons.rdfrdf4j.impl.RepositoryDatasetImpl and returns a org.apache.commons.rdfrdf4j.impl.RepositoryGraphImpl
		// which offers open connection streams, so the client has to take care of closing that stream.
		// I think that's too dangerous, so it's better to have a decoupled copy.
		return dataset.getGraph(graphName).map(this::copyOf);
	}

	/**
	 * @return an in-memory copy of the passed graph; the source stream is closed afterwards.
	 */
	private Graph copyOf(Graph source) {
		Graph copy = rdf.createGraph();
		try(Stream<? extends Triple> stream = source.stream()) {
			stream.forEach(triple->copy.add(triple));
		}
		return copy;
	}

	/**
	 * Checks, if the passed graph is a {@link Rdf#Bag}.
	 * @param subject
	 * @param graph
	 * @return True, if the passed subject's type in the graph is {@link Rdf#Bag}.
	 */
	private boolean isBag(BlankNodeOrIRI subject, Graph graph) {
		RDF4JIRI bagType = rdf.createIRI(Rdf.Bag);
		try(Stream<? extends Triple> types = graph.stream(subject, rdf.createIRI(Rdf.type), (RDFTerm)null)) {
			return types.map(Triple::getObject).anyMatch(term -> term.equals(bagType));
		}
	}

	/**
	 * The latest Object is known to the user under a pointer name. This pointer
	 * points internally to a collection of revisions of a graph, so we have to
	 * replace the internal subject with the pointer.
	 * Only triples whose subject is {@code revisionSubject} are carried over.
	 *
	 * @param graph may be null.
	 * @param revisionSubject
	 * @param pointerSubject
	 * @return The passed graph with the pointerSubject instead of the internal subject, or empty if graph is null.
	 */
	private Optional<Graph> replaceSubject(Graph graph, BlankNodeOrIRI revisionSubject, BlankNodeOrIRI pointerSubject) {
		return Optional.ofNullable(graph).map(g->{
			Graph result = rdf.createGraph();
			g.stream(revisionSubject, null, null)
				.forEach(t->result.add(pointerSubject, t.getPredicate(), t.getObject()));
			return result;
		});
	}

	/**
	 * @return the names of all graphs of the dataset.
	 */
	public Set<BlankNodeOrIRI> getGraphNames() {
		try(Stream<BlankNodeOrIRI> graphNames = dataset.getGraphNames()) {
			return graphNames.collect(Collectors.toSet());
		}
	}

	/**
	 * Creates a new {@code rdf:Bag} with the passed items ({@code rdf:_1 .. rdf:_n} plus
	 * {@code rdfs:member}, matching what {@link #addItem} writes).
	 * @throws IllegalStateException if the collection subject already exists in the graph.
	 */
	@Override
	public void saveCollection(BlankNodeOrIRI graphName, IRI collectionSubject, List<IRI> items, String logMsg) {
		log.debug("saveCollection: store: "+owner+" + graphName: " + graphName + "; collectionSubject: " + collectionSubject);
		if(exists(graphName, collectionSubject)) {
			throw new IllegalStateException("collection '%s' already exists in graph '%s'!".formatted(collectionSubject, graphName));
		}
		List<org.eclipse.rdf4j.model.IRI> rdf4jItems = items.stream()
				.map(i->Values.iri(i.getIRIString()))
				.collect(Collectors.toList());
		org.eclipse.rdf4j.model.IRI asValue = ((RDF4JIRI)collectionSubject).asValue();
		Model bag = RDFContainers.toRDF(org.eclipse.rdf4j.model.vocabulary.RDF.BAG, rdf4jItems, asValue, new TreeModel());
		// TODO use new method -> https://github.com/apache/commons-rdf/pull/205
		Graph bagGraph = rdf.asGraph(bag);
		// RDFContainers only writes rdf:_n; add rdfs:member as addItem does, so findCollection sees these items.
		IRI memberPredicate = rdf.createIRI(RDFS_MEMBER);
		items.forEach(item -> bagGraph.add(collectionSubject, memberPredicate, item));
		logGraph(graphName, bagGraph, logMsg, true);
		try(Stream<? extends Triple> triples = bagGraph.stream()) {
			addAll(graphName, triples);
		}
	}

	/**
	 * @param graphName
	 * @param subject
	 * @return True, if the passed subject has at least one triple in the graph.
	 */
	private boolean exists(BlankNodeOrIRI graphName, BlankNodeOrIRI subject) {
		try(Graph graph = getGraph(graphName);){
			return graph.contains(subject, null, null);
		} catch (Exception e) {
			throw new IllegalStateException("unable to get graph", e);
		}
	}

	/**
	 * Reads the items of the {@code rdf:Bag} {@code collectionSubject} in order ({@code rdf:_1}, {@code rdf:_2}, ...).
	 * @return the items, or an empty list if there is no such bag.
	 * @throws IllegalStateException if reading fails.
	 */
	@Override
	public List<IRI> getCollection(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject) {
		log.debug("getCollection(graphName:"+graphName+",collection:"+collectionSubject+")");
		try(Stream<? extends Triple> stream = getGraph(graphName).stream(collectionSubject, null, null)){
			Graph sub = rdf.createGraph();
			stream.forEach(sub::add);
			@SuppressWarnings({ "rawtypes", "unchecked" })
			Optional<Model> modelOpt = ((RDF4JGraphLike)sub).asModel();
			if(log.isDebugEnabled()) {
				// Serializing to Turtle is expensive; only do it when it is actually logged.
				log.debug("getCollection(...) model: " + new GraphToStringConverterDefault(null).convert(RdfFormat.TURTLE, modelOpt.orElse(null)));
			}
			Optional<List<Value>> valueListOpt = modelOpt.map(model->RDFContainers.toValues(org.eclipse.rdf4j.model.vocabulary.RDF.BAG, model,
					((RDF4JBlankNodeOrIRI)collectionSubject).asValue(),
					new ArrayList<Value>()));
			return valueListOpt.map(this::toIriList).orElseGet(ArrayList::new);
		} catch (RuntimeException e) {
			throw new IllegalStateException("unable to get resource", e);
		}
	}

	/**
	 * @return the subjects of all collections in the graph that have {@code memberParam} as {@code rdfs:member}.
	 */
	@Override
	public Set<IRI> findCollection(BlankNodeOrIRI graphName, IRI memberParam) {
		IRI memberPredicate = rdf.createIRI(RDFS_MEMBER);
		try(Stream<? extends Triple> stream = getGraph(graphName).stream(null, memberPredicate, memberParam)){
			return stream
					.map(Triple::getSubject)
					.filter(IRI.class::isInstance)
					.map(IRI.class::cast)
					.collect(Collectors.toSet());
		}
	}

	/**
	 * @param graphName
	 * @return The graph to the passed graphName.
	 */
	private Graph getGraph(BlankNodeOrIRI graphName) {
		return dataset.getGraph(graphName).orElseThrow(()->new IllegalStateException("no graph with name '%s'".formatted(graphName)));
	}

	/**
	 * @param values
	 * @return The passed RDF4J {@link Value}s converted to commons-rdf {@link IRI}s.
	 */
	private List<IRI> toIriList(List<Value> values) {
		return values.stream().map(v->rdf.createIRI(v.stringValue())).collect(Collectors.toList());
	}

	/**
	 * Appends {@code item} to the collection, creating the collection if it does not exist yet.
	 * @throws IllegalStateException if {@code graphName} is not the owner of this store.
	 */
	@Override
	public void addItem(BlankNodeOrIRI graphName, IRI collectionSubject, IRI item, String logMsg) {
		if(!this.owner.equals(graphName)) {
			throw new IllegalStateException(owner + " != " + graphName);
		}
		if(!exists(graphName, collectionSubject)) {
			saveCollection(graphName, collectionSubject, List.of(item), logMsg);
		} else {
			if(log.isDebugEnabled()) {
				log.debug("addItem (store: "+owner+" + graphName:"+graphName+", collection:"+collectionSubject+", item:"+item+")");
			}
			long size = size(graphName, collectionSubject);
			String propIriAsString = RDF_NS + "_" + (size + 1);
			// TODO how can we add multiple triples/quads ? See https://github.com/apache/commons-rdf/pull/205
			dataset.add(graphName, collectionSubject, rdf.createIRI(propIriAsString), item);
			dataset.add(graphName, collectionSubject, rdf.createIRI(RDFS_MEMBER), item);
		}
		if(graphlogStore.isDebugEnabled()) {
			graphlogStore.debug("items ("+collectionSubject+"): " + getCollection(graphName, collectionSubject));
		}
	}

	/**
	 * Logs the passed triples to the graph log(s).
	 * @param graphName
	 * @param triples the graph
	 * @param msg
	 * @param store also log to the {@code _graphlog_store} logger.
	 */
	private void logGraph(BlankNodeOrIRI graphName, Set<Triple> triples, String msg, boolean store) {
		Graph g = rdf.createGraph();
		triples.forEach(g::add);
		logGraph(graphName, g, msg, store);
	}

	private void logGraph(BlankNodeOrIRI graphName, Graph graph, String msg) {
		logGraph(graphName, graph, msg, false);
	}

	/**
	 * Logs the passed graph as Turtle to the graph log(s); does nothing if debug is disabled.
	 * @param graphName
	 * @param graph
	 * @param msg
	 * @param store also log to the {@code _graphlog_store} logger.
	 */
	private void logGraph(BlankNodeOrIRI graphName, Graph graph, String msg, boolean store) {
		if(store) {
			if(graphlogStore.isDebugEnabled()) {
				String str = "\n(graphlogStore)#####################################################################################\n";
				str += graphName + "->" + msg + "\n";
				graphlogStore.debug(str + new GraphToStringConverterDefault(null).convert(RdfFormat.TURTLE, graph));
			}
		}
		if(graphlog.isDebugEnabled()) {
			String str = "\n(graphlog)#####################################################################################\n";
			str += graphName + "->" + msg + "\n";
			graphlog.debug(str + new GraphToStringConverterDefault(null).convert(RdfFormat.TURTLE, graph));
		}
	}

	/**
	 * Determines the number of items of the collection by counting its container membership
	 * triples ({@code rdf:_1}, {@code rdf:_2}, ...) in the passed graph; used to compute the next index.
	 * Counting {@code rdfs:member} instead would undercount: collections stored by earlier versions
	 * of saveCollection have no {@code rdfs:member} triples, and adding the same item twice
	 * yields only one (deduplicated) {@code rdfs:member} triple.
	 * @param graphName
	 * @param collectionSubject
	 * @return the number of {@code rdf:_n} triples of the collection in {@code graphName}.
	 */
	private long size(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject) {
		try(Stream<? extends Quad> quads = dataset.stream(Optional.ofNullable(graphName), collectionSubject, null, null)) {
			return quads
					.map(quad -> quad.getPredicate().getIRIString())
					.filter(predicate -> CONTAINER_MEMBERSHIP_PROPERTY.matcher(predicate).matches())
					.count();
		}
	}

	/**
	 * Returns a page of the collection.
	 * @param offset index of the first item; an offset at or beyond the end yields an empty list.
	 * @param limit maximum number of items.
	 * @return a new list with at most {@code limit} items starting at {@code offset}.
	 * @throws IllegalArgumentException if offset or limit is negative.
	 */
	@Override
	public List<IRI> subCollection(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject, int offset, int limit) {
		if(offset < 0 || limit < 0) {
			throw new IllegalArgumentException("offset (%s) and limit (%s) must not be negative".formatted(offset, limit));
		}
		List<IRI> all = getWholeListWithMemoryFootprint(graphName, collectionSubject);
		if(offset >= all.size()) {
			return new ArrayList<>();
		}
		// long arithmetic: offset + limit may overflow int
		int end = (int) Math.min((long) offset + limit, all.size());
		// copy, so the returned page does not pin the whole list in memory
		return new ArrayList<>(all.subList(offset, end));
	}

	/**
	 * This was an experiment, we have to check if it makes sense. Or if we have to
	 * move it to a statistics part. The serialized size is only computed when debug logging is on.
	 *
	 * @param graphName
	 * @param collectionSubject
	 * @return All items of the collection.
	 */
	private List<IRI> getWholeListWithMemoryFootprint(BlankNodeOrIRI graphName, BlankNodeOrIRI collectionSubject) {
		List<IRI> all = getCollection(graphName, collectionSubject);
		if(log.isDebugEnabled()) {
			log.debug("bos (kb): " + (serializedSize(all)/1024) + " - items: " + all.size());
		}
		return all;
	}

	/**
	 * @return the size in bytes of the Java serialization of the IRI strings (rough memory estimate).
	 */
	private static int serializedSize(List<IRI> iris) {
		ArrayList<String> strings = iris.stream()
				.map(IRI::getIRIString)
				.collect(Collectors.toCollection(ArrayList::new));
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try(ObjectOutputStream out = new ObjectOutputStream(bytes)) {
			out.writeObject(strings);
		} catch (IOException e) {
			throw new IllegalStateException("unable to serialize collection for size estimation", e);
		}
		return bytes.size();
	}
}