Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getConnectionForSql and use it #988

Merged
merged 12 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/BuildInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
/** @ignore *//** */
import {MemberVersion} from './core/MemberVersion';

const clientVersion = require('../package.json').version;

Expand Down Expand Up @@ -52,4 +53,7 @@ export class BuildInfo {
return clientVersion;
}

public static calculateMemberVersion(m: MemberVersion) : number {
return BuildInfo.calculateServerVersion(m.major, m.minor, m.patch);
}
Comment on lines +56 to +58
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is added as a utility

}
3 changes: 2 additions & 1 deletion src/HazelcastClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ export class HazelcastClient {
this.clusterFailoverService
);
this.connectionRegistry = new ConnectionRegistryImpl(
this.config.connectionStrategy,
this.config.connectionStrategy.asyncStart,
this.config.connectionStrategy.reconnectMode,
Comment on lines +164 to +165
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored constructor to accept specific configs it needs

this.config.network.smartRouting,
this.loadBalancer,
this.clusterService
Expand Down
2 changes: 2 additions & 0 deletions src/core/LoadBalancer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ export interface LoadBalancer {
* Returns the next data member to route to.
*
* @return Returns the next data member or `null` if no data member is available
* @deprecated Since 5.0, the method is unused
srknzl marked this conversation as resolved.
Show resolved Hide resolved
*/
nextDataMember(): Member | null;

/**
* Returns whether this instance supports getting data members through a call to {@link nextDataMember()}.
*
* @return Returns `true` if this load balancer can get a data member.
* @deprecated Since 5.0, the method is unused
*/
canGetNextDataMember(): boolean;
}
Expand Down
25 changes: 25 additions & 0 deletions src/core/MemberVersion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,29 @@ export class MemberVersion {
this.minor = minor;
this.patch = patch;
}

/**
* @param other other version to compare to
* @param ignorePatchVersion whether patch in the version should be ignored
* @return true if this version equals `other`
*/
equals(other: MemberVersion, ignorePatchVersion = false): boolean {
if (ignorePatchVersion) {
return this.major === other.major && this.minor === other.minor;
} else {
return this.major === other.major && this.minor === other.minor && this.patch === other.patch;
}
}

/**
* @param ignorePatchVersion whether patch in the version should be ignored
* @return string format of this `MemberVersion`
*/
toString(ignorePatchVersion = false): string {
if (ignorePatchVersion) {
return `${this.major}.${this.minor}`;
} else {
return `${this.major}.${this.minor}.${this.patch}`;
}
}
}
2 changes: 1 addition & 1 deletion src/invocation/ClusterService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class ClusterService implements Cluster {
* @param uuid The UUID of the member as a string.
* @return The member that was found, or undefined if not found.
*/
getMember(uuid: string): MemberImpl {
getMember(uuid: string): MemberImpl | undefined {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side change that is unrelated

assertNotNull(uuid);
return this.memberListSnapshot.members.get(uuid);
}
Expand Down
101 changes: 63 additions & 38 deletions src/network/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ import {
scheduleWithRepetition,
shuffleArray,
Task,
timedPromise
timedPromise,
memberOfLargerSameVersionGroup
} from '../util/Util';
import {BasicSSLOptionsFactory} from '../connection/BasicSSLOptionsFactory';
import {ILogger} from '../logging/ILogger';
import {HeartbeatManager} from './HeartbeatManager';
import {UuidUtil} from '../util/UuidUtil';
import {WaitStrategy} from './WaitStrategy';
import {ConnectionStrategyConfig, ReconnectMode} from '../config/ConnectionStrategyConfig';
import {ReconnectMode} from '../config/ConnectionStrategyConfig';
import {ClientConfig, ClientConfigImpl} from '../config/Config';
import {LifecycleState, LifecycleServiceImpl, LifecycleService} from '../LifecycleService';
import {ClientMessage} from '../protocol/ClientMessage';
Expand All @@ -81,6 +82,7 @@ export const CLIENT_TYPE = 'NJS';
const SERIALIZATION_VERSION = 1;
const SET_TIMEOUT_MAX_DELAY = 2147483647;
const BINARY_PROTOCOL_VERSION = Buffer.from('CP2');
const SQL_CONNECTION_RANDOM_ATTEMPTS = 10;

enum ConnectionState {
/**
Expand Down Expand Up @@ -131,10 +133,21 @@ export interface ConnectionRegistry {

/**
* Returns a random connection from active connections
* @param dataMember true if only data members should be considered
* @return Connection if there is at least one connection, otherwise null
*/
getRandomConnection(dataMember?: boolean): Connection | null;
getRandomConnection(): Connection | null;

/**
* Returns a connection for executing SQL.
*
* @throws IllegalStateError If there are more than 2 distinct member versions found
* @return
* * A random connection to a data member from the larger same-version group
* * If there's no such connection, return connection to a random data member
* * If there's no such connection, return any random connection
* * If there are no connections, null is returned
*/
getConnectionForSql(): Connection | null;

/**
* Returns if invocation allowed. Invocation is allowed only if connection state is {@link INITIALIZED_ON_CLUSTER}
Expand All @@ -148,24 +161,15 @@ export class ConnectionRegistryImpl implements ConnectionRegistry {

private active = false;
private readonly activeConnections = new Map<string, Connection>();
private readonly loadBalancer: LoadBalancer;
private connectionState = ConnectionState.INITIAL;
private readonly smartRoutingEnabled: boolean;
private readonly asyncStart: boolean;
private readonly reconnectMode: ReconnectMode;
private readonly clusterService: ClusterService;

constructor(
connectionStrategy: ConnectionStrategyConfig,
smartRoutingEnabled: boolean,
loadBalancer: LoadBalancer,
clusterService: ClusterService
private readonly asyncStart: boolean,
private readonly reconnectMode: ReconnectMode,
private readonly smartRoutingEnabled: boolean,
private readonly loadBalancer: LoadBalancer,
private readonly clusterService: ClusterService
Comment on lines +167 to +171
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preferred to use compact construction syntax(like other classes I refactored in #797 )

) {
this.smartRoutingEnabled = smartRoutingEnabled;
this.asyncStart = connectionStrategy.asyncStart;
this.reconnectMode = connectionStrategy.reconnectMode;
this.loadBalancer = loadBalancer;
this.clusterService = clusterService;
}

isActive(): boolean {
Expand All @@ -188,41 +192,62 @@ export class ConnectionRegistryImpl implements ConnectionRegistry {
return this.activeConnections.get(uuid.toString());
}

getRandomConnection(dataMember = false): Connection | null {
getRandomConnection(): Connection | null {
if (this.smartRoutingEnabled) {
let member;
if (dataMember) {
if (this.loadBalancer.canGetNextDataMember()) {
member = this.loadBalancer.nextDataMember();
} else {
member = null;
const member = this.loadBalancer.next();
if (member != null) {
const connection = this.getConnection(member.uuid);
if (connection != null) {
return connection;
}
} else {
member = this.loadBalancer.next();
}
}

const iterator = this.activeConnections.values();
const next = iterator.next();
if (!next.done) {
return next.value;
} else {
return null;
}
}
Comment on lines +195 to +213
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted to old version


if (member !== null) {
getConnectionForSql(): Connection | null {
if (this.smartRoutingEnabled) {
// There might be a race - the chosen member might be just connected or disconnected - try a
// couple of times, the memberOfLargerSameVersionGroup returns a random connection,
// we might be lucky...
for (let i = 0; i < SQL_CONNECTION_RANDOM_ATTEMPTS; i++) {
const member = memberOfLargerSameVersionGroup(this.clusterService.getMembers());

if (member === null) {
break;
}
const connection = this.getConnection(member.uuid);
if (connection != null) {
if (connection !== undefined) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connection can only be undefined, I chose to be more specific here

return connection;
}
}
}


for (const entry of this.activeConnections.entries()) {
const uuid = entry[0];
// Otherwise iterate over connections and return the first one that's not to a lite member
let firstConnection: Connection | null = null;
for (const entry of this.activeConnections) {
const memberId = entry[0];
const connection = entry[1];
if (dataMember) {
const member = this.clusterService.getMember(uuid);
if (!member || member.liteMember) {
continue;
}

if (firstConnection === null) {
firstConnection = connection;
}
const member = this.clusterService.getMember(memberId);
if (member === undefined || member.liteMember) {
continue;
}
return connection;
}

return null;
// Failed to get a connection to a data member
return firstConnection;
}

forEachConnection(fn: (conn: Connection) => void): void {
Expand Down
4 changes: 2 additions & 2 deletions src/sql/SqlResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ export class SqlResultImpl implements SqlResult {
this.sqlService.close(this.connection, this.queryId).then(() => {
this.closeDeferred.resolve();
}).catch(err => {
this.closeDeferred.reject(this.sqlService.toHazelcastSqlException(err, this.connection));
this.closeDeferred.reject(this.sqlService.rethrow(err, this.connection));
});

this.closed = true;
Expand Down Expand Up @@ -354,7 +354,7 @@ export class SqlResultImpl implements SqlResult {
this.fetchDeferred.resolve(sqlPage);
this.fetchDeferred = undefined; // Set fetchDeferred to undefined to be able to fetch again
}).catch(err => {
this.fetchDeferred.reject(this.sqlService.toHazelcastSqlException(err, this.connection));
this.fetchDeferred.reject(this.sqlService.rethrow(err, this.connection));
});

return this.fetchDeferred.promise;
Expand Down
45 changes: 28 additions & 17 deletions src/sql/SqlService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ export class SqlServiceImpl implements SqlService {
* @param connection
* @returns {@link HazelcastSqlException}
*/
toHazelcastSqlException(err: any, connection: Connection) : HazelcastSqlException {
rethrow(err: any, connection: Connection): HazelcastSqlException {
Copy link
Member Author

@srknzl srknzl Jul 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored so that rethrow takes err and connection and returns a HazelcastSqlException whereas toHazelcastSqlException is just for converting an err to HazelcastSqlException

Done this because I needed toHazelcastSqlException logic as a separate func

if (!connection.isAlive()) {
return new HazelcastSqlException(
this.connectionManager.getClientUuid(),
Expand All @@ -265,19 +265,23 @@ export class SqlServiceImpl implements SqlService {
err
)
} else {
if (err instanceof HazelcastSqlException) {
return err;
}
let originatingMemberId;
if (err.hasOwnProperty('originatingMemberId')) {
originatingMemberId = err.originatingMemberId;
} else {
originatingMemberId = this.connectionManager.getClientUuid();
}
return new HazelcastSqlException(
originatingMemberId, SqlErrorCode.GENERIC, err.message, err
);
return this.toHazelcastSqlException(err);
}
}

toHazelcastSqlException(err: any): HazelcastSqlException {
if (err instanceof HazelcastSqlException) {
return err;
}
let originatingMemberId;
if (err.hasOwnProperty('originatingMemberId')) {
originatingMemberId = err.originatingMemberId;
} else {
originatingMemberId = this.connectionManager.getClientUuid();
}
return new HazelcastSqlException(
originatingMemberId, SqlErrorCode.GENERIC, err.message, err
);
}

executeStatement(sqlStatement: SqlStatement): SqlResult {
Expand All @@ -287,9 +291,16 @@ export class SqlServiceImpl implements SqlService {
throw new IllegalArgumentError(`Invalid argument given to execute(): ${error.message}`, error)
}

const connection = this.connectionRegistry.getRandomConnection(true);
let connection: Connection | null;

try {
connection = this.connectionRegistry.getConnectionForSql();
} catch (e) {
throw this.toHazelcastSqlException(e);
}

if (connection === null) {
// Either the client is not connected to the cluster, or there are no data members in the cluster.
// The client is not connected to the cluster.
throw new HazelcastSqlException(
this.connectionManager.getClientUuid(),
SqlErrorCode.CONNECTION_PROBLEM,
Expand Down Expand Up @@ -352,13 +363,13 @@ export class SqlServiceImpl implements SqlService {
SqlServiceImpl.handleExecuteResponse(clientMessage, res);
}).catch(err => {
res.onExecuteError(
this.toHazelcastSqlException(err, connection)
this.rethrow(err, connection)
);
});

return res;
} catch (error) {
throw this.toHazelcastSqlException(error, connection);
throw this.rethrow(error, connection);
}
}

Expand Down
Loading