参考文献:Hyperledger Fabric: A Distributed Operation System for Permissioned Blockchains
架构
整体结构
Fabric是一个许可区块链的分布式操作系统,可以执行多种编程语言编写的分布式应用。它能够在一个只能追加的数据结构中安全地跟踪执行历史,并且没有内置的加密账本。
Fabric使用了“执行-order-验证”的区块链架构,而没有遵顼标准的“order-执行”设计。其整体的分布式应用可以分为两个部分
-
智能合约,称为链码 (Chaincode) ,是一段实现了应用逻辑的程序代码,并在执行过程中运行。链码是Fabric分布式应用的核心部分,可能会被未受信任的开发者修改。有一种特殊的链码被用于管理区块链系统并维护参数,被称为系统链码 (system chaincode)
-
背书政策 (endorsement policy) 在验证阶段进行评价。许可政策无法被未受信任的开发者选择或修改。其在区块链中充当一个用于事物验证的静态库,且只能通过链码进行参数化。只有指定的管理员有权限使用系统管理功能修改。
“执行-排序-验证”结构示意图:
一个客户端向背书政策指定的peer节点(peers)发送交易。这个交易被特殊的peer节点执行并且记录输出信息,这一步骤被称为背书。执行完成后,交易进入了排序(ordering)阶段,在这一个阶段中,使用了一种可插入的共识协议来产生一个完成排序的已背书交易的序列,并按区块分组。这些交易被广播到所有的节点。这个序列的顺序由输出和状态的依赖性决定。在验证阶段,每个peer节点都需要根据背书政策和执行的一致性对已背书交易的状态变化进行验证。所有peer节点都已相同的顺序进行验证,因此结果具有确定性。
Fabric网络支持多个区块链链接到同一个排序服务上。每个区块链被称为一个通道(Channel),可以有不同的peer节点作为其成员。这些通道可以被用来分离不同的区块链之间的状态,但每个通道的共识并不一致,并且通道中的交易顺序是相互分开的。认为所有排序都是可信的部署,可以通过通道访问控制来实现对节点的控制。
其中的节点可分为:客户端、peer节点(其中一部分为背书节点)、排序服务节点(OSN, Ordering Service Nodes or orderers)
交易的三个阶段
执行阶段
在执行阶段,客户端签署并发送一个交易提案给一个或多个背书节点来执行(根据背书政策,每个chaincode都有特定的一组背书节点)。一个交易提案包含了提交客户端的身份(根据MSP),交易的载荷、参数、链码标识符、每个客户端只能使用一次的nonce(随机值或计数器),以及客户端标识符和nonce生成的交易标识符。
背书节点会对交易提案进行模拟,在指定的区块链链码上执行操作。模拟操作在背书节点的本地区块链状态中进行模拟,不与其它节点同步,也不会将模拟的结果永久化到帐本中。区块链的状态仍然由节点交易管理器(Peer Transaction Manager)维护。一个链码创建的状态只能限定在该链码上,不能被其他的链码直接访问。需要注意的是,链码只能维护GetState、PutState、DelState操作访问的内容(可能意思是:需要使用这些接口进行访问,不能直接修改,猜测状态权限为private)。给予适当的权限,链码可以调用同一个通道的链码,并访问其状态。
在模拟后,背书节点会产生一个writeset,其中包括模拟产生的状态更新,和一个readset,代表交易提案模拟的版本依赖(模拟时的所有密钥以及对应的版本号)。在模拟结束后,背书节点会以加密的方式签署一份“背书”消息,包括了writeset和readset(包括交易id和背书节点的一些数据),随后以响应的方式发送给客户端。客户端收集背书,直到满足链码的背书政策,开始交易。特别的,这要求政策决定的所有背书节点都返回相同的writeset和readset。然后,客户端将继续创建交易,并传递给排序阶段。
排序阶段
客户端收集到足够的背书后,将会把交易组装起来并发送给排序服务端。整个交易包括了载荷(payload)(包括参数的链码操作)、交易元数据、背书的集合。排序阶段对每个通道的所有提交建立了一个排序。排序服务端会将多个交易分成块,输出包含交易的哈希链序列,以提高广播协议的吞吐量。
broadcast(tx)
:客户端调用这个函数来广播交易tx
,包含了载荷和签名。B
$\leftarrow$deliver(s)
:客户端调用这个函数来获取非负序列号s
的区块B
,$B=([tx_1,tx_2,\ldots,tx_k],h)$,$h$为s-1
区块的哈希值。
排序服务确保了一个通道中的交付区块被完全排序,确保安全。
然而,每一个单独的排序实现都允许在客户端请求中保证自己的活跃性与公平性。
由于区块链中包含了大量的节点,但仅有少部分节点实现了排序服务,因此Fabric可以配置使用内置的gossip服务,将排序服务中交付的区块分发给所有的节点。
验证阶段
排序服务会把区块直接分发给各个节点(或通过gossip)。随后,一个新的区块进入验证阶段,包含三个连续步骤:
- 并行执行区块中所有的交易的背书政策评估。评估是验证系统链码(VSCC, validation system chaincode)的任务。VSCC是一个静态库,是区块链配置的一部分,负责根据链码中的背书政策验证背书。如果不满足,这个交易会被标记为无效,并被忽略。
- 对块中的所有交易进行读写冲突检查(read-write conflict check)(版本号比较)。对于每个交易,将会对比其中readset的版本号和节点本地存储的账本当前状态中的版本号,确保版本相同。如果版本不匹配,交易会被标记为无效,并被忽略。
- 最后进行账本的更新阶段,在这个阶段,区块被追加到本地存储的账本中,并且更新区块链的状态。将区块添加到账本时,前两步的检查结果将被持久化以掩码的形式表示区块中的交易有效。这有助于后面进行重建状态。此外,所有的状态更新都是通过将writeset中的键值对写入本地完成的。
信任与故障模型
Fabric可以适应灵活的信任和故障假设。通常情况下,所有的客户端都被认为是潜在的恶意用户或是Byzantine。节点都被归入组织(organization)并且每个组织组成一个信任域(trust domain)。每个节点信任其组织内部的其他节点,而不信任其他组织的节点。排序服务同样认为所有的节点(或客户端)都是潜在的拜占庭。
Fabric网络的完整性依赖于排序服务的一致性。排序服务的信任模型取决于其实现。
在Fabric中,分布式应用可以定义自己的信任假设,通过背书政策来传达,并且独立于排序服务实现的共识的信任假设。
组件
Fabric使用了gRPC架构实现客户端、节点和排序服务之间的通信。
What is gRPC?
A high-performance, open-source universal RPC framework
gRPC框架具有高性能,开源,跨语言的特点,使用了RPC框架,基于HTTP/2设计。
RPC(remote procedure call,远程过程调用)框架提供了一套机制,使得应用程序之间可以进行通信,遵从server/client模型。
RPC将一个服务调用封装在一个本地方法中,让调用者像使用本地方法一样调用服务,对其屏蔽实现细节。具体的实现则通过调用方和服务方的协议,基于TCP连接进行数据交互达成。
在本地调用过程中,通常需要通过接口,调用具体实现,最终获取相应的数据。
而在RPC中,本地获取到接口,随后通过网络,调用远程的实现。在使用网络时,需要通过数据序列化来传输数据。
在gRPC中,支持多个语言的应用程序的远程调用,数据交换格式则采用了Protocol Buffer。
如下图所示,左侧为c++语言编写的客户端,其中包含了接口的具体实现。右侧为Ruby和Java客户端,其中对接口进行了调用。客户端获取接口后,将数据通过Protocol Buffer的序列化处理后交给服务端,服务端调用具体实现,并把数据同样以序列化的形式返回给客户端。
成员身份管理服务(MSP)
成员身份管理服务(MSP, membership service provider)维护了系统中所有节点(客户端、节点、排序服务)的身分,并负责发布节点的凭证,用于认证和授权。节点之间的所有交互都是通过已认证的消息(通常是数字签名)发生的。成员身份管理包括每个节点的组件,可以认证交易、验证交易的完整性、签署并确认背书、验证其他的区块链操作。MSP中还包括了用于密钥管理和节点注册的工具。
MSP是抽象的,可以有不同的实例。Fabric中,默认的MSP实现可以处理基于数字签名的PKI认证方法,并且容纳商业认证机构(CA)。Fabric提供了独立的CA——Fabric-CA。
Fabric允许设置区块链网络的两种模式。在离线模式下,凭证由CA生成,并分发到所有节点。peer节点和排序节点只能在离线模式下注册。对于客户端的注册,Fabric-CA提供了在线模式,向客户端发送加密凭证。MSP的配置必须要确保所有的节点都能识别相同的身份和认证认定为有效(特别是所有的peer节点)。
例如,当多个组织在同一个区块链网络中时,MSP需要能够允许身份的联合。每个组织向自己的成员发放身份,每个peer节点都能识别所有组织的成员。这可以通过多个MSP实例来实现。例如,在每个组织和一个MSP之间建立一个映射。
排序服务
排序服务可管理多个通道,对每个通道,都提供如下三个服务:
- Atomic broadcast,用于对交易进行排序,实现广播和分发
- 重新配置(reconfiguration)通道,成员通过广播一个配置更新事物(configuration update transaction)修改通道。
- (可选择)访问控制(access control),在这些配置中,排序服务作为可信的实体,限制交易的广播和指定peer节点和客户端的块的接收
排序服务通过系统通道上的生成块进行引导。生成块携带了配置事务(configuration transaction),定义了排序服务的操作。
当前版本的实现由OSN组成,OSN实现了描述的操作并且通过系统通道通信。实际的atomic broadcast函数由基于ZooKeeper的Kafka实例提供,这个实例提供了可以扩展的发布-订阅消息,在节点崩溃后仍具有强一致性(consistency)。这个实现可以运行在与OSN分离的物理节点上。OSN作为peers和Kafka实例之间的代理。
OSN直接将收到的新交易注入到广播中。OSN将从广播中接收到的交易转换成块。只要满足三个情况之一,块就会被断开。
- 区块包含了交易允许的最大值
- 区块达到最大大小
- 从接收到第一个交易后超过某特定时间
批处理的过程是确定的,因此在所有节点上都会产生相同的区块。考虑到从广播中接收的交易数据流,前两项情况是已经确定的。对于第三个条件,当节点读入块中的第一个交易时,会开启一个计时器。如果计时器超时后,这个区块仍然没有断开,OSN会在通道上广播一个特殊的“到时断开(time-to-cut)”交易,其中记录了需要断开的区块的序列号。另一方面,每个OSN在收到“到时断开”交易后,会根据其给定的序列号,立即切断新的区块。这个交易是原子性的分发给所有连接的OSN,所以区块中都包含相同的交易列表。OSN将最近交付的一系列区块直接持久化到文件系统中,因此可以通过分发回答peer节点来回收区块。
Peer Gossip
将三个阶段分开的好处是在开发的时候可以独立扩展。然而,由于共识算法往往有带宽限制,排序服务的吞吐量被节点的网络容量所限制。共识无法通过增加节点而扩展,反而会让吞吐量降低。然而,由于排序和验证是分开的,在排序阶段后,重要的是如何有效地广播执行阶段的结果,以交给peer节点进行验证。此外,还有如何对新加入的peer节点和长期断开的peer节点进行状态转移。这两个问题就是gossip组件所需要解决的。Fabric gossip利用流行组播(epidemic multicast)来解决这个问题。区块是由排序服务签署的。这意味着一个peer节点在收到所有区块后,可以独立的组装为区块链并验证其完整性。
gossip的通信基于gRPC,利用TLS(传输层安全协议)进行相互识别,使得每一方都可以将TLS凭证与远程peer节点的身份绑定。gossip组件用来维护系统中当前在线的peer节点的成员视图。所有的peer节点通过定期传播成员数据独立建立一个本地视图。此外,一个peer节点可以在网络崩溃或者中断后重新连入成员视图。
Fabric的gossip组件采用了两个阶段进行信息的传播:
- 在push阶段,每个peer节点从成员视图中选择一个随机的活跃邻居集合,并向他们发送信息
- 在pull阶段,每个peer节点定期探测一个随机选择的peer节点集合,并请求缺失的信息
研究表明,同时使用这两种方法可以优化可用带宽并确保所有peer节点有很高的概率接收到所有信息。
为了减少从排序节点向网络中发送的区块的负载,协议同时在peer中选择了一个领导者(elects a leader peer),代表peer节点从排序服务中pull区块,并启动gossip分发。这个机制对领导者出现故障具有弹性。
分布式账本(Ledger)
账本组件在各个peer节点中维持分布式账本并建立持久性的存储,并实现账本的模拟、验证和更新阶段。整体上由一个区块存储(block store)和一个peer节点交易管理器(peer transaction manager)组成。
区块存储(Ledger Block Store)
账本区块存储持久化交易区块,并以一组仅追加文件的形式实现。由于区块无法被改变,并以一个特定的顺序到达,一个仅追加的结构能够带来最好的性能。此外,区块存储中维护了一些索引(indices),用来随机访问区块或区块中的交易。
peer交易管理器(Peer Transaction Manager)
peer节点交易管理器(PTM)保持版本键值对处于最新状态。它能为每一个唯一的条目(entry)的key以(key,val, ver)存储一个元组,其中包含了最近存储的值val,和最新的版本号var。版本由区块序列号和区块内的交易序列号组成,因此版本号都是唯一且递增的。PTM使用了本地键值存储来识别版本变量,具体实现使用了LevelDB和Apache CouchDB。
在模拟时,PTM提供了一个稳定的最新状态快照。执行阶段中提到,PTM在readset中为GetState访问的每个条目记录一个元组(key, ver),在writeset中为交易调用PutState更新的每个条目记录一个元组(key, val)。此外,PTM支持范围查询(range queries),并计算出查询结果的加密哈希(一组(key, ver)元组),并将查询字符串和哈希添加到readset中。
在验证阶段,PTM依次验证一个区块中的所有交易,检查交易是否与之前的任何交易产生冲突。对于readset中的所有key,如果readset中的版本记录与最新状态的版本不同,PTM就会把这个交易标记为无效。对于范围查找,PTM会重新执行查询并与之前的readset比较哈希值,确保不发生错误的读取。
崩溃处理
账本组件在更新时需要能处理peer节点的崩溃。
在接收到一个新区块后,PTM已经对其进行了验证,并使用了掩码将区块中的交易标记为有效或无效。此时,账本将区块写入账本的区块存储中,刷新到磁盘,随后更新区块存储的索引。随后,PTM将所有有效交易的writeset状态变化应用到本地存储中。最后,计算并持久化一个savepoint的值,表示成功应用区块数量的最大值。从崩溃中恢复时,savepoint值用来从持久化块中恢复索引和最新状态。
链码执行
Chaincode在一个与其余peer节点松散耦合的环境中执行。支持增添新的链码编程语言的插件。目前支持Go、Java和Node。
每个用户级或应用链码都在Docker容器环境中的独立进程中执行,这使得链码和peer节点之间相互隔离,简化了链码的生命周期管理(启动、停止、中止)。链码和peer节点通过gRCP通信。通过松散的耦合关系,peer节点不知道链码具体使用了什么语言实现。
不同于应用链码,系统链码直接在peer节点的进程中运行,可以实现Fabric所需要的特定功能,可以用于用户链码之间的隔离限制过多的情况。
配置和系统链码
Fabric通过通道配置(channel configuration)和系统链码(system chaincodes)进行定制。
Fabric中每个通道都会形成一个逻辑区块链。通道的配置由元数据进行维护,并永久保留在配置区块(configuration blocks)中。每个配置区块都会包含完整的通道配置,并不会包括任何其他的东西。每个区块链都以一个配置区块开始,称之为创世块(genesis block),用于引导通道。通道的配置包括:
- 参与节点的MSP定义;
- OSN的网络地址;
- 共识实现和配许服务的共享配置,比如大小和超时等设置;
- 管理对排序服务操作(广播和分发)访问的规则;
- 管理如何修改通道配置的各个部分的规则。
通道的配置可以使用通道配置更新事务(channel configuration update transaction)进行更新,其中需要包括对配置进行的修改和一组签名。排序节点通过使用当前的配置验证该签名是否得到授权,来评估此次更新是否有效。然后,排序节点生成一个新的配置区块,嵌入新的配置和配置更新事务。Peer节点接收后,根据当前的配置验证更新是否得到了授权,有效则进行配置更新。
在部署应用链码时,会参考一个认可系统链码(endorsement system chaincode, ESCC)和一个验证系统链码(validation system chaincode, VSCC)。这两个链码的选择使得ESCC输出的背书,可以作为VSCC输入的一部分进行验证。ESCC将一个提案和提案模拟的结果作为输入。如果结果符合要求,则ESCC产生一个包含结果和认可的响应。对于默认的ESCC,这个背书只是peer节点的本地签名身份的签名。VSCC将一个事务作为输入,并输出该事务是否有效。对于默认的VSCC,背书被收集,并根据为链码指定的背书策略进行评估。进一步的系统链码实现其他支持功能,如链码生命周期。
系统链码的核心代码在/fabric/core/common/sysccprovider和/fabric/core/scc下
系统链码
与普通链码对比
系统链码 | 普通链码 | |
---|---|---|
链码源码 | 无main函数 | 有main函数 |
运行空间 | 背书节点进程 | Docker |
调用方式 | 网络+进程内部 | 网络 |
启动参数 | 内置 | 动态输入 |
通信方式 | Golang的通道机制 | 网络 |
数据存取 | Golang的通道+本地文件 | 网络 |
升级方式 | 和背书节点一起升级 | 单独升级 |
背书策略 | 无 | 有 |
系统链码在Peer节点上的注册与部署
peer节点存在于docker容器中,在启动时通过执行peer node start
命令来启动peer节点。
/internal/peer/node/start.go
start命令的入口函数为serve函数。
以下源代码中仅保留部署系统链码的部分。
func serve(args []string) error {
// Initialize chaincode service
// deploy system chaincodes
for _, cc := range []scc.SelfDescribingSysCC{lsccInst, csccInst, qsccInst, lifecycleSCC} {
if enabled, ok := chaincodeConfig.SCCWhitelist[cc.Name()]; !ok || !enabled {
logger.Infof("not deploying chaincode %s as it is not enabled", cc.Name())
continue
} // 判断链码的设置是否为enabled
scc.DeploySysCC(cc, chaincodeSupport) // 部署系统链码
}
logger.Infof("Deployed system chaincodes")
//...
}
通过遍历scc.SelfDescribingSysCC返回值,调用scc.DeploySysCC函数。
找到scc源文件:
corn/scc/scc.go
type SelfDescribingSysCC interface {
//Unique name of the system chaincode
Name() string
// Chaincode returns the underlying chaincode
Chaincode() shim.Chaincode
}
SelfDescribingSysCC
的第二个方法为Chaincode()
返回值为shim.Chaincode
类型。在core\scc\cscc\configure.go
中发现如下代码:
func (e *PeerConfiger) Chaincode() shim.Chaincode { return e }
可知start.go
中的cc
为每个链码的配置
type PeerConfiger struct {
policyChecker policy.PolicyChecker
configMgr config.Manager
aclProvider aclmgmt.ACLProvider
deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider
legacyLifecycle plugindispatcher.LifecycleResources
newLifecycle plugindispatcher.CollectionAndLifecycleResources
peer *peer.Peer
bccsp bccsp.BCCSP
}
下面是DeploySysCC
的代码
type ChaincodeStreamHandler interface {
HandleChaincodeStream(ccintf.ChaincodeStream) error
LaunchInProc(packageID string) <-chan struct{}
} // 负责peer节点与链码之间的通信
func DeploySysCC(sysCC SelfDescribingSysCC, chaincodeStreamHandler ChaincodeStreamHandler) {
sysccLogger.Infof("deploying system chaincode '%s'", sysCC.Name())
ccid := ChaincodeID(sysCC.Name())
done := chaincodeStreamHandler.LaunchInProc(ccid)
peerRcvCCSend := make(chan *pb.ChaincodeMessage)
ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
// 并行传输数据
go func() {
stream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
defer stream.CloseSend()
sysccLogger.Debugf("starting chaincode-support stream for %s", ccid)
err := chaincodeStreamHandler.HandleChaincodeStream(stream)
sysccLogger.Criticalf("shim stream ended with err: %v", err)
}()
go func(sysCC SelfDescribingSysCC) {
stream := newInProcStream(ccRcvPeerSend, peerRcvCCSend)
defer stream.CloseSend()
sysccLogger.Debugf("chaincode started for %s", ccid)
err := shim.StartInProc(ccid, stream, sysCC.Chaincode())
sysccLogger.Criticalf("system chaincode ended with err: %v", err)
}(sysCC)
<-done
}
因此,在serve.go代码中,对lscc,qscc,cscc三个系统链码进行相应的初始创建,随后根据配置信息部署到peer节点中。
查询系统链码 (Querier System Chaincode, QSCC)
源代码
/core/scc/qscc/query.go
在所有Peer节点上运行,提供账本查询接口,包括了区块查询、交易查询等。
package qscc
import (
"fmt" // 格式化IO
"strconv" // 数据类型转换
"github.com/hyperledger/fabric-chaincode-go/shim" // shim包提供API访问链码chaincode的状态变量,chaincode用来和peer沟通的接口
pb "github.com/hyperledger/fabric-protos-go/peer" // 调用.pb.go文件,由proto文件生成
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/aclmgmt"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protoutil" // Proto Buffer的通用方法
)
// LedgerGetter gets the PeerLedger associated with a channel.
type LedgerGetter interface {
GetLedger(cid string) ledger.PeerLedger
}
// New returns an instance of QSCC.
// Typically this is called once per peer.
func New(aclProvider aclmgmt.ACLProvider, ledgers LedgerGetter) *LedgerQuerier {
return &LedgerQuerier{
aclProvider: aclProvider,
ledgers: ledgers,
}
}
// (e *LedgerQuerier) 表示这是LedgerQuerier结构体的方法
func (e *LedgerQuerier) Name() string { return "qscc" }
func (e *LedgerQuerier) Chaincode() shim.Chaincode { return e }
// LedgerQuerier implements the ledger query functions, including:
// - GetChainInfo returns BlockchainInfo
// - GetBlockByNumber returns a block
// - GetBlockByHash returns a block
// - GetTransactionByID returns a transaction
type LedgerQuerier struct {
aclProvider aclmgmt.ACLProvider
ledgers LedgerGetter
}
var qscclogger = flogging.MustGetLogger("qscc")
// 将函数名定义为字符串常量,用于invoke的第一个arg参数
const (
GetChainInfo string = "GetChainInfo"
GetBlockByNumber string = "GetBlockByNumber"
GetBlockByHash string = "GetBlockByHash"
GetTransactionByID string = "GetTransactionByID"
GetBlockByTxID string = "GetBlockByTxID"
)
// Init is called once per chain when the chain is created.
// This allows the chaincode to initialize any variables on the ledger prior
// to any transaction execution on the chain.
func (e *LedgerQuerier) Init(stub shim.ChaincodeStubInterface) pb.Response {
qscclogger.Info("Init QSCC")
return shim.Success(nil)
}
// 调用Invoke时,args[0]为查询函数名,args[1]为chain id,在加到stub包之前为暂时的id
// 每个函数都需要额外的参数,如下:
// # GetChainInfo: 返回一个以字节为单位的BlockchainInfo对象
// # GetBlockByNumber: 返回args[2]中所指定的块
// # GetBlockByHash: 返回args[2]中的块哈希值所指定的块
// # GetTransactionByID: 返回args[2]中的id所指定的交易事务
func (e *LedgerQuerier) Invoke(stub shim.ChaincodeStubInterface) pb.Response { // 通过args调用内部函数
args := stub.GetArgs() // stub是RPC中客户端与服务器端传输的消息包,将其解包并获取args
// args数量小于2时,返回错误信息
// 第一个args为调用的函数名,第二个args为chain ID
fname := string(args[0])
cid := string(args[1])
sp, err := stub.GetSignedProposal() // 获取签名交易提议的解码对象,类型为SignedProposal
name, err := protoutil.InvokedChaincodeName(sp.ProposalBytes)
targetLedger := e.ledgers.GetLedger(cid)
qscclogger.Debugf("Invoke function: %s on chain: %s", fname, cid)
// Handle ACL: (ACL:访问控制权限)
res := getACLResource(fname)
if err = e.aclProvider.CheckACL(res, cid, sp); err != nil {
return shim.Error(fmt.Sprintf("access denied for [%s][%s]: [%s]", fname, cid, err))
}
// 调用相应的函数并传递参数
switch fname {
case GetTransactionByID:
return getTransactionByID(targetLedger, args[2])
case GetBlockByNumber:
return getBlockByNumber(targetLedger, args[2])
case GetBlockByHash:
return getBlockByHash(targetLedger, args[2])
case GetChainInfo:
return getChainInfo(targetLedger)
case GetBlockByTxID:
return getBlockByTxID(targetLedger, args[2])
}
return shim.Error(fmt.Sprintf("Requested function %s not found.", fname))
}
// 以下为五个private方法,只能通过invoke进行调用。
// 五个方法在进行异常处理后,调用账本中对应的方法,并将成功或错误结果通过shim发送
// - getTransactionByID: 返回对应id的交易内容
func getTransactionByID(vledger ledger.PeerLedger, tid []byte) pb.Response {
// 异常:交易为空
processedTran, err := vledger.GetTransactionByID(string(tid)) // 调用ledger.PeerLedger中的方法
bytes, err := protoutil.Marshal(processedTran) // 使用proto工具进行编排 my_TODO: read protoutil
return shim.Success(bytes)
}
//以下四个方法的整体结果与第一个相似
func getBlockByNumber(vledger ledger.PeerLedger, number []byte) pb.Response {
bnum, err := strconv.ParseUint(string(number), 10, 64) // 转换成uint型
block, err := vledger.GetBlockByNumber(bnum) // 调用账本中的方法
bytes, err := protoutil.Marshal(block)
return shim.Success(bytes)
}
func getBlockByHash(vledger ledger.PeerLedger, hash []byte) pb.Response {
block, err := vledger.GetBlockByHash(hash)
bytes, err := protoutil.Marshal(block)
return shim.Success(bytes)
}
func getChainInfo(vledger ledger.PeerLedger) pb.Response {
binfo, err := vledger.GetBlockchainInfo()
bytes, err := protoutil.Marshal(binfo)
return shim.Success(bytes)
}
func getBlockByTxID(vledger ledger.PeerLedger, rawTxID []byte) pb.Response {
txID := string(rawTxID)
block, err := vledger.GetBlockByTxID(txID)
bytes, err := protoutil.Marshal(block)
return shim.Success(bytes)
}
func getACLResource(fname string) string {
return "qscc/" + fname
}
Get方法
QSCC提供的方法都是get方法,用于从通道中获取各种数据。
配置系统链码 (Configuration System Chaincode, CSCC)
/core/scc/cscc/configure.go
CSCC管理peer节点上通道相关的信息并执行通道配置交易
type PeerConfiger struct {
policyChecker policy.PolicyChecker
configMgr config.Manager
aclProvider aclmgmt.ACLProvider
deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider
legacyLifecycle plugindispatcher.LifecycleResources
newLifecycle plugindispatcher.CollectionAndLifecycleResources
peer *peer.Peer
bccsp bccsp.BCCSP
}
CSCC中提供了PeerConfiger类。这个类实现了peer节点配置信息的处理。对于所有的从排序服务中进来的配置交易,都会调用这个系统链码来处理交易。
CSCC提供了三种方法
const (
JoinChain string = "JoinChain"
GetConfigBlock string = "GetConfigBlock"
GetChannels string = "GetChannels"
)
JoinChain方法
JoinChain 方法用来使一个peer加入通道。它需要一个参数,即通道配置区块的序列化的protobuf byte。
block, err := protoutil.UnmarshalBlock(args[1])
cid, err := protoutil.GetChainIDFromBlock(block)
// 1. 检查配置块的格式和要求
// 2. 检查加入的政策
// 初始化txsFilter
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) == 0 {
// add array of validation code hardcoded to valid
txsFilter = util.NewTxValidationFlagsSetValue(len(block.Data.Data), pb.TxValidationCode_VALID)
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
return e.joinChain(cid, block, e.deployedCCInfoProvider, e.legacyLifecycle, e.newLifecycle)
GetConfigBlock方法
这个方法用于获取给定通道的当前的配置区块。需要一个参数:通道名字的bytes形式。
case GetConfigBlock:
// 2. check policy
if err = e.aclProvider.CheckACL(resources.Cscc_GetConfigBlock, string(args[1]), sp); err != nil {
return shim.Error(fmt.Sprintf("access denied for [%s][%s]: %s", fname, args[1], err))
}
return e.getConfigBlock(args[1])
GetChannel方法
这个方法用于获取peer节点目前所加入的通道。
case GetChannels:
// 2. check get channels policy
if err = e.aclProvider.CheckACL(resources.Cscc_GetChannels, "", sp); err != nil {
return shim.Error(fmt.Sprintf("access denied for [%s]: %s", fname, err))
}
return e.getChannels()
生命周期系统链码 (Life Cycle System Chaincode, LSCC)
LSCC主要管理链码的生命周期,主要包括:
- 在peer节点上安装链码
- 在通道上部署和升级链码
- 用户从运行中的链码获取信息
链码的生命周期
打包链码:
在被安装到peer节点之前,链码需要被打包进一个.tar.gz
文件,其中包含两个文件:“metadata.json"和另一个包含链码文件的文件"code.tar.gz”
“metadata.json"包含了指定链码语言、代码路径、以及包标签的JSON文件。
安装链码:
每个要执行和背书交易的peer节点上都需要安装链码包。安装完成后,peer节点会构造链码。一般建议每个组织下的所有peer使用相同的链码包。
批准链码定义:
通过 链码定义 来管理链码。当通道成员批准一个链码定义,这个批准便作为一个组织在接受链码参数方面的投票。这些同意的组织定义允许通道成员在链码可以在通道上使用之前达成一致意见(同意链码运行在此通道上)。
链码定义包含以下参数(需要在组织之间保持一致):名称、版本、序列号、背书策略、私有数据集合配置、ECSS/VSCC插件、初始化
提交链码定义到通道:
足够多的成员同意一个链码定义后,某个组织能够提交定义到通道。提交交易首先发送给通道成员的peer节点,peer节点会查询链码定义的被同意状况,确认组织同意后为其背书,交易随后被提交到排序服务,排序服务会把链码定义提交给通道。
源代码
部署时的config
lsccInst := &lscc.SCC{
BuiltinSCCs: builtinSCCs,
// type: FilesystemSupport
// Support提供了一些静态函数的实现
Support: &lscc.SupportImpl{
GetMSPIDs: peerInstance.GetMSPIDs,
},
// type: sysccprovider.SystemChaincodeProvider
// SCCProvider是用于访问系统其他部分的接口
SCCProvider: &lscc.PeerShim{Peer: peerInstance},
// type: aclmgmt.ACLProvider
// ACLProvider负责访问权限控制
ACLProvider: aclProvider,
GetMSPIDs: peerInstance.GetMSPIDs,
// type: policy.PolicyChecker
// PolicyChecker是用于执行访问控制的接口
PolicyChecker: policyChecker,
BCCSP: factory.GetDefault(),
BuildRegistry: buildRegistry,
ChaincodeBuilder: containerRouter,
EbMetadataProvider: ebMetadataProvider,
}
/core/scc/lscc/lscc.go
Invoke
代码的形式与QSCC部分类似,都是使用switch语句选择所调用的方法(将函数名定义为常量)
由于代码过长,以下代码省略异常处理部分:
func (lscc *SCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
args := stub.GetArgs()
function := string(args[0]) // 第一个参数是函数名
switch function {
case INSTALL:
depSpec := args[1]
err := lscc.executeInstall(stub, depSpec) // 根据第二个参数 deployment spec 安装链码
return shim.Success([]byte("OK"))
case DEPLOY, UPGRADE:
// 至少有三个参数: 函数名,链码名和deployment spec
channel := string(args[1])
ac, exists := lscc.SCCProvider.GetApplicationConfig(channel) // 通过第二个参数获取配置
depSpec := args[2]
cds := &pb.ChaincodeDeploymentSpec{}
// 可选参数:
// 第四个参数是 已编组的 SignaturePolicyEnvelope 代表了背书政策
// 第五个参数是 escc名
// 第六个参数是 vscc名
// 第七个参数是 一个已编组的 CollectionConfigPackage 类
// 第四个参数:
var EP []byte
if len(args) > 3 && len(args[3]) > 0 {
EP = args[3] // EP为一个背书政策
} else {
mspIDs := lscc.GetMSPIDs(channel)
p := policydsl.SignedByAnyMember(mspIDs) // 根据通道的mspID获取背书政策?
EP, err = protoutil.Marshal(p) // 将结果进行编组
}
// 第五个参数:
var escc []byte
if len(args) > 4 && len(args[4]) > 0 {
escc = args[4]
} else {
escc = []byte("escc") // 默认为escc
}
// 第六个参数:
var vscc []byte
if len(args) > 5 && len(args[5]) > 0 {
vscc = args[5]
} else {
vscc = []byte("vscc") // 默认为vscc
}
// 如果有第七个参数:
var collectionsConfig []byte
if ac.Capabilities().PrivateChannelData() && len(args) > 6 {
collectionsConfig = args[6]
}
// 将所有的参数传入executeDeployOrUpgrade方法中执行
cd, err := lscc.executeDeployOrUpgrade(stub, channel, cds, EP, escc, vscc, collectionsConfig, function)
return shim.Success(cdbytes)
case CCEXISTS, CHAINCODEEXISTS, GETDEPSPEC, GETDEPLOYMENTSPEC, GETCCDATA, GETCHAINCODEDATA:
// 变量个数必须为3
channel := string(args[1])
ccname := string(args[2])
cdbytes, err := lscc.getCCInstance(stub, ccname) // 获取实例
switch function {
case CCEXISTS, CHAINCODEEXISTS:
cd, err := lscc.getChaincodeData(ccname, cdbytes) // 获取链码数据
case GETCCDATA, GETCHAINCODEDATA:
return shim.Success(cdbytes)
case GETDEPSPEC, GETDEPLOYMENTSPEC:
_, depspecbytes, err := lscc.getCCCode(ccname, cdbytes)
return shim.Success(depspecbytes)
default:
panic("unreachable")
}
case GETCHAINCODES, GETCHAINCODESALIAS:
// 变量个数必须为1
return lscc.getChaincodes(stub) // 调用对应方法
case GETINSTALLEDCHAINCODES, GETINSTALLEDCHAINCODESALIAS:
// 变量个数必须为1
return lscc.getInstalledChaincodes() // 调用对应方法
case GETCOLLECTIONSCONFIG, GETCOLLECTIONSCONFIGALIAS:
// 变量个数必须为2
chaincodeName := string(args[1])
return lscc.getChaincodeCollectionData(stub, chaincodeName) // 调用对应方法
}
return shim.Error(InvalidFunctionErr(function).Error())
}
Install方法:
用于存储chaincode程序到peer的文件系统,需要一个参数,及chancode deployment spec的序列化protobuf bytes。
Deploy方法:
用于在给定的通道上实例化合约,可以接受五个参数,前两个参数是必须的:通道名称与chaincode deployment spec。另外三个参数为:倍数策略、背书系统合约的名字和验证系统合约的名字。
Upgrade方法:
用于升级合约
Get方法:
剩下的get方法都用于获取相应的合约数据
背书系统链码 (Endorser System Chaincode, ESCC)
在背书节点上运行,对交易结束进行结构转换和签名背书。
/core/endorser/endorser.go
package endorser
import (
"context"
"fmt"
"strconv"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-chaincode-go/shim"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric-protos-go/transientstore"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/chaincode/lifecycle"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"go.uber.org/zap"
)
var endorserLogger = flogging.MustGetLogger("endorser")
// The Jira issue that documents Endorser flow along with its relationship to
// the lifecycle chaincode - https://jira.hyperledger.org/browse/FAB-181
//go:generate counterfeiter -o fake/prvt_data_distributor.go --fake-name PrivateDataDistributor . PrivateDataDistributor
type PrivateDataDistributor interface {
DistributePrivateData(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error
}
// Support contains functions that the endorser requires to execute its tasks
type Support interface {
identity.SignerSerializer
// GetTxSimulator returns the transaction simulator for the specified ledger
// a client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error)
// GetHistoryQueryExecutor gives handle to a history query executor for the
// specified ledger
GetHistoryQueryExecutor(ledgername string) (ledger.HistoryQueryExecutor, error)
// GetTransactionByID retrieves a transaction by id
GetTransactionByID(chid, txID string) (*pb.ProcessedTransaction, error)
// IsSysCC returns true if the name matches a system chaincode's
// system chaincode names are system, chain wide
IsSysCC(name string) bool
// Execute - execute proposal, return original response of chaincode
Execute(txParams *ccprovider.TransactionParams, name string, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error)
// ExecuteLegacyInit - executes a deployment proposal, return original response of chaincode
ExecuteLegacyInit(txParams *ccprovider.TransactionParams, name, version string, spec *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error)
// ChaincodeEndorsementInfo returns the information from lifecycle required to endorse the chaincode.
ChaincodeEndorsementInfo(channelID, chaincodeID string, txsim ledger.QueryExecutor) (*lifecycle.ChaincodeEndorsementInfo, error)
// CheckACL checks the ACL for the resource for the channel using the
// SignedProposal from which an id can be extracted for testing against a policy
CheckACL(channelID string, signedProp *pb.SignedProposal) error
// EndorseWithPlugin endorses the response with a plugin
EndorseWithPlugin(pluginName, channnelID string, prpBytes []byte, signedProposal *pb.SignedProposal) (*pb.Endorsement, []byte, error)
// GetLedgerHeight returns ledger height for given channelID
GetLedgerHeight(channelID string) (uint64, error)
// GetDeployedCCInfoProvider returns ledger.DeployedChaincodeInfoProvider
GetDeployedCCInfoProvider() ledger.DeployedChaincodeInfoProvider
}
//go:generate counterfeiter -o fake/channel_fetcher.go --fake-name ChannelFetcher . ChannelFetcher
// ChannelFetcher fetches the channel context for a given channel ID.
type ChannelFetcher interface {
Channel(channelID string) *Channel
}
type Channel struct {
IdentityDeserializer msp.IdentityDeserializer
}
// Endorser provides the Endorser service ProcessProposal
type Endorser struct {
ChannelFetcher ChannelFetcher
LocalMSP msp.IdentityDeserializer
PrivateDataDistributor PrivateDataDistributor
Support Support
PvtRWSetAssembler PvtRWSetAssembler
Metrics *Metrics
}
// call specified chaincode (system or user)
func (e *Endorser) callChaincode(txParams *ccprovider.TransactionParams, input *pb.ChaincodeInput, chaincodeName string) (*pb.Response, *pb.ChaincodeEvent, error) {
defer func(start time.Time) {
logger := endorserLogger.WithOptions(zap.AddCallerSkip(1))
logger = decorateLogger(logger, txParams)
elapsedMillisec := time.Since(start).Milliseconds()
logger.Infof("finished chaincode: %s duration: %dms", chaincodeName, elapsedMillisec)
}(time.Now())
meterLabels := []string{
"channel", txParams.ChannelID,
"chaincode", chaincodeName,
}
res, ccevent, err := e.Support.Execute(txParams, chaincodeName, input)
if err != nil {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, err
}
// per doc anything < 400 can be sent as TX.
// fabric errors will always be >= 400 (ie, unambiguous errors )
// "lscc" will respond with status 200 or 500 (ie, unambiguous OK or ERROR)
if res.Status >= shim.ERRORTHRESHOLD {
return res, nil, nil
}
// Unless this is the weirdo LSCC case, just return
if chaincodeName != "lscc" || len(input.Args) < 3 || (string(input.Args[0]) != "deploy" && string(input.Args[0]) != "upgrade") {
return res, ccevent, nil
}
// ----- BEGIN - SECTION THAT MAY NEED TO BE DONE IN LSCC ------
// if this a call to deploy a chaincode, We need a mechanism
// to pass TxSimulator into LSCC. Till that is worked out this
// special code does the actual deploy, upgrade here so as to collect
// all state under one TxSimulator
//
// NOTE that if there's an error all simulation, including the chaincode
// table changes in lscc will be thrown away
cds, err := protoutil.UnmarshalChaincodeDeploymentSpec(input.Args[2])
if err != nil {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, err
}
// this should not be a system chaincode
if e.Support.IsSysCC(cds.ChaincodeSpec.ChaincodeId.Name) {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, errors.Errorf("attempting to deploy a system chaincode %s/%s", cds.ChaincodeSpec.ChaincodeId.Name, txParams.ChannelID)
}
if len(cds.CodePackage) != 0 {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, errors.Errorf("lscc upgrade/deploy should not include a code packages")
}
_, _, err = e.Support.ExecuteLegacyInit(txParams, cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version, cds.ChaincodeSpec.Input)
if err != nil {
// increment the failure to indicate instantion/upgrade failures
meterLabels = []string{
"channel", txParams.ChannelID,
"chaincode", cds.ChaincodeSpec.ChaincodeId.Name,
}
e.Metrics.InitFailed.With(meterLabels...).Add(1)
return nil, nil, err
}
return res, ccevent, err
}
// SimulateProposal simulates the proposal by calling the chaincode
func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, chaincodeName string, chaincodeInput *pb.ChaincodeInput) (*pb.Response, []byte, *pb.ChaincodeEvent, error) {
logger := decorateLogger(endorserLogger, txParams)
meterLabels := []string{
"channel", txParams.ChannelID,
"chaincode", chaincodeName,
}
// ---3. execute the proposal and get simulation results
res, ccevent, err := e.callChaincode(txParams, chaincodeInput, chaincodeName)
if err != nil {
logger.Errorf("failed to invoke chaincode %s, error: %+v", chaincodeName, err)
return nil, nil, nil, err
}
if txParams.TXSimulator == nil {
return res, nil, ccevent, nil
}
// Note, this is a little goofy, as if there is private data, Done() gets called
// early, so this is invoked multiple times, but that is how the code worked before
// this change, so, should be safe. Long term, let's move the Done up to the create.
defer txParams.TXSimulator.Done()
simResult, err := txParams.TXSimulator.GetTxSimulationResults()
if err != nil {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, nil, err
}
if simResult.PvtSimulationResults != nil {
if chaincodeName == "lscc" {
// TODO: remove once we can store collection configuration outside of LSCC
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate")
}
pvtDataWithConfig, err := AssemblePvtRWSet(txParams.ChannelID, simResult.PvtSimulationResults, txParams.TXSimulator, e.Support.GetDeployedCCInfoProvider())
// To read collection config need to read collection updates before
// releasing the lock, hence txParams.TXSimulator.Done() moved down here
txParams.TXSimulator.Done()
if err != nil {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, nil, errors.WithMessage(err, "failed to obtain collections config")
}
endorsedAt, err := e.Support.GetLedgerHeight(txParams.ChannelID)
if err != nil {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("failed to obtain ledger height for channel '%s'", txParams.ChannelID))
}
// Add ledger height at which transaction was endorsed,
// `endorsedAt` is obtained from the block storage and at times this could be 'endorsement Height + 1'.
// However, since we use this height only to select the configuration (3rd parameter in distributePrivateData) and
// manage transient store purge for orphaned private writesets (4th parameter in distributePrivateData), this works for now.
// Ideally, ledger should add support in the simulator as a first class function `GetHeight()`.
pvtDataWithConfig.EndorsedAt = endorsedAt
if err := e.PrivateDataDistributor.DistributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, nil, err
}
}
pubSimResBytes, err := simResult.GetPubSimulationBytes()
if err != nil {
e.Metrics.SimulationFailure.With(meterLabels...).Add(1)
return nil, nil, nil, err
}
return res, pubSimResBytes, ccevent, nil
}
// preProcess checks the tx proposal headers, uniqueness and ACL
func (e *Endorser) preProcess(up *UnpackedProposal, channel *Channel) error {
// at first, we check whether the message is valid
err := up.Validate(channel.IdentityDeserializer)
if err != nil {
e.Metrics.ProposalValidationFailed.Add(1)
return errors.WithMessage(err, "error validating proposal")
}
if up.ChannelHeader.ChannelId == "" {
// chainless proposals do not/cannot affect ledger and cannot be submitted as transactions
// ignore uniqueness checks; also, chainless proposals are not validated using the policies
// of the chain since by definition there is no chain; they are validated against the local
// MSP of the peer instead by the call to ValidateUnpackProposal above
return nil
}
// labels that provide context for failure metrics
meterLabels := []string{
"channel", up.ChannelHeader.ChannelId,
"chaincode", up.ChaincodeName,
}
// Here we handle uniqueness check and ACLs for proposals targeting a chain
// Notice that ValidateProposalMessage has already verified that TxID is computed properly
if _, err = e.Support.GetTransactionByID(up.ChannelHeader.ChannelId, up.ChannelHeader.TxId); err == nil {
// increment failure due to duplicate transactions. Useful for catching replay attacks in
// addition to benign retries
e.Metrics.DuplicateTxsFailure.With(meterLabels...).Add(1)
return errors.Errorf("duplicate transaction found [%s]. Creator [%x]", up.ChannelHeader.TxId, up.SignatureHeader.Creator)
}
// check ACL only for application chaincodes; ACLs
// for system chaincodes are checked elsewhere
if !e.Support.IsSysCC(up.ChaincodeName) {
// check that the proposal complies with the Channel's writers
if err = e.Support.CheckACL(up.ChannelHeader.ChannelId, up.SignedProposal); err != nil {
e.Metrics.ProposalACLCheckFailed.With(meterLabels...).Add(1)
return err
}
}
return nil
}
// ProcessProposal process the Proposal
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
// start time for computing elapsed time metric for successfully endorsed proposals
startTime := time.Now()
e.Metrics.ProposalsReceived.Add(1)
addr := util.ExtractRemoteAddress(ctx)
endorserLogger.Debug("request from", addr)
// variables to capture proposal duration metric
success := false
up, err := UnpackProposal(signedProp)
if err != nil {
e.Metrics.ProposalValidationFailed.Add(1)
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, err
}
var channel *Channel
if up.ChannelID() != "" {
channel = e.ChannelFetcher.Channel(up.ChannelID())
if channel == nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: fmt.Sprintf("channel '%s' not found", up.ChannelHeader.ChannelId)}}, nil
}
} else {
channel = &Channel{
IdentityDeserializer: e.LocalMSP,
}
}
// 0 -- check and validate
err = e.preProcess(up, channel)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, err
}
defer func() {
meterLabels := []string{
"channel", up.ChannelHeader.ChannelId,
"chaincode", up.ChaincodeName,
"success", strconv.FormatBool(success),
}
e.Metrics.ProposalDuration.With(meterLabels...).Observe(time.Since(startTime).Seconds())
}()
pResp, err := e.ProcessProposalSuccessfullyOrError(up)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
if pResp.Endorsement != nil || up.ChannelHeader.ChannelId == "" {
// We mark the tx as successfull only if it was successfully endorsed, or
// if it was a system chaincode on a channel-less channel and therefore
// cannot be endorsed.
success = true
// total failed proposals = ProposalsReceived-SuccessfulProposals
e.Metrics.SuccessfulProposals.Add(1)
}
return pResp, nil
}
func (e *Endorser) ProcessProposalSuccessfullyOrError(up *UnpackedProposal) (*pb.ProposalResponse, error) {
txParams := &ccprovider.TransactionParams{
ChannelID: up.ChannelHeader.ChannelId,
TxID: up.ChannelHeader.TxId,
SignedProp: up.SignedProposal,
Proposal: up.Proposal,
}
logger := decorateLogger(endorserLogger, txParams)
if acquireTxSimulator(up.ChannelHeader.ChannelId, up.ChaincodeName) {
txSim, err := e.Support.GetTxSimulator(up.ChannelID(), up.TxID())
if err != nil {
return nil, err
}
// txsim acquires a shared lock on the stateDB. As this would impact the block commits (i.e., commit
// of valid write-sets to the stateDB), we must release the lock as early as possible.
// Hence, this txsim object is closed in simulateProposal() as soon as the tx is simulated and
// rwset is collected before gossip dissemination if required for privateData. For safety, we
// add the following defer statement and is useful when an error occur. Note that calling
// txsim.Done() more than once does not cause any issue. If the txsim is already
// released, the following txsim.Done() simply returns.
defer txSim.Done()
hqe, err := e.Support.GetHistoryQueryExecutor(up.ChannelID())
if err != nil {
return nil, err
}
txParams.TXSimulator = txSim
txParams.HistoryQueryExecutor = hqe
}
cdLedger, err := e.Support.ChaincodeEndorsementInfo(up.ChannelID(), up.ChaincodeName, txParams.TXSimulator)
if err != nil {
return nil, errors.WithMessagef(err, "make sure the chaincode %s has been successfully defined on channel %s and try again", up.ChaincodeName, up.ChannelID())
}
// 1 -- simulate
res, simulationResult, ccevent, err := e.SimulateProposal(txParams, up.ChaincodeName, up.Input)
if err != nil {
return nil, errors.WithMessage(err, "error in simulation")
}
cceventBytes, err := CreateCCEventBytes(ccevent)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal chaincode event")
}
prpBytes, err := protoutil.GetBytesProposalResponsePayload(up.ProposalHash, res, simulationResult, cceventBytes, &pb.ChaincodeID{
Name: up.ChaincodeName,
Version: cdLedger.Version,
})
if err != nil {
logger.Warning("Failed marshaling the proposal response payload to bytes", err)
return nil, errors.WithMessage(err, "failed to create the proposal response")
}
// if error, capture endorsement failure metric
meterLabels := []string{
"channel", up.ChannelID(),
"chaincode", up.ChaincodeName,
}
switch {
case res.Status >= shim.ERROR:
return &pb.ProposalResponse{
Response: res,
Payload: prpBytes,
}, nil
case up.ChannelID() == "":
// Chaincode invocations without a channel ID is a broken concept
// that should be removed in the future. For now, return unendorsed
// success.
return &pb.ProposalResponse{
Response: res,
}, nil
case res.Status >= shim.ERRORTHRESHOLD:
meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(true))
e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1)
logger.Debugf("chaincode error %d", res.Status)
return &pb.ProposalResponse{
Response: res,
}, nil
}
escc := cdLedger.EndorsementPlugin
logger.Debugf("escc for chaincode %s is %s", up.ChaincodeName, escc)
// Note, mPrpBytes is the same as prpBytes by default endorsement plugin, but others could change it.
endorsement, mPrpBytes, err := e.Support.EndorseWithPlugin(escc, up.ChannelID(), prpBytes, up.SignedProposal)
if err != nil {
meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(false))
e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1)
return nil, errors.WithMessage(err, "endorsing with plugin failed")
}
return &pb.ProposalResponse{
Version: 1,
Endorsement: endorsement,
Payload: mPrpBytes,
Response: res,
}, nil
}
// determine whether or not a transaction simulator should be
// obtained for a proposal.
func acquireTxSimulator(chainID string, chaincodeName string) bool {
if chainID == "" {
return false
}
// ¯\_(ツ)_/¯ locking.
// Don't get a simulator for the query and config system chaincode.
// These don't need the simulator and its read lock results in deadlocks.
switch chaincodeName {
case "qscc", "cscc":
return false
default:
return true
}
}
// shorttxid replicates the chaincode package function to shorten txids.
// ~~TODO utilize a common shorttxid utility across packages.~~
// TODO use a formal type for transaction ID and make it a stringer
func shorttxid(txid string) string {
if len(txid) < 8 {
return txid
}
return txid[0:8]
}
func CreateCCEventBytes(ccevent *pb.ChaincodeEvent) ([]byte, error) {
if ccevent == nil {
return nil, nil
}
return proto.Marshal(ccevent)
}
func decorateLogger(logger *flogging.FabricLogger, txParams *ccprovider.TransactionParams) *flogging.FabricLogger {
return logger.With("channel", txParams.ChannelID, "txID", shorttxid(txParams.TxID))
}
验证系统链码 (Validator System Chaincode, VSCC)
/core/committer/txvalidator/v14/vscc_validator.go
被记账节点(validator)调用,根据合约的背书政策验证交易的有效性和背书的正确性。
package txvalidator
import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
commonerrors "github.com/hyperledger/fabric/common/errors"
"github.com/hyperledger/fabric/common/policydsl"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/common/sysccprovider"
validation "github.com/hyperledger/fabric/core/handlers/validation/api"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
// VsccValidatorImpl is the implementation used to call
// the vscc chaincode and validate block transactions
type VsccValidatorImpl struct {
channelID string
cr ChannelResources
pluginValidator *PluginValidator
} // 用于调用vscc链码并验证区块事务
// newVSCCValidator creates new vscc validator
func newVSCCValidator(channelID string, cr ChannelResources, pluginValidator *PluginValidator) *VsccValidatorImpl {
return &VsccValidatorImpl{
channelID: channelID,
cr: cr,
pluginValidator: pluginValidator,
}
}
func getChaincodeHeaderExtension(hdr *common.Header) (*peer.ChaincodeHeaderExtension, error) {
chdr, err := protoutil.UnmarshalChannelHeader(hdr.ChannelHeader)
if err != nil {
return nil, err
}
chaincodeHdrExt := &peer.ChaincodeHeaderExtension{}
err = proto.Unmarshal(chdr.Extension, chaincodeHdrExt)
return chaincodeHdrExt, errors.Wrap(err, "error unmarshaling ChaincodeHeaderExtension")
}
// VSCCValidateTx executes vscc validation for transaction
func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, envBytes []byte, block *common.Block) (error, peer.TxValidationCode) {
chainID := v.channelID
logger.Debugf("[%s] VSCCValidateTx starts for bytes %p", chainID, envBytes)
// get header extensions so we have the chaincode ID
hdrExt, err := getChaincodeHeaderExtension(payload.Header)
if err != nil {
return err, peer.TxValidationCode_BAD_HEADER_EXTENSION
}
// get channel header
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return err, peer.TxValidationCode_BAD_CHANNEL_HEADER
}
/* obtain the list of namespaces we're writing stuff to;
at first, we establish a few facts about this invocation:
1) which namespaces does it write to?
2) does it write to LSCC's namespace?
3) does it write to any cc that cannot be invoked? */
writesToLSCC := false
writesToNonInvokableSCC := false
respPayload, err := protoutil.GetActionFromEnvelope(envBytes)
if err != nil {
return errors.WithMessage(err, "GetActionFromEnvelope failed"), peer.TxValidationCode_BAD_RESPONSE_PAYLOAD
}
txRWSet := &rwsetutil.TxRwSet{}
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return errors.WithMessage(err, "txRWSet.FromProtoBytes failed"), peer.TxValidationCode_BAD_RWSET
}
// Verify the header extension and response payload contain the ChaincodeId
if hdrExt.ChaincodeId == nil {
return errors.New("nil ChaincodeId in header extension"), peer.TxValidationCode_INVALID_OTHER_REASON
}
if respPayload.ChaincodeId == nil {
return errors.New("nil ChaincodeId in ChaincodeAction"), peer.TxValidationCode_INVALID_OTHER_REASON
}
// get name and version of the cc we invoked
ccID := hdrExt.ChaincodeId.Name
ccVer := respPayload.ChaincodeId.Version
// sanity check on ccID
if ccID == "" {
err = errors.New("invalid chaincode ID")
logger.Errorf("%+v", err)
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
if ccID != respPayload.ChaincodeId.Name {
err = errors.Errorf("inconsistent ccid info (%s/%s)", ccID, respPayload.ChaincodeId.Name)
logger.Errorf("%+v", err)
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
// sanity check on ccver
if ccVer == "" {
err = errors.New("invalid chaincode version")
logger.Errorf("%+v", err)
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
var wrNamespace []string
alwaysEnforceOriginalNamespace := v.cr.Capabilities().V1_2Validation()
if alwaysEnforceOriginalNamespace {
wrNamespace = append(wrNamespace, ccID)
if respPayload.Events != nil {
ccEvent := &peer.ChaincodeEvent{}
if err = proto.Unmarshal(respPayload.Events, ccEvent); err != nil {
return errors.Wrapf(err, "invalid chaincode event"), peer.TxValidationCode_INVALID_OTHER_REASON
}
if ccEvent.ChaincodeId != ccID {
return errors.Errorf("chaincode event chaincode id does not match chaincode action chaincode id"), peer.TxValidationCode_INVALID_OTHER_REASON
}
}
}
namespaces := make(map[string]struct{})
for _, ns := range txRWSet.NsRwSets {
// check to make sure there is no duplicate namespace in txRWSet
if _, ok := namespaces[ns.NameSpace]; ok {
return errors.Errorf("duplicate namespace '%s' in txRWSet", ns.NameSpace),
peer.TxValidationCode_ILLEGAL_WRITESET
}
namespaces[ns.NameSpace] = struct{}{}
if !v.txWritesToNamespace(ns) {
continue
}
// Check to make sure we did not already populate this chaincode
// name to avoid checking the same namespace twice
if ns.NameSpace != ccID || !alwaysEnforceOriginalNamespace {
wrNamespace = append(wrNamespace, ns.NameSpace)
}
if !writesToLSCC && ns.NameSpace == "lscc" {
writesToLSCC = true
}
if !writesToNonInvokableSCC && IsSysCCAndNotInvokableCC2CC(ns.NameSpace) {
writesToNonInvokableSCC = true
}
if !writesToNonInvokableSCC && IsSysCCAndNotInvokableExternal(ns.NameSpace) {
writesToNonInvokableSCC = true
}
}
// we've gathered all the info required to proceed to validation;
// validation will behave differently depending on the type of
// chaincode (system vs. application)
if !IsSysCC(ccID) {
// if we're here, we know this is an invocation of an application chaincode;
// first of all, we make sure that:
// 1) we don't write to LSCC - an application chaincode is free to invoke LSCC
// for instance to get information about itself or another chaincode; however
// these legitimate invocations only ready from LSCC's namespace; currently
// only two functions of LSCC write to its namespace: deploy and upgrade and
// neither should be used by an application chaincode
if writesToLSCC {
return errors.Errorf("chaincode %s attempted to write to the namespace of LSCC", ccID),
peer.TxValidationCode_ILLEGAL_WRITESET
}
// 2) we don't write to the namespace of a chaincode that we cannot invoke - if
// the chaincode cannot be invoked in the first place, there's no legitimate
// way in which a transaction has a write set that writes to it; additionally
// we don't have any means of verifying whether the transaction had the rights
// to perform that write operation because in v1, system chaincodes do not have
// any endorsement policies to speak of. So if the chaincode can't be invoked
// it can't be written to by an invocation of an application chaincode
if writesToNonInvokableSCC {
return errors.Errorf("chaincode %s attempted to write to the namespace of a system chaincode that cannot be invoked", ccID),
peer.TxValidationCode_ILLEGAL_WRITESET
}
// validate *EACH* read write set according to its chaincode's endorsement policy
for _, ns := range wrNamespace {
// Get latest chaincode version, vscc and validate policy
txcc, vscc, policy, err := v.GetInfoForValidate(chdr, ns)
if err != nil {
logger.Errorf("GetInfoForValidate for txId = %s returned error: %+v", chdr.TxId, err)
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
// if the namespace corresponds to the cc that was originally
// invoked, we check that the version of the cc that was
// invoked corresponds to the version that lscc has returned
if ns == ccID && txcc.ChaincodeVersion != ccVer {
err = errors.Errorf("chaincode %s:%s/%s didn't match %s:%s/%s in lscc", ccID, ccVer, chdr.ChannelId, txcc.ChaincodeName, txcc.ChaincodeVersion, chdr.ChannelId)
logger.Errorf("%+v", err)
return err, peer.TxValidationCode_EXPIRED_CHAINCODE
}
// do VSCC validation
ctx := &Context{
Seq: seq,
Envelope: envBytes,
Block: block,
TxID: chdr.TxId,
Channel: chdr.ChannelId,
Namespace: ns,
Policy: policy,
VSCCName: vscc.ChaincodeName,
}
if err = v.VSCCValidateTxForCC(ctx); err != nil {
switch err.(type) {
case *commonerrors.VSCCEndorsementPolicyError:
return err, peer.TxValidationCode_ENDORSEMENT_POLICY_FAILURE
default:
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
}
}
} else {
// make sure that we can invoke this system chaincode - if the chaincode
// cannot be invoked through a proposal to this peer, we have to drop the
// transaction; if we didn't, we wouldn't know how to decide whether it's
// valid or not because in v1, system chaincodes have no endorsement policy
if IsSysCCAndNotInvokableExternal(ccID) {
return errors.Errorf("committing an invocation of cc %s is illegal", ccID),
peer.TxValidationCode_ILLEGAL_WRITESET
}
// Get latest chaincode version, vscc and validate policy
_, vscc, policy, err := v.GetInfoForValidate(chdr, ccID)
if err != nil {
logger.Errorf("GetInfoForValidate for txId = %s returned error: %+v", chdr.TxId, err)
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
// validate the transaction as an invocation of this system chaincode;
// vscc will have to do custom validation for this system chaincode
// currently, VSCC does custom validation for LSCC only; if an hlf
// user creates a new system chaincode which is invokable from the outside
// they have to modify VSCC to provide appropriate validation
ctx := &Context{
Seq: seq,
Envelope: envBytes,
Block: block,
TxID: chdr.TxId,
Channel: chdr.ChannelId,
Namespace: ccID,
Policy: policy,
VSCCName: vscc.ChaincodeName,
}
if err = v.VSCCValidateTxForCC(ctx); err != nil {
switch err.(type) {
case *commonerrors.VSCCEndorsementPolicyError:
return err, peer.TxValidationCode_ENDORSEMENT_POLICY_FAILURE
default:
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
}
}
logger.Debugf("[%s] VSCCValidateTx completes env bytes %p", chainID, envBytes)
return nil, peer.TxValidationCode_VALID
}
func (v *VsccValidatorImpl) VSCCValidateTxForCC(ctx *Context) error {
logger.Debug("Validating", ctx, "with plugin")
err := v.pluginValidator.ValidateWithPlugin(ctx)
if err == nil {
return nil
}
// If the error is a pluggable validation execution error, cast it to the common errors ExecutionFailureError.
if e, isExecutionError := err.(*validation.ExecutionFailureError); isExecutionError {
return &commonerrors.VSCCExecutionFailureError{Err: e}
}
// Else, treat it as an endorsement error.
return &commonerrors.VSCCEndorsementPolicyError{Err: err}
}
func (v *VsccValidatorImpl) getCDataForCC(chid, ccid string) (*ccprovider.ChaincodeData, error) {
l := v.cr.Ledger()
if l == nil {
return nil, errors.New("nil ledger instance")
}
qe, err := l.NewQueryExecutor()
if err != nil {
return nil, errors.WithMessage(err, "could not retrieve QueryExecutor")
}
defer qe.Done()
bytes, err := qe.GetState("lscc", ccid)
if err != nil {
return nil, &commonerrors.VSCCInfoLookupFailureError{
Reason: fmt.Sprintf("Could not retrieve state for chaincode %s, error %s", ccid, err),
}
}
if bytes == nil {
return nil, errors.Errorf("lscc's state for [%s] not found.", ccid)
}
cd := &ccprovider.ChaincodeData{}
err = proto.Unmarshal(bytes, cd)
if err != nil {
return nil, errors.Wrap(err, "unmarshalling ChaincodeQueryResponse failed")
}
if cd.Vscc == "" {
return nil, errors.Errorf("lscc's state for [%s] is invalid, vscc field must be set", ccid)
}
if len(cd.Policy) == 0 {
return nil, errors.Errorf("lscc's state for [%s] is invalid, policy field must be set", ccid)
}
return cd, err
}
// GetInfoForValidate gets the ChaincodeInstance(with latest version) of tx, vscc and policy from lscc
func (v *VsccValidatorImpl) GetInfoForValidate(chdr *common.ChannelHeader, ccID string) (*sysccprovider.ChaincodeInstance, *sysccprovider.ChaincodeInstance, []byte, error) {
cc := &sysccprovider.ChaincodeInstance{
ChannelID: chdr.ChannelId,
ChaincodeName: ccID,
}
vscc := &sysccprovider.ChaincodeInstance{
ChannelID: chdr.ChannelId,
ChaincodeName: "vscc", // default vscc for system chaincodes
}
var policy []byte
var err error
if !IsSysCC(ccID) {
// when we are validating a chaincode that is not a
// system CC, we need to ask the CC to give us the name
// of VSCC and of the policy that should be used
// obtain name of the VSCC and the policy
cd, err := v.getCDataForCC(chdr.ChannelId, ccID)
if err != nil {
msg := fmt.Sprintf("Unable to get chaincode data from ledger for txid %s, due to %s", chdr.TxId, err)
logger.Errorf(msg)
return nil, nil, nil, err
}
cc.ChaincodeName = cd.Name
cc.ChaincodeVersion = cd.Version
vscc.ChaincodeName, policy = cd.Vscc, cd.Policy
} else {
// when we are validating a system CC, we use the default
// VSCC and a default policy that requires one signature
// from any of the members of the channel
p := policydsl.SignedByAnyMember(v.cr.GetMSPIDs())
policy, err = protoutil.Marshal(p)
if err != nil {
return nil, nil, nil, err
}
}
return cc, vscc, policy, nil
}
// txWritesToNamespace returns true if the supplied NsRwSet
// performs a ledger write
func (v *VsccValidatorImpl) txWritesToNamespace(ns *rwsetutil.NsRwSet) bool {
// check for public writes first
if ns.KvRwSet != nil && len(ns.KvRwSet.Writes) > 0 {
return true
}
// only look at collection data if we support that capability
if v.cr.Capabilities().PrivateChannelData() {
// check for private writes for all collections
for _, c := range ns.CollHashedRwSets {
if c.HashedRwSet != nil && len(c.HashedRwSet.HashedWrites) > 0 {
return true
}
// only look at private metadata writes if we support that capability
if v.cr.Capabilities().KeyLevelEndorsement() {
// private metadata updates
if c.HashedRwSet != nil && len(c.HashedRwSet.MetadataWrites) > 0 {
return true
}
}
}
}
// only look at metadata writes if we support that capability
if v.cr.Capabilities().KeyLevelEndorsement() {
// public metadata updates
if ns.KvRwSet != nil && len(ns.KvRwSet.MetadataWrites) > 0 {
return true
}
}
return false
}
func IsSysCCAndNotInvokableExternal(name string) bool {
return name == "vscc" || name == "escc"
}
func IsSysCC(name string) bool {
return name == "vscc" || name == "escc" || name == "lscc" || name == "qscc" || name == "cscc"
}
func IsSysCCAndNotInvokableCC2CC(name string) bool {
return name == "vscc" || name == "escc" || name == "cscc"
}