Skip to content
Nuno Aguiar edited this page May 22, 2018 · 4 revisions

ow.ch.comms.getSubscriberFunc

ow.ch.comms.getSubscriberFunc(aURL, aUUID, aLogin, aPassword, aTimeout)

Returns a function to be used with ow.ch.subscribe for REST communication to the provided aURL intended to be used with ow.ch.server.*. Optionally aUUID can be provided to ignore subscriber request using it. Optionally you can provide aLogin, aPassword and/or a aTimeout (in ms).


ow.ch.create

ow.ch.create(aName, shouldCompress, type, options) : ow.ch

Creates a channel of key/values with aName. Optionally you can specify if keys should also  be compressed in memory (shouldCompress = true), a channels implementation type and corresponding options in a map.


ow.ch.destroy

ow.ch.destroy(aName) : ow.ch

Deletes all data and removes references to the channel aName.


ow.ch.forEach

ow.ch.forEach(aName, aFunction) : ow.ch

Will execute the provided aFunction with each key and value for the channel identified by the aName provided.


ow.ch.get

ow.ch.get(aName, aKey) : ow.ch

Returns the value associated with the provided aKey on the channel identified by aName.


ow.ch.getAll

ow.ch.getAll(aName, fullInfo) : Array

Will return all values for the channel identified by the aName provided.


ow.ch.getKeys

ow.ch.getKeys(aName, full) : Array

Returns all keys in the form of an array for the channel identified by the aName provided. Optionally you can specify full = yes to obtain the detailed internal key index.


ow.ch.getSet

ow.ch.getSet(aName, aMatchMap, aKey, aValue, aTimestamp, aUUID)

Tries to update the channel identified by the provided aName with aKey and aValue only if it's possible to get the same aKey and  aMatchMap matches the current aValue. Optionally you can provide the internal aTimestamp (if the provided aTimestamp is smaller than getVersion the value will not be set)


ow.ch.getSortedKeys

ow.ch.getSortedKeys(aName, full) : Array

Returns all keys in the form of an array for the channel identified by the aName provided by order of the last modification. Optionally you can specify full = yes to obtain the detailed internal key index.


ow.ch.getVersion

ow.ch.getVersion(aName) : Number

Returns the current version (usually a timestamp) for the aName channel.


ow.ch.list

ow.ch.list() : Array

Returns a list the current channels available.


ow.ch.persistence.create

ow.ch.persistence.create(aChannel, aFilename, anArrayOfKeys, shouldCompress)

Adds a channel identified by the name aChannel, trying to restore data from aFilename given anArrayOfKeys (strings representing the key fields). Optionally indicating if keys should be compressed in memory with shouldCompress = true.


ow.ch.persistence.getSubscriberFunc

ow.ch.persistence.getSubscriberFunc(aFilename)

Returns a function to be used with ow.ch.subscribe persisting any existing or new data into aFilename provided.


ow.ch.persistence.restore

ow.ch.persistence.restore(aChannel, aFilename, anArrayOfKeys) : Number

Tries to restore that previously persisted using ow.ch.getSubscribeFunc from the aFilename using  anArrayOfKeys (strings representing the string list of fields used as index). The values will be restored to the aChannel name provided. Returns -1 in case of error or the data loaded length otherwise.


ow.ch.pop

ow.ch.pop(aName) : Object

Mimics a LIFO queue behaviour by returning the last modified key/value entry for the channel aName. The entry will be removed from the channel.


ow.ch.push

ow.ch.push(aName, aKey, aValue) : ow.ch

Equivalent to ow.ch.set trying to set, for the channel aName, aKey and aValue trying to mimic a queue behaviour.


ow.ch.server.expose

ow.ch.server.expose(aName, aLocalPortORServer, aPath, aAuthFunc, aUnAuthFunc, noCheck) : String

Given aName channel and aLocalPortORServer will use the provided server, or start a simple http server on the  provided port, to expose access to the aName channel on the URL aPath. It will return an unique identifier to be use to identify incoming requests from this aPath and server on channel subscribe functions. Optionally you can also provide aAuthFunc(user, pass, aServer, aRequest) and aUnAuthFunc(aServer, aRequest) functions using ow.server.httpd.authBasic. The aAuthFunc can add aRequest.channelPermission to enforce read and/or write permissions on a channel (e.g. "r", "rw"). If needed you can ignore the checking if the aName channel exists with noCheck.

