-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[hbase10] Address #701 by mimicking the same locks from the HBase 0.9… #1028
Changes from 2 commits
4a4ab6e
32c8609
1ced4a2
292f984
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,10 +66,12 @@ | |
* durability. | ||
*/ | ||
public class HBaseClient10 extends com.yahoo.ycsb.DB { | ||
private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0); | ||
|
||
private static final Object TABLE_LOCK = new Object(); | ||
|
||
private Configuration config = HBaseConfiguration.create(); | ||
|
||
private static AtomicInteger threadCount = new AtomicInteger(0); | ||
|
||
|
||
private boolean debug = false; | ||
|
||
private String tableName = ""; | ||
|
@@ -82,7 +84,6 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { | |
* @See #CONNECTION_LOCK. | ||
*/ | ||
private static Connection connection = null; | ||
private static final Object CONNECTION_LOCK = new Object(); | ||
|
||
// Depending on the value of clientSideBuffering, either bufferedMutator | ||
// (clientSideBuffering) or currentTable (!clientSideBuffering) will be used. | ||
|
@@ -144,12 +145,19 @@ public void init() throws DBException { | |
} | ||
} | ||
|
||
String table = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT); | ||
try { | ||
threadCount.getAndIncrement(); | ||
synchronized (CONNECTION_LOCK) { | ||
THREAD_COUNT.getAndIncrement(); | ||
synchronized (THREAD_COUNT) { | ||
if (connection == null) { | ||
// Initialize if not set up already. | ||
connection = ConnectionFactory.createConnection(config); | ||
|
||
// Terminate right now if table does not exist, since the client | ||
// will not propagate this error upstream once the workload | ||
// starts. | ||
final TableName tName = TableName.valueOf(table); | ||
connection.getTable(tName).getTableDescriptor(); | ||
} | ||
} | ||
} catch (java.io.IOException e) { | ||
|
@@ -172,19 +180,6 @@ public void init() throws DBException { | |
throw new DBException("No columnfamily specified"); | ||
} | ||
columnFamilyBytes = Bytes.toBytes(columnFamily); | ||
|
||
// Terminate right now if table does not exist, since the client | ||
// will not propagate this error upstream once the workload | ||
// starts. | ||
String table = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT); | ||
try { | ||
final TableName tName = TableName.valueOf(table); | ||
synchronized (CONNECTION_LOCK) { | ||
connection.getTable(tName).getTableDescriptor(); | ||
} | ||
} catch (IOException e) { | ||
throw new DBException(e); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -208,15 +203,10 @@ public void cleanup() throws DBException { | |
long en = System.nanoTime(); | ||
final String type = clientSideBuffering ? "UPDATE" : "CLEANUP"; | ||
measurements.measure(type, (int) ((en - st) / 1000)); | ||
threadCount.decrementAndGet(); | ||
if (threadCount.get() <= 0) { | ||
// Means we are done so ok to shut down the Connection. | ||
synchronized (CONNECTION_LOCK) { | ||
if (connection != null) { | ||
connection.close(); | ||
connection = null; | ||
} | ||
} | ||
int threadCount = THREAD_COUNT.decrementAndGet(); | ||
if (threadCount <= 0 && connection != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this needs to be in a lock that covers checking if it's null and setting it to null, othewise we race the initialization above that only creates a new connection when it's null. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But Alan was right in that this should only fire when the atomic int returns a 0 at which point only one thread will perform the null check and close + null the connection. Or are you thinking of the case where there may be some initialization issue wherein a run initializes multiple threads but the initializations fail and possibly cleanup is executed on a thread before initialization begins in another thread? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right exactly. |
||
connection.close(); | ||
connection = null; | ||
} | ||
} catch (IOException e) { | ||
throw new DBException(e); | ||
|
@@ -225,7 +215,7 @@ public void cleanup() throws DBException { | |
|
||
public void getHTable(String table) throws IOException { | ||
final TableName tName = TableName.valueOf(table); | ||
synchronized (CONNECTION_LOCK) { | ||
synchronized (TABLE_LOCK) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #701 asked for this lock to be removed, if I am reading it correctly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. He did mention it but I need confirmation from an HBase dev that the table fetch is thread safe in 1.X. I think it wasn't in the past. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Connection objects in hbase 1.y are threadsafe, so getTable is fine. The returned Table object is not threadsafe, but making them is supposed to be lightweight and done per-thread when needed. |
||
this.currentTable = connection.getTable(tName); | ||
if (clientSideBuffering) { | ||
final BufferedMutatorParams p = new BufferedMutatorParams(tName); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we merge this try into the try after the "Terminate right now" comment?