In this doc, we explain how we can use the npm package **sfbulk2js** to load data into Salesforce using BulkAPI 2.0 ## Create a your nodejs project ``` $ mkdir awessome-project $ cd awessome-project/ $ npm init package name: (awessome-project) version: (1.0.0) 0.0.1 description: Project using sfbulk2js for loading data into Salesforce using BulkAPI 2.0 entry point: (index.js) test command: git repository: keywords: salesforce, bulkapi2.0, sfbulk2js author: mohan chinnappan license: (ISC) MIT About to write to /Users/mchinnappan/awessome-project/package.json: { "name": "awessome-project", "version": "0.0.1", "description": "Project using sfbulk2js for loading data into Salesforce using BulkAPI 2.0", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [ "salesforce", "bulkapi2.0", "sfbulk2js" ], "author": "mohan chinnappan", "license": "MIT" } Is this OK? (yes) ```
## Install sfbulk2js npm package in your nodejs project ``` $ npm install -s sfbulk2js + sfbulk2js@0.0.1 added 2 packages from 2 contributors and audited 2 packages in 1.653s found 0 vulnerabilities ``` ### Now you are all set with the setup.
### Sample Code to load a CSV file into Case Object - You can create this CSV from data store (like Postgres, Spanner, MySQL...) for the Case Object - Here is the content of our CSV file: input.csv ``` $ cat input.csv Subject,Priority Engine cylinder has knocking,High Wiper Blade needs replacement,Low ``` ## Code to load the records in this CSV file to Salesforce Case Object using sfbulk2js ``` // filename:index.js // test file for sfbulk2js // author: mohan chinnappan (mar-18-2020) const sfb2 = require('sfbulk2js'); // the npm package we just installed const fs = require('fs'); const process = require('process'); // read access-token from the env const AT = process.env.AT; const cji = { instanceUrl: 'https://mohansun-fsc-21.my.salesforce.com', apiVersion: 'v46.0', accessToken: `${AT}`, contentType: 'CSV', lineEnding: 'LF' }; const waitTimeMs = 5000; function sleep(ms) { console.log('WAITING'); return new Promise(resolve => setTimeout(resolve, ms)); } async function dataload(datafile) { try { console.log(`=== CREATE JOB === `); const job = await sfb2.createJob(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'insert', 'Case', cji.contentType, cji.lineEnding ); console.log(job); console.log(`jobId: ${job.id}`); console.log(`=== JOB STATUS === `); let jobStatus = await sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', job.id ); console.log(`=== JOB STATUS for job: ${job.id} ===`); console.log(jobStatus); console.log(`=== PUT DATA === `); const fdata = fs.readFileSync(datafile, 'utf8'); const putDataStatus = await sfb2.putData(cji.instanceUrl, cji.accessToken, job.contentUrl, fdata ); console.log(`=== JOB STATUS === `); jobStatus = await sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', job.id ); console.log(`=== JOB STATUS for job: ${job.id} ===`); console.log(jobStatus); console.log(`=== PATCH STATAE === `); const patchDataStatus = await sfb2.patchState(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id, 'UploadComplete' ); console.log(patchDataStatus); console.log(`=== JOB STATUS === `); jobStatus = await sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', job.id ); console.log(`=== JOB STATUS for job: ${job.id} ===`); console.log(jobStatus); while (jobStatus.state === 'InProgress') { // wait for it await sleep(waitTimeMs); jobStatus = await sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', job.id ); console.log(jobStatus); } console.log(`=== JOB Failure STATUS === `); jobStatus = await sfb2.getJobFailureStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id ); console.log(`=== JOB Failure STATUS for job: ${job.id} ===`); console.log(jobStatus); console.log(`=== JOB getUnprocessedRecords STATUS === `); jobStatus = await sfb2.getUnprocessedRecords(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id ); console.log(`=== JOB getUnprocessedRecords STATUS for job: ${job.id} ===`); console.log(jobStatus); } catch (err) { console.log(`ERROR in dataload : ${err}`); } } // here we run it dataload('input.csv'); ```
## Results on the cli (console) ``` $ node index.js === CREATE JOB === { id: '7503h000000RcUEAA0', operation: 'insert', object: 'Case', createdById: '0053h000000IJFeAAO', createdDate: '2020-03-18T04:23:56.000+0000', systemModstamp: '2020-03-18T04:23:56.000+0000', state: 'Open', concurrencyMode: 'Parallel', contentType: 'CSV', apiVersion: 46, contentUrl: 'services/data/v46.0/jobs/ingest/7503h000000RcUEAA0/batches', lineEnding: 'LF', columnDelimiter: 'COMMA' } jobId: 7503h000000RcUEAA0 === JOB STATUS === === JOB STATUS for job: 7503h000000RcUEAA0 === { id: '7503h000000RcUEAA0', operation: 'insert', object: 'Case', createdById: '0053h000000IJFeAAO', createdDate: '2020-03-18T04:23:56.000+0000', systemModstamp: '2020-03-18T04:23:56.000+0000', state: 'Open', concurrencyMode: 'Parallel', contentType: 'CSV', apiVersion: 46, jobType: 'V2Ingest', contentUrl: 'services/data/v46.0/jobs/ingest/7503h000000RcUEAA0/batches', lineEnding: 'LF', columnDelimiter: 'COMMA', retries: 0, totalProcessingTime: 0, apiActiveProcessingTime: 0, apexProcessingTime: 0 } === PUT DATA === result: status: 201, statusText: Created === JOB STATUS === === JOB STATUS for job: 7503h000000RcUEAA0 === { id: '7503h000000RcUEAA0', operation: 'insert', object: 'Case', createdById: '0053h000000IJFeAAO', createdDate: '2020-03-18T04:23:56.000+0000', systemModstamp: '2020-03-18T04:23:56.000+0000', state: 'Open', concurrencyMode: 'Parallel', contentType: 'CSV', apiVersion: 46, jobType: 'V2Ingest', contentUrl: 'services/data/v46.0/jobs/ingest/7503h000000RcUEAA0/batches', lineEnding: 'LF', columnDelimiter: 'COMMA', numberRecordsProcessed: 0, numberRecordsFailed: 0, retries: 0, totalProcessingTime: 0, apiActiveProcessingTime: 0, apexProcessingTime: 0 } === PATCH STATAE === { id: '7503h000000RcUEAA0', operation: 'insert', object: 'Case', createdById: '0053h000000IJFeAAO', createdDate: '2020-03-18T04:23:56.000+0000', systemModstamp: '2020-03-18T04:23:56.000+0000', state: 'UploadComplete', concurrencyMode: 'Parallel', contentType: 'CSV', apiVersion: 46 } === JOB STATUS === === JOB STATUS for job: 7503h000000RcUEAA0 === { id: '7503h000000RcUEAA0', operation: 'insert', object: 'Case', createdById: '0053h000000IJFeAAO', createdDate: '2020-03-18T04:23:56.000+0000', systemModstamp: '2020-03-18T04:23:58.000+0000', state: 'InProgress', concurrencyMode: 'Parallel', contentType: 'CSV', apiVersion: 46, jobType: 'V2Ingest', lineEnding: 'LF', columnDelimiter: 'COMMA', numberRecordsProcessed: 0, numberRecordsFailed: 0, retries: 0, totalProcessingTime: 0, apiActiveProcessingTime: 0, apexProcessingTime: 0 } WAITING { id: '7503h000000RcUEAA0', operation: 'insert', object: 'Case', createdById: '0053h000000IJFeAAO', createdDate: '2020-03-18T04:23:56.000+0000', systemModstamp: '2020-03-18T04:23:59.000+0000', state: 'JobComplete', concurrencyMode: 'Parallel', contentType: 'CSV', apiVersion: 46, jobType: 'V2Ingest', lineEnding: 'LF', columnDelimiter: 'COMMA', numberRecordsProcessed: 2, numberRecordsFailed: 0, retries: 0, totalProcessingTime: 134, apiActiveProcessingTime: 47, apexProcessingTime: 0 } === JOB Failure STATUS === === JOB Failure STATUS for job: 7503h000000RcUEAA0 === "sf__Id","sf__Error",Priority,Subject === JOB getUnprocessedRecords STATUS === === JOB getUnprocessedRecords STATUS for job: 7503h000000RcUEAA0 === Subject,Priority ```
![case records](img/case-created-1.png)
- Create node js project with the following code in index.js ``` //------------------------------------------------------------------ // Code for the aws lambda function using Salesforce BulkAPI 2.0 // Mohan Chinnappan (jul-21-2020) //------------------------------------------------------------------ const sfb2 = require('sfbulk2js'); const AT = '00D3h000007R1Lu!AR0AQGyDeGEGhToyawRbG9rrc006T6.3Gi7V5JEU6X88ED7io7IUtzYDjhaLuQLQ_Ylp2TKux560e6rGzbQ8vAP7zd_hsYpy'; //AT can be read from env or from JWT flow (ref: https://github.com/mohan-chinnappan-n/bulkapi2-dx/blob/master/jwt.md) const opts = { instanceUrl: 'https://mohansun-ea-02-dev-ed.my.salesforce.com', // can be read from env apiVersion: 'v49.0', accessToken: `${AT}`, contentType: 'CSV', lineEnding: 'LF' }; const waitTimeMs = 3000; // adjust it as required exports.handler = async (event) => { // TODO: read the s3 bucket data here (max size: 150 MB) into fdata let fdata = ` Subject,Priority Engine cylinder has knocking,High Wiper Blade needs replacement,Low `; await data(fdata, 'Case', opts); return { statusCode: 200, body: "Ok"}; }; function sleep(ms) { //console.log('WAITING'); return new Promise(resolve => setTimeout(resolve, ms)); } async function dataload(fdata, sobj, cji) { try { console.log(`=== CREATE JOB === `); const job = await sfb2.createJob(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'insert', sobj, cji.contentType, cji.lineEnding ); console.log(job); console.log(`jobId: ${job.id}`); console.log(`=== JOB STATUS === `); let jobStatus = await sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', job.id ); console.log(`=== JOB STATUS for job: ${job.id} ===`); console.log(jobStatus); console.log(`=== PUT DATA === `); const putDataStatus = await sfb2.putData(cji.instanceUrl, cji.accessToken, job.contentUrl, fdata ); console.log(`=== JOB STATUS === `); jobStatus = await sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', job.id ); console.log(`=== JOB STATUS for job: ${job.id} ===`); console.log(jobStatus); console.log(`=== PATCH STATAE === `); const patchDataStatus = await sfb2.patchState(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id, 'UploadComplete' ); console.log(patchDataStatus); console.log(`=== JOB STATUS === `); jobStatus = await sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', job.id ); console.log(`=== JOB STATUS for job: ${job.id} ===`); console.log(jobStatus); // TODO: This waiting may not required for lambda function to avoid lambda timeout // Modify it as needed while (jobStatus.state === 'InProgress') { // wait for it await sleep(waitTimeMs); jobStatus = await sfb2.getJobStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, 'ingest', job.id ); console.log(jobStatus); } console.log(`=== JOB Failure STATUS === `); jobStatus = await sfb2.getJobFailureStatus(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id ); console.log(`=== JOB Failure STATUS for job: ${job.id} ===`); console.log(jobStatus); console.log(`=== JOB getUnprocessedRecords STATUS === `); jobStatus = await sfb2.getUnprocessedRecords(cji.instanceUrl, cji.apiVersion, cji.accessToken, job.id ); console.log(`=== JOB getUnprocessedRecords STATUS for job: ${job.id} ===`); console.log(jobStatus); } catch (err) { console.log(`ERROR in dataload : ${err}`); } } ```
- Function Code ![Function Code](img/bulkapi2-aws-lambda-1.png) - package.json ```json { "name": "aws-bulkapi2", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "Mohan Chinnappan", "license": "MIT", "dependencies": { "sfbulk2js": "0.0.1" } } ```
![Function Code Results](img/bulk-case-creation-2.png)
### Same package is available in python! ![py medium doc](img/py-bulkapi-2.png) - More info at: [Salesforce Bulk API 2.0](https://medium.com/@mohan.chinnappan.n/salesforce-bulk-api-2-0-95a7a81b9bb9) ``` """ bulkapi2-usage author: mohan chinnappan (mar-18-2020) pip install sfbulk2 pip install Faker """ instance_url = 'https://mohansun-fsc-21.my.salesforce.com' ## access_token can come from env var access_token = '00D3hmangledDC8N!ARcAQFAqd7iOlatMgWpfXmjfk0YRgtfnU0ntLE3OBZjJgn3Shqe9ssq7ZowsmoEhoAn_nHpjoIwVB40gBpmFbwx4sZR6b9RE' ## let us import SFBulk2 from sfbulk2 import SFBulk2 sfba2 = SFBulk2(api_version='v47.0', instance_url=instance_url, access_token=access_token) ## create bulkapi2 job job_id = sfba2.create_job(operation='insert', obj='Case') print ('job_id: {}'.format(job_id)) ## let us look at the job status res = sfba2.get_job_status(job_id=job_id, optype='ingest') print (res.json()) contentUrl = res.json()['contentUrl'] print(contentUrl) ## Insert Data ### you can read it from the database (Postgres, Spanner, MySQL...) and create this file data ="""Subject,Priority VPN2 Issues,High Proxy Server2 Issues,Low HVD2 Issues,High """ print (data) ## let us upload the data put_res = sfba2.put_data(data=data, contentUrl=contentUrl) print (put_res.content) ## get the job status res = sfba2.get_job_status(job_id=job_id, optype='ingest') print(res.json()) ## set the state to 'UploadComplete' patch_res = sfba2.patch_state(job_id=job_id, state="UploadComplete") print(patch_res.json()) ## get the job status res = sfba2.get_job_status(job_id=job_id, optype='ingest') print(res.json()) ## you will see the something like this """ {'apexProcessingTime': 0, 'apiActiveProcessingTime': 60, 'apiVersion': 47.0, 'columnDelimiter': 'COMMA', 'concurrencyMode': 'Parallel', 'contentType': 'CSV', 'createdById': '0053h000000IJFeAAO', 'createdDate': '2020-03-18T12:16:32.000+0000', 'id': '7503h000000RdYUAA0', 'jobType': 'V2Ingest', 'lineEnding': 'LF', 'numberRecordsFailed': 0, 'numberRecordsProcessed': 3, 'object': 'Case', 'operation': 'insert', 'retries': 0, 'state': 'JobComplete', 'systemModstamp': '2020-03-18T12:21:50.000+0000', 'totalProcessingTime': 168} """ ## check the failure status failure_res = sfba2.get_failure_status(job_id=job_id) print(failure_res.content) ## you will see something like this: """ b'"sf__Id","sf__Error",Priority,Subject\n' """ ```
![py-results](img/cases-py-1.png)
- Maximum number of records uploaded per rolling 24-hour period - **150,000,000** - Maximum number of times a batch can be retried. [See How Requests Are Processed](https://developer.salesforce.com/docs/atlas.en-us.api_bulk_v2.meta/api_bulk_v2/how_requests_are_processed.htm). - **The API automatically handles retries. If you receive a message that the API retried more than 10 times, use a smaller upload file and try again.** - Maximum file size - **150 MB** - Maximum number of characters in a field - **32,000** - Maximum number of fields in a record - **5,000** - Maximum number of characters in a record - **400,000** - [Refer: Bulk API 2.0 Limits](https://developer.salesforce.com/docs/atlas.en-us.api_bulk_v2.meta/api_bulk_v2/bulk_api_2_limits.htm)