BrightSide Workbench Full Report + Source Code
DaoStream.java
Go to the documentation of this file.
1 /*
2  * Turró i Cutiller Foundation. License notice.
3  * Copyright (C) 2020 Lluis Turró Cutiller <http://www.turro.org/>
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU Affero General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU Affero General Public License for more details.
14  *
15  * You should have received a copy of the GNU Affero General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 package org.turro.jpa.stream;
19 
20 import java.util.function.Consumer;
21 import java.util.stream.Stream;
22 import org.hibernate.query.Query;
23 import org.turro.elephant.db.WhereClause;
24 import org.turro.jpa.Dao;
25 import org.turro.jpa.DaoManager;
26 import org.turro.jpa.query.AbstractJpaQuery;
27 import org.turro.sql.SqlClause;
28 import org.turro.util.AtomicCounter;
29 
34 public class DaoStream<T> {
35 
36  private final Dao dao;
37  private final WhereClause wc;
38  private final SqlClause sc;
39  private final AbstractJpaQuery<T> query;
40  private DaoManager manager;
41 // private ScrollableResults results;
42 
43  public DaoStream(Dao dao, String query) {
44  this(dao, new WhereClause());
45  wc.addClause(query);
46  }
47 
48  public DaoStream(Dao dao, WhereClause wc) {
49  this.dao = dao;
50  this.wc = wc;
51  this.sc = null;
52  this.query = null;
53  }
54 
55  public DaoStream(Dao dao, SqlClause sc) {
56  this.dao = dao;
57  this.wc = null;
58  this.sc = sc;
59  this.query = null;
60  }
61 
62  public DaoStream(Dao dao, AbstractJpaQuery<T> query) {
63  this.dao = dao;
64  this.wc = null;
65  this.sc = null;
66  this.query = query;
67  }
68 
69  public <T> void forEach(Consumer<T> action) {
70  try(Stream stream = stream()) {
71  stream.forEach(action);
72  }
73  }
74 
75  public Stream<T> stream() {
76  return stream(100);
77  }
78 
79  public Stream<T> stream(int batchSize) {
80  manager = new DaoManager(dao);
81  Query q = null;
82  if(query != null) {
83  q = (Query) manager.createQuery(query.query());
84  } else if(wc != null) {
85  q = (Query) manager.createQuery(wc.getClause());
86  wc.setNamedParameters(q);
87  } else if(sc != null) {
88  q = (Query) manager.createQuery(sc.clause());
89  manager.setNamedParameters(q, sc.getNamedValues());
90  }
91  if(q != null) {
92  //q.setReadOnly(true);
93  //q.setLockMode(LockModeType.NONE);
94  //q.setFetchSize(batchSize);
95  openStreams.increment();
96  return (Stream<T>) q.<T>stream().onClose(() -> {
97  manager.close();
98  openStreams.decrement();
99  });
100  }
101  return null;
102 // results = q.scroll(ScrollMode.FORWARD_ONLY);
103 // return Stream
104 // .generate(() -> next(batchSize))
105 // .takeWhile(Predicate.not(List::isEmpty))
106 // .flatMap(List::stream)
107 // .onClose(() -> {
108 // results.close();
109 // manager.close();
110 // openStreams.decrement();
111 // });
112  }
113 
114 // private List<T> next(int batchSize) {
115 // List<T> result = new ArrayList<>();
116 // int count = 0;
117 // while (count++ < batchSize) {
118 // if(results.next()) {
119 // Object[] row = results.get();
120 // if(row.length > 1) {
121 // result.add((T) row);
122 // } else {
123 // result.add((T) row[0]);
124 // }
125 // } else {
126 // break;
127 // }
128 // }
129 // return result;
130 // }
131 
132  /* Debug */
133 
134  private static final AtomicCounter openStreams = new AtomicCounter();
135 
136  public static AtomicCounter getCounter() {
137  return openStreams;
138  }
139 
140 }
void setNamedParameters(Query q, Map< String, Object > parameters)
DaoStream(Dao dao, WhereClause wc)
Definition: DaoStream.java:48
DaoStream(Dao dao, AbstractJpaQuery< T > query)
Definition: DaoStream.java:62
static AtomicCounter getCounter()
Definition: DaoStream.java:136
DaoStream(Dao dao, SqlClause sc)
Definition: DaoStream.java:55
Stream< T > stream(int batchSize)
Definition: DaoStream.java:79
DaoStream(Dao dao, String query)
Definition: DaoStream.java:43