Example:

// Exposing the a log dump channel on port 8090 for test proposes.
ow.ch.server.expose("__log", 8090, "/log");


ow.ch.server.peer

ow.ch.server.peer(aName, aLocalPortORServer, aPath, aRemoteURLArray, aAuthFunc, aUnAuthFunc)

Exposes aName channel in the same way as ow.ch.server.expose but it also add a subscribe function to  the aName channel to remotely peer with other expose channel(s) given aRemoteURLArray. Optionally you can also provide aAuthFunc(user, pass) and aUnAuthFunc(aServer, aRequest) functions using ow.server.httpd.authBasic. The aAuthFunc can add aRequest.channelPermission to enforce read and/or write permissions on a channel (e.g. "r", "rw").

Example:

ow.ch.server.peer("__log", 8090, [ "http://server1.local:8090/log", "https://l:p@server2.local:8090/log" ]);


ow.ch.server.routeProcessing

ow.ch.server.routeProcessing(aURI, aRequest, aName) : HTTPServerReply

Creates a server route processing for a ow.ch.comms client provided aURI, a HTTPServer aRequest and the  aName of a channel (please note that you should create the channel prior to using this function). Depending on aRequest.channelPermission some operations may not execute:

  • "r" - allows channel get/getAll operations
  • "w" - allows channel set/setAll/getSet/unset operations

Example:

var hs = ow.loadServer().httpd.start(17878);
ow.server.httpd.route(hs, { "/rest": function(r) { return ow.ch.server.routeProcessing("/rest", r, "my-channel") }});

for multiple channels:
var hs = ow.loadServer().httpd.start(17878);
ow.server.httpd.route(hs, {
  "/chan1": function(r) { return ow.ch.server.routeProcessing("/chan1", r, "chan-1") },
  "/chan2": function(r) { return ow.ch.server.routeProcessing("/chan2", r, "chan-2") }
};


ow.ch.server.setLog

ow.ch.server.setLog(aLogFunction)

Sets aLogFunction to act as audit for external communication to access a channel. The aLogFunction will be called passing, as a single argument, a map with:
  - name    (the channel name)
  - op      (the operation can be AUTH_OK, AUTH_NOT_OK, GET, SET, REMOVE or CREATE)
  - request (the HTTP request map)

The request, when available, will include an entry with the current user.


ow.ch.set

ow.ch.set(aName, aKey, aValue, aTimestamp)

Tries to insert/update the channel identified by the provided aName with aKey and aValue. Optionally you can provide the internal aTimestamp (if the provided aTimestamp is smaller than getVersion the value will not be set)


ow.ch.setAll

ow.ch.setAll(aName, anArrayOfKeys, anArrayOfMapData, aTimestamp)

Given anArrayOfKeys composed of strings identifying the key fields from the anArrayOfMapData provided will try to insert/update all values on the channel identified by aName. Optionally you can provide aTimestamp  (if the provided aTimestamp is smaller than getVersion the value will not be set)


ow.ch.shift

ow.ch.shift(aName) : Object

Mimics a FIFO queue behaviour by returning the first modified key/value entry for the channel aName. The entry will be removed from the channel.


ow.ch.size

ow.ch.size(aName) : Number

Returns the number of keys currently available for the channel aName.


ow.ch.stopAllJobs

ow.ch.stopAllJobs(aName) : ow.ch

Each channel subscription (using ow.ch.subscribe) will create internal jobs (threads). To stop all these immediately provide the channel aName.


ow.ch.subscribe

ow.ch.subscribe(aName, aFunction, onlyFromNowm, anId) : String

Adds a callback function to the channel aName. The callback function will receive, as arguments: the channel name, the operation, a key or an array of keys (for operation = setall), a value or an array  of values (for operation = setall) and the ow.ch object. Returns the subscriber id. Optionally you can specify that existing elements won't trigger operation = set callback calls and/or a custom subscriber anId.

Possible operations:
  - set
  - setall
  - unset


ow.ch.types.elasticsearch

ow.ch.types.elasticsearch

This OpenAF implementation connects to an ElasticSearch (ES) server/cluster. The creation options are:

  - index (String/Function) The ES index to use or a function to return the name (see also ow.ch.utils.getElasticIndex).
  - idKey (String) The ES key id field. Defaults to '_id'.
  - url (String) The HTTP(S) URL to access the ES server/cluster.
  - user (String) Optionally provide a user name to access the ES server/cluster.
  - pass (String) Optionally provide a password to access the ES server/cluster (encrypted or not).

The getAll/getKeys functions accept an extra argument to provide a ES query map to restrict the results.


ow.ch.types.ignite

ow.ch.types.ignite

This channel type will use an Ignite Data Grid. The creation options are:

  - ignite (Ignite) Use a previously instantiated Ignite plugin (defaults to a new instance).
  - gridName (String) Use a specific Ignite grid name.


ow.ch.types.mvs

ow.ch.types.mvs

The channel type mvs uses H2 MVStore to keep key/value structures either in memory or in files. The creation options are:

  - file (String) If not defined it will default to the in-memory implementation otherwise a file.
  - shouldCompress (Boolean) Specifies if it should compress the entire structure or not.
  - compact (Boolean) Upon channel create/destroy it will try to run a compact operation over the file to save space.
  - map (String/Function) The map name (defaults to 'default'). If defined as a function it will receive the key as argument if possible (only for get/set/unset/setall) for sharding proposes.

The map will be created if it doesn't exist.


ow.ch.unset

ow.ch.unset(aName, aKey, aTimestamp) : ow.obj.channel

Tries to remove all associations to the provided aKey on the channel identified by aName. Optionally aTimestamp can be provided.


ow.ch.unsubscribe

ow.ch.unsubscribe(aName, aId) : ow.ch

Tries to unsubscribe aId callback from the channel identified by the aName provided.


ow.ch.unsubscribeAll

ow.ch.unsubscribeAll(aName) : ow.ch

Tries to unsubscribe all callback functions from the channel identified by the aName provided.


ow.ch.utils.getElasticIndex

ow.ch.utils.getElasticIndex(aPrefix, aFormat) : Function

Returns a function to be used for generating ElasticSearch indexes with aPrefix-aDate (in the format of YYYY.MM.DD). This helps to generate a specific index per day. If a specific format is needed you can provided it as aFormat (see ow.format.fromDate)).


