1 package org.apache.torque.oid;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.math.BigDecimal;
23 import java.sql.Connection;
24 import java.sql.PreparedStatement;
25 import java.sql.ResultSet;
26 import java.sql.SQLException;
27 import java.sql.Statement;
28 import java.util.ArrayList;
29 import java.util.Hashtable;
30 import java.util.List;
31 import java.util.Map;
32
33 import org.apache.commons.configuration.Configuration;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.torque.Database;
37 import org.apache.torque.Torque;
38 import org.apache.torque.TorqueException;
39 import org.apache.torque.util.Transaction;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class IDBroker implements Runnable, IdGenerator
77 {
78
79 public static final String ID_TABLE = "ID_TABLE";
80
81
82 public static final String COL_TABLE_NAME = "TABLE_NAME";
83
84
85 public static final String TABLE_NAME = ID_TABLE + "." + COL_TABLE_NAME;
86
87
88 public static final String COL_TABLE_ID = "ID_TABLE_ID";
89
90
91 public static final String TABLE_ID = ID_TABLE + "." + COL_TABLE_ID;
92
93
94 public static final String COL_NEXT_ID = "NEXT_ID";
95
96
97 public static final String NEXT_ID = ID_TABLE + "." + COL_NEXT_ID;
98
99
100 public static final String COL_QUANTITY = "QUANTITY";
101
102
103 public static final String QUANTITY = ID_TABLE + "." + COL_QUANTITY;
104
105
106 private static final double PREFETCH_BACKUP_QUANTITY = 10d;
107
108
109 private static final double CLEVERQUANTITY_MAX_DEFAULT = 10000d;
110
111
112 private final String databaseName;
113
114
115
116
117
118 private static final int DEFAULT_SIZE = 40;
119
120
121
122
123
124
125
126 private final Map<String, List<BigDecimal>> ids
127 = new Hashtable<String, List<BigDecimal>>(DEFAULT_SIZE);
128
129
130
131
132
133
134
135 private final Map<String, BigDecimal> quantityStore
136 = new Hashtable<String, BigDecimal>(DEFAULT_SIZE);
137
138
139
140
141
142
143
144 private final Map<String, java.util.Date> lastQueryTime
145 = new Hashtable<String, java.util.Date>(DEFAULT_SIZE);
146
147
148
149
150 private static final long SLEEP_PERIOD = 60000;
151
152
153
154
155 private static final float SAFETY_MARGIN = 1.2f;
156
157
158
159
160 private Thread houseKeeperThread = null;
161
162
163
164
165 private boolean transactionsSupported = false;
166
167
168 private boolean threadRunning = false;
169
170
171
172
173 private static final BigDecimal ONE = new BigDecimal("1");
174
175
176 private Configuration configuration;
177
178
179 private static final String DB_IDBROKER_CLEVERQUANTITY =
180 "idbroker.clever.quantity";
181
182
183 private static final String DB_IDBROKER_CLEVERQUANTITY_MAX =
184 "idbroker.clever.quantity.max";
185
186
187 private static final String DB_IDBROKER_PREFETCH =
188 "idbroker.prefetch";
189
190
191 private static final String DB_IDBROKER_USENEWCONNECTION =
192 "idbroker.usenewconnection";
193
194
195 private final Log log = LogFactory.getLog(IDBroker.class);
196
197
198
199
200
201
/**
 * Creates an IDBroker for the given database and registers the new
 * instance with Torque.
 *
 * @param database the database to serve; only its name is retained.
 */
public IDBroker(Database database)
{
    databaseName = database.getName();
    Torque.registerIDBroker(this);
}
207
208
209
210
211 public void start()
212 {
213 configuration = Torque.getConfiguration();
214
215
216 if (configuration.getBoolean(DB_IDBROKER_PREFETCH, true))
217 {
218 houseKeeperThread = new Thread(this);
219
220
221
222
223 houseKeeperThread.setDaemon(true);
224 houseKeeperThread.setName("Torque - ID Broker thread");
225 houseKeeperThread.start();
226 }
227
228
229
230
231 Connection dbCon = null;
232 try
233 {
234 dbCon = Transaction.begin(databaseName);
235 transactionsSupported = dbCon.getMetaData().supportsTransactions();
236 Transaction.commit(dbCon);
237 dbCon = null;
238 }
239 catch (Exception e)
240 {
241 log.warn("Could not read from connection Metadata"
242 + " whether transactions are supported for the database "
243 + databaseName,
244 e);
245 transactionsSupported = false;
246 }
247 finally
248 {
249 if (dbCon != null)
250 {
251 Transaction.safeRollback(dbCon);
252 }
253 }
254 if (!transactionsSupported)
255 {
256 log.warn("IDBroker is being used with db '" + databaseName
257 + "', which does not support transactions. IDBroker "
258 + "attempts to use transactions to limit the possibility "
259 + "of duplicate key generation. Without transactions, "
260 + "duplicate key generation is possible if multiple JVMs "
261 + "are used or other means are used to write to the "
262 + "database.");
263 }
264 }
265
266
267
268
269
270
271 public void setConfiguration(Configuration configuration)
272 {
273 this.configuration = configuration;
274 }
275
276
277
278
279
280
281
282
283
284
285
286
287
288 public int getIdAsInt(Connection connection, Object tableName)
289 throws TorqueException
290 {
291 return getIdAsBigDecimal(connection, tableName).intValue();
292 }
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307 public long getIdAsLong(Connection connection, Object tableName)
308 throws TorqueException
309 {
310 return getIdAsBigDecimal(connection, tableName).longValue();
311 }
312
313
314
315
316
317
318
319
320
321
322
323
324
325 public BigDecimal getIdAsBigDecimal(Connection connection,
326 Object tableName)
327 throws TorqueException
328 {
329 BigDecimal[] id = getNextIds((String) tableName, 1, connection);
330 return id[0];
331 }
332
333
334
335
336
337
338
339
340
341
342
343
344
345 public String getIdAsString(Connection connection, Object tableName)
346 throws TorqueException
347 {
348 return getIdAsBigDecimal(connection, tableName).toString();
349 }
350
351
352
353
354
355
356 public boolean isPriorToInsert()
357 {
358 return true;
359 }
360
361
362
363
364
365
366 public boolean isPostInsert()
367 {
368 return false;
369 }
370
371
372
373
374
375
376
377 public boolean isConnectionRequired()
378 {
379 return false;
380 }
381
382
383
384
385
386
387 public boolean isThreadRunning()
388 {
389 return threadRunning;
390 }
391
392
393
394
395
396
397
398
399
400 public synchronized BigDecimal[] getNextIds(String tableName,
401 int numOfIdsToReturn)
402 throws Exception
403 {
404 return getNextIds(tableName, numOfIdsToReturn, null);
405 }
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420 public synchronized BigDecimal[] getNextIds(String tableName,
421 int numOfIdsToReturn,
422 Connection connection)
423 throws TorqueException
424 {
425 if (tableName == null)
426 {
427 throw new TorqueException("getNextIds(): tableName == null");
428 }
429
430
431
432
433
434
435
436
437
438
439 List<BigDecimal> availableIds = ids.get(tableName);
440
441 if (availableIds == null || availableIds.size() < numOfIdsToReturn)
442 {
443 if (availableIds == null)
444 {
445 log.debug("Forced id retrieval - no available list for table "
446 + tableName);
447 }
448 else
449 {
450 log.debug("Forced id retrieval - " + availableIds.size()
451 + " ids still available for table " + tableName);
452 }
453 storeIDs(tableName, true, connection);
454 availableIds = ids.get(tableName);
455 }
456
457 int size = availableIds.size() < numOfIdsToReturn
458 ? availableIds.size() : numOfIdsToReturn;
459
460 BigDecimal[] results = new BigDecimal[size];
461
462
463
464
465
466
467 for (int i = size - 1; i >= 0; i--)
468 {
469 results[i] = availableIds.get(i);
470 availableIds.remove(i);
471 }
472
473
474 return results;
475 }
476
477
478
479
480
481
482
483
484 public boolean exists(String tableName)
485 throws Exception
486 {
487 String query = new StringBuilder()
488 .append("select ")
489 .append(TABLE_NAME)
490 .append(" where ")
491 .append(TABLE_NAME).append("='").append(tableName).append('\'')
492 .toString();
493
494 boolean exists = false;
495 Connection dbCon = null;
496 try
497 {
498 dbCon = Transaction.begin(databaseName);
499 Statement statement = dbCon.createStatement();
500 ResultSet rs = statement.executeQuery(query);
501 exists = rs.next();
502 statement.close();
503 Transaction.commit(dbCon);
504 dbCon = null;
505 }
506 finally
507 {
508 if (dbCon != null)
509 {
510 Transaction.safeRollback(dbCon);
511 }
512 }
513 return exists;
514 }
515
516
517
518
519
520
521 public void run()
522 {
523 log.debug("IDBroker thread was started.");
524 threadRunning = true;
525
526 Thread thisThread = Thread.currentThread();
527 while (houseKeeperThread == thisThread)
528 {
529 try
530 {
531 Thread.sleep(SLEEP_PERIOD);
532 }
533 catch (InterruptedException exc)
534 {
535 log.trace("InterruptedException caught and ignored "
536 + "during IdBroker sleep");
537 }
538
539
540 for (String tableName : ids.keySet())
541 {
542 if (log.isDebugEnabled())
543 {
544 log.debug("IDBroker thread checking for more keys "
545 + "on table: " + tableName);
546 }
547 List<BigDecimal> availableIds = ids.get(tableName);
548 int quantity = getQuantity(tableName, null).intValue();
549 if (quantity > availableIds.size())
550 {
551 try
552 {
553
554
555
556 storeIDs(tableName, false, null);
557 if (log.isDebugEnabled())
558 {
559 log.debug("Retrieved more ids for table: "
560 + tableName);
561 }
562 }
563 catch (Exception exc)
564 {
565 log.error("There was a problem getting new IDs "
566 + "for table: " + tableName, exc);
567 }
568 }
569 }
570 }
571 log.debug("IDBroker thread finished.");
572 threadRunning = false;
573 }
574
575
576
577
578
579
580
581 public void stop()
582 {
583 if (houseKeeperThread != null)
584 {
585 Thread localHouseKeeperThread = houseKeeperThread;
586 houseKeeperThread = null;
587 localHouseKeeperThread.interrupt();
588 }
589 ids.clear();
590 lastQueryTime.clear();
591 quantityStore.clear();
592 transactionsSupported = false;
593 }
594
595
596
597
598
599
600
601
602
603
604 private void checkTiming(String tableName)
605 {
606
607
608 if (!configuration.getBoolean(DB_IDBROKER_CLEVERQUANTITY, true)
609 || !configuration.getBoolean(DB_IDBROKER_PREFETCH, true))
610 {
611 return;
612 }
613
614
615 java.util.Date lastTime = lastQueryTime.get(tableName);
616 java.util.Date now = new java.util.Date();
617
618 if (lastTime != null)
619 {
620 long thenLong = lastTime.getTime();
621 long nowLong = now.getTime();
622 long timeLapse = nowLong - thenLong;
623 log.debug("checkTiming(): sleep time was "
624 + timeLapse + " milliseconds for table " + tableName);
625 if (timeLapse < SLEEP_PERIOD)
626 {
627 log.debug("checkTiming(): Unscheduled retrieval of ids "
628 + "for table " + tableName);
629
630
631 BigDecimal quantity = getQuantity(tableName, null);
632 double newQuantity;
633 if (timeLapse > 0)
634 {
635 float rate = quantity.floatValue() / timeLapse;
636 newQuantity
637 = Math.ceil(SLEEP_PERIOD * rate * SAFETY_MARGIN);
638 log.debug("checkTiming(): calculated new quantity "
639 + newQuantity + " from rate " + rate);
640 }
641 else
642 {
643
644
645 newQuantity = quantity.floatValue() * 2;
646 log.debug("checkTiming(): calculated new quantity "
647 + newQuantity
648 + " from double the old quantity (time lapse 0)");
649 }
650
651 Double maxQuantity = configuration.getDouble(
652 DB_IDBROKER_CLEVERQUANTITY_MAX,
653 CLEVERQUANTITY_MAX_DEFAULT);
654 if (maxQuantity != null && newQuantity > maxQuantity)
655 {
656 if (quantity.doubleValue() > maxQuantity)
657 {
658
659 newQuantity = quantity.doubleValue();
660 }
661 else
662 {
663 newQuantity = maxQuantity;
664 }
665 }
666 quantityStore.put(tableName, new BigDecimal(newQuantity));
667 log.debug("checkTiming(): new quantity " + newQuantity
668 + " stored in quantity store (not in db)");
669 }
670 }
671 lastQueryTime.put(tableName, now);
672 }
673
674
675
676
677
678
679
680
681
682
683
684 private synchronized void storeIDs(String tableName,
685 boolean adjustQuantity,
686 Connection connection)
687 throws TorqueException
688 {
689 log.debug("storeIDs(): Start retrieving ids from database.");
690 BigDecimal nextId = null;
691 BigDecimal quantity = null;
692
693
694
695
696
697
698 if (adjustQuantity)
699 {
700 checkTiming(tableName);
701 }
702
703 boolean useNewConnection = (connection == null) || (configuration
704 .getBoolean(DB_IDBROKER_USENEWCONNECTION, true));
705 try
706 {
707 if (useNewConnection)
708 {
709 connection = Transaction.begin(databaseName);
710 if (log.isTraceEnabled())
711 {
712 log.trace("storeIDs(): fetched connection, "
713 + "started transaction.");
714 }
715 }
716
717
718
719
720
721
722 quantity = getQuantity(tableName, connection);
723 updateQuantity(connection, tableName, quantity);
724
725
726 BigDecimal[] results = selectRow(connection, tableName);
727 nextId = results[0];
728
729
730
731 BigDecimal newNextId = nextId.add(quantity);
732 updateNextId(connection, tableName, newNextId.toString());
733
734 if (useNewConnection)
735 {
736 Transaction.commit(connection);
737 if (log.isTraceEnabled())
738 {
739 log.trace("storeIDs(): Transaction committed, "
740 + "connection returned");
741 }
742 }
743 }
744 catch (TorqueException e)
745 {
746 if (useNewConnection)
747 {
748 Transaction.safeRollback(connection);
749 }
750 throw e;
751 }
752
753 List<BigDecimal> availableIds = ids.get(tableName);
754 if (availableIds == null)
755 {
756 availableIds = new ArrayList<BigDecimal>();
757 ids.put(tableName, availableIds);
758 }
759
760
761 int numId = quantity.intValue();
762 for (int i = 0; i < numId; i++)
763 {
764 availableIds.add(nextId);
765 nextId = nextId.add(ONE);
766 }
767
768 }
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784 private BigDecimal getQuantity(String tableName, Connection connection)
785 {
786 BigDecimal quantity = null;
787
788
789 if (!configuration.getBoolean(DB_IDBROKER_PREFETCH, true))
790 {
791 quantity = new BigDecimal((double) 1);
792 }
793
794 else if (quantityStore.containsKey(tableName))
795 {
796 quantity = quantityStore.get(tableName);
797 }
798 else
799 {
800 log.debug("getQuantity() : start fetch quantity for table "
801 + tableName + " from database");
802 boolean useNewConnection = (connection == null) || (configuration
803 .getBoolean(DB_IDBROKER_USENEWCONNECTION, true));
804 try
805 {
806 if (useNewConnection)
807 {
808 connection = Transaction.begin(databaseName);
809 if (log.isTraceEnabled())
810 {
811 log.trace("getQuantity(): connection fetched, "
812 + "transaction started");
813 }
814 }
815
816
817 BigDecimal[] results = selectRow(connection, tableName);
818
819
820 quantity = results[1];
821 quantityStore.put(tableName, quantity);
822 log.debug("getQuantity() : quantity fetched for table "
823 + tableName + ", result is " + quantity);
824 if (useNewConnection)
825 {
826 Transaction.commit(connection);
827 connection = null;
828 if (log.isTraceEnabled())
829 {
830 log.trace("getQuantity(): transaction committed, "
831 + "connection returned");
832 }
833 }
834 }
835 catch (Exception e)
836 {
837 quantity = new BigDecimal(PREFETCH_BACKUP_QUANTITY);
838 }
839 finally
840 {
841 if (useNewConnection && connection != null)
842 {
843 Transaction.safeRollback(connection);
844 }
845 }
846 }
847 return quantity;
848 }
849
850
851
852
853
854
855
856
857
858
859 private BigDecimal[] selectRow(Connection con, String tableName)
860 throws TorqueException
861 {
862 StringBuffer stmt = new StringBuffer();
863 stmt.append("SELECT ")
864 .append(COL_NEXT_ID)
865 .append(", ")
866 .append(COL_QUANTITY)
867 .append(" FROM ")
868 .append(ID_TABLE)
869 .append(" WHERE ")
870 .append(COL_TABLE_NAME)
871 .append(" = ?");
872
873 PreparedStatement statement = null;
874 ResultSet rs = null;
875
876 BigDecimal[] results = new BigDecimal[2];
877
878 try
879 {
880 statement = con.prepareStatement(stmt.toString());
881 statement.setString(1, tableName);
882 rs = statement.executeQuery();
883
884 if (rs.next())
885 {
886
887
888
889 results[0] = new BigDecimal(rs.getString(1));
890 results[1] = new BigDecimal(rs.getString(2));
891 }
892 else
893 {
894 throw new TorqueException("The table " + tableName
895 + " does not have a proper entry in the " + ID_TABLE);
896 }
897 rs.close();
898 rs = null;
899 statement.close();
900 statement = null;
901
902 }
903 catch (SQLException e)
904 {
905 throw new TorqueException(e);
906 }
907 finally
908 {
909 if (rs != null)
910 {
911 try
912 {
913 rs.close();
914 }
915 catch (SQLException e)
916 {
917 log.warn("Could not close result set", e);
918 }
919 }
920 if (statement != null)
921 {
922 try
923 {
924 statement.close();
925 }
926 catch (SQLException e)
927 {
928 log.warn("Could not close statement", e);
929 }
930 }
931 }
932
933 return results;
934 }
935
936
937
938
939
940
941
942
943
944
945 private void updateNextId(Connection con, String tableName, String id)
946 throws TorqueException
947 {
948
949
950 StringBuilder stmt = new StringBuilder();
951 stmt.append("UPDATE " + ID_TABLE)
952 .append(" SET ")
953 .append(COL_NEXT_ID)
954 .append(" = ")
955 .append(id)
956 .append(" WHERE ")
957 .append(COL_TABLE_NAME)
958 .append(" = '")
959 .append(tableName)
960 .append('\'');
961
962 Statement statement = null;
963
964 if (log.isDebugEnabled())
965 {
966 log.debug("updateNextId: " + stmt.toString());
967 }
968
969 try
970 {
971 statement = con.createStatement();
972 statement.executeUpdate(stmt.toString());
973 }
974 catch (SQLException e)
975 {
976 throw new TorqueException(e);
977 }
978 finally
979 {
980 if (statement != null)
981 {
982 try
983 {
984 statement.close();
985 }
986 catch (SQLException e)
987 {
988 throw new TorqueException(e);
989 }
990 }
991 }
992 }
993
994
995
996
997
998
999
1000
1001
1002
1003 protected void updateQuantity(Connection con, String tableName,
1004 BigDecimal quantity)
1005 throws TorqueException
1006 {
1007 log.debug("updateQuantity(): start for table " + tableName
1008 + " and quantity " + quantity);
1009 StringBuilder stmt = new StringBuilder();
1010 stmt.append("UPDATE ")
1011 .append(ID_TABLE)
1012 .append(" SET ")
1013 .append(COL_QUANTITY)
1014 .append(" = ")
1015 .append(quantity)
1016 .append(" WHERE ")
1017 .append(COL_TABLE_NAME)
1018 .append(" = '")
1019 .append(tableName)
1020 .append('\'');
1021
1022 Statement statement = null;
1023
1024 if (log.isDebugEnabled())
1025 {
1026 log.debug("updateQuantity(): " + stmt.toString());
1027 }
1028
1029 try
1030 {
1031 statement = con.createStatement();
1032 statement.executeUpdate(stmt.toString());
1033 log.debug("updateQuantity(): quantity written, end");
1034 }
1035 catch (SQLException e)
1036 {
1037 throw new TorqueException(e);
1038 }
1039 finally
1040 {
1041 if (statement != null)
1042 {
1043 try
1044 {
1045 statement.close();
1046 }
1047 catch (SQLException e)
1048 {
1049 throw new TorqueException(e);
1050 }
1051 }
1052 }
1053 }
1054
1055
1056
1057
1058
1059
1060
1061
1062 protected BigDecimal getQuantity(String tableName)
1063 {
1064 return quantityStore.get(tableName);
1065 }
1066 }