ow.ch.utils.getFileHousekeepSubscriber

ow.ch.utils.getFileHousekeepSubscriber(aFolder, aRegExPattern, howLongAgoInMinutes, dontCompress, aBackupFolder) : Function

Returns a channel subscriber function to perform file housekeep (specially useful with ow.ch.utils.getLogFilePerDate) given the main aFolder where just the newest file whose filename matches aRegExPattern (e.g. "log\d{4}-\d{2}-\d{2}\.log") will be kept uncompressed. All other files will be gziped (if dontCompress = false) and moved to aBackupFolder (if defined, defaults to aFolder). If howLongAgoInMinutes is defined all  files older than now - howLongAgoInMinutes will be deleted from the aBackupFolder.


ow.ch.utils.getHousekeepSubscriber

ow.ch.utils.getHousekeepSubscriber(aTargetCh, maxNumberOfKeys) : Function

Returns a channel subscriber function that will keep the channel size to the maximum of maxNumberOfKeys (defaults to 100). If the number of keys is bigger than maxNumberOfKeys than it will perform a channel shift operation (that will, depending on the type of channel, remove the oldest element).


ow.ch.utils.getLogFilePerDate

ow.ch.utils.getLogFilePerDate(aLogFolder, aTemplate, aFileDateFormat, aLineTemplate, aLineDateFormat) : Function

Returns a function to be used to generate a log file in aLogFolder path. If the log file already exists it will append to it. You can customize the log filename format using a aTemplate (e.g. "log-{{timedate}}.log" by default). The "timedate" is defined on the aFileDateFormat (e.g. "yyyy-MM-dd" by day, "yyyy-MM-dd-HH" by hour (check ow.format.fromDate for more options)). Each line that will be append to the file can be defined by aLineTemplate (e.g. "{{timedate}} | {{type}} | {{message}}"  by default) where "type" is the of logging (INFO, WARN, ERROR), "message" the logged message and "timedate" is defined on the aLineDateFormat (e.g. "yyyy-MM-dd HH:mm:ss.SSS" by default (check ow.format.fromDate for more options)).


ow.ch.utils.getLogStashSubscriber

ow.ch.utils.getLogStashSubscriber(aTargetCh, aType, aHost, aErrorFunc, shouldHK, stampMap) : Function

Returns a channel subscriber function that will transform changes to a log channel (see startLog and the __log channel) and will replicate them in aTargetCh (this means expecting the value to have a 'd': date; 'm': message and 't' as the level/type). The value set on aTargetCh will follow the LogStash format setting type to aType and host to aHost. The id and key will be set  to a sha1 hash of the stringify version of the value being set. In case of error aErrorFunc will be invoked providing the exception as an argument. You can also indicate if you want to house keep the original channel to save the script's memory and a stampMap to force entries on all maps sent.


ow.ch.utils.getMirrorSubscriber

ow.ch.utils.getMirrorSubscriber(aTargetCh, aFunc) : Function

Returns a channel subscriber function that will mirror any changes to aTargetCh if aFunc returns true when invoked with the key being changed.


ow.ch.utils.keepHistory

ow.ch.utils.keepHistory(timeRepeatExpression, aName, aFunction, withKeys, historySize) : Object

For a given aName channel (preferably a temporary one) will execute aFunction periodically given a timeRepeatExpression (if a Number the ms interval, if a String a cron expression). That aFunction should return aMap for which the withKeys array will be used to determine the keys entries to set it on the provided aName channel (if withKeys is not provided, id will be assumed and filled with nowNano()). The channel aName will be filled with entries with the result of executing aFunction periodically thus keeping an history of results (by default 10 if a historySize is not provided). An object with a stop function will be returned so the keepHistory periodical behaviour of runnning aFunction and keeping the results in the channel aName.


ow.ch.utils.mvs.list

ow.ch.utils.mvs.list(aMVSFile) : Array

Returns a list of names of maps in the corresponding aMVSFile.


ow.ch.utils.mvs.remove

ow.ch.utils.mvs.remove(aMVSFile, aMapToRemove)

Removes aMapToRemove on the provided aMVSFile.


ow.ch.utils.mvs.rename

ow.ch.utils.mvs.rename(aMVSFile, anOriginalMap, aDestinationMap)

Renames anOriginalMap by aDestinationMap on the provided aMVSFile.


ow.ch.utils.setLogToFile

ow.ch.utils.setLogToFile(aConfigMap)

Shortcut to set OpenAF's logging into a rotating per date set of log files. You can set any of these options:

logFolder             (string)  Where the current log file should be written (defaults to '.')
filenameTemplate      (string)  ow.template for the log filename (defaults to 'log-{{timedate}}.log')
fileDateFormat        (string)  File date format to be used in filenameTemplate (defaults to 'yyyy-MM-dd')
lineTemplate          (string)  ow.template for each log line (defaults to '{{timedate}} | {{type}} | {{message}}\n')
lineDateFormat        (string)  Date format to be used in lineTemplate (defaults to 'yyyy-MM-dd HH:mm:ss.SSS')
HKRegExPattern        (string)  Housekeeping regular expression pattern to find log files (defaults to 'log-\d{4}-\d{2}-\d{2}\.log')
HKhowLongAgoInMinutes (number)  How many minutes of logs should be kept (if not defined won't delete files)
dontCompress          (boolean) Defines if older files should not be gzip (default to false)
backupFolder          (string)  If defined older log files will be moved to this folder (if not defined they won't be moved)
numberOfEntriesToKeep (number)  Number of OpenAF log channel entries to keep in memory (defaults to 100)
setLogOff             (boolean) Turns off console logging (defaults to false)


ow.ch.waitForJobs

ow.ch.waitForJobs(aName, aTimeout) : ow.ch

Each channel subscription (using ow.ch.subscribe) will create internal jobs (threads). To wait for  this jobs to finish provide the channel aName and, optionally, aTimeout.

Clone this wiki